Skip to content

Commit

Permalink
[SYSTEMML-2164] Improved serialization of compressed matrix blocks
Browse files Browse the repository at this point in the history
Compressed matrix blocks have the option to share common dictionaries of
distinct values per column group across groups in DDC1 encoding format
(dense dictionary coding, 1-byte aligned). This feature is very useful
for small block sizes (e.g., default 1K) and common value domains as is
often the case for image data. However, so far each group redundantly
serialized/deserialized the common dictionary with subsequent recovery
of the in-memory shared dictionary. 

This patch improves this code path to serialize (and thus deserialize)
the common dictionary just once, which reduces - for large datasets that
exceed aggregate cluster memory - the size on disk and related I/O
costs, as well as deserialization and GC overheads.
  • Loading branch information
mboehm7 committed Mar 2, 2018
1 parent 622d36c commit b00a665
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
23 changes: 22 additions & 1 deletion src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ public long estimateInMemorySize() {
public abstract void write(DataOutput out)
throws IOException;

/**
* Serializes column group to data output, with the option to omit a
* shared dictionary. This default implementation ignores the flag and
* delegates to {@link #write(DataOutput)}; column groups that support
* shared dictionaries (e.g., ColGroupDDC1) override it.
*
* @param out data output
* @param skipDict skip shared dictionary
* @throws IOException if IOException occurs
*/
public void write(DataOutput out, boolean skipDict) throws IOException {
write(out); //skipDict ignored by default
}

/**
* Deserializes column group from data input.
*
Expand All @@ -197,7 +208,17 @@ public abstract void write(DataOutput out)
*/
public abstract void readFields(DataInput in)
throws IOException;


/**
* Deserializes column group from data input, with the option to skip a
* shared dictionary. This default implementation ignores the flag and
* delegates to {@link #readFields(DataInput)}; column groups that support
* shared dictionaries (e.g., ColGroupDDC1) override it.
*
* @param in data input
* @param skipDict skip shared dictionary
* @throws IOException if IOException occurs
*/
public void readFields(DataInput in, boolean skipDict) throws IOException {
readFields(in); //skipDict ignored by default
}

/**
* Returns the exact serialized size of column group.
Expand Down
26 changes: 20 additions & 6 deletions src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public void recodeData(HashMap<Double,Integer> map) {

@Override
public void write(DataOutput out) throws IOException {
//full serialization: skipDict=false, i.e., distinct values are written
write(out, false);
}

@Override
public void write(DataOutput out, boolean skipDict) throws IOException {
int numCols = getNumCols();
int numVals = getNumValues();
out.writeInt(_numRows);
Expand All @@ -129,16 +134,23 @@ public void write(DataOutput out) throws IOException {
out.writeInt( _colIndexes[i] );

//write distinct values
for( int i=0; i<_values.length; i++ )
out.writeDouble(_values[i]);

if( !skipDict ) {
for( int i=0; i<_values.length; i++ )
out.writeDouble(_values[i]);
}

//write data
for( int i=0; i<_numRows; i++ )
out.writeByte(_data[i]);
}

@Override
public void readFields(DataInput in) throws IOException {
//full deserialization: skipDict=false, i.e., distinct values are read
readFields(in, false);
}

@Override
public void readFields(DataInput in, boolean skipDict) throws IOException {
_numRows = in.readInt();
int numCols = in.readInt();
int numVals = in.readInt();
Expand All @@ -149,9 +161,11 @@ public void readFields(DataInput in) throws IOException {
_colIndexes[i] = in.readInt();

//read distinct values
_values = new double[numVals*numCols];
for( int i=0; i<numVals*numCols; i++ )
_values[i] = in.readDouble();
if( !skipDict || numCols!=1 ) {
_values = new double[numVals*numCols];
for( int i=0; i<numVals*numCols; i++ )
_values[i] = in.readDouble();
}

//read data
_data = new byte[_numRows];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,9 @@ public void readFields(DataInput in)
grp = new ColGroupDDC2(); break;
}

//deserialize and add column group
grp.readFields(in);
//deserialize and add column group (flag for shared dictionary passed
//and numCols evaluated in DDC1 because numCols not available yet)
grp.readFields(in, sharedDict!=null);

//use shared DDC1 dictionary if applicable
if( _sharedDDC1Dict && grp.getNumCols()==1
Expand Down Expand Up @@ -875,9 +876,13 @@ public void write(DataOutput out)
out.writeBoolean(_sharedDDC1Dict);
out.writeInt(_colGroups.size());

boolean skipDict = false;
for( ColGroup grp : _colGroups ) {
boolean shared = (grp instanceof ColGroupDDC1
&& _sharedDDC1Dict && grp.getNumCols()==1);
out.writeByte( grp.getCompType().ordinal() );
grp.write(out); //delegate serialization
grp.write(out, skipDict & shared); //delegate serialization
skipDict |= shared;
}
}

Expand Down

0 comments on commit b00a665

Please sign in to comment.