[SYSTEMDS-3262] CLA Offset memorizer
This commit reintroduces a memorizer for the offsets. If an offset iterator is
requested and not found in the thread-local cache, the request populates a
memorizer, shared across threads, with an iterator for that index. Queries
therefore first try the thread-local version of the cache, use it if it is
appropriate, and otherwise fall back to the memorizer.

If operations stick to the thread-local cache once jobs are allocated, this
implementation is memory-friendly and efficient. But if threads are allocated
many small jobs that each populate the memorizer, it degrades. In practice,
decompression is now designed to hit the memorizer only once per thread,
because each thread processes its own contiguous portion of rows and no extra
jobs are allocated, bounding the memorizer at roughly one iterator per thread.
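
The lookup order can be condensed into a self-contained sketch. The names below
(TwoLevelIteratorCache, slowPath) are illustrative only, not the SystemDS API,
and the sketch uses a ConcurrentHashMap where the commit uses a lazily
allocated HashMap:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Illustration of the two-level lookup introduced in this commit: a
// thread-local slot is consulted first, then a shared memorizer, and only on a
// miss in both is the expensive scan from the first offset performed.
class TwoLevelIteratorCache<T> {
	// Level 1: one recently used entry per thread, supporting cache blocking.
	private final ThreadLocal<Map.Entry<Integer, T>> local = new ThreadLocal<>();
	// Level 2: memorizer shared across threads, ideally hit once per thread.
	private final Map<Integer, T> memorizer = new ConcurrentHashMap<>();
	private final IntFunction<T> slowPath; // e.g. scan offsets up to the row

	TwoLevelIteratorCache(IntFunction<T> slowPath) {
		this.slowPath = slowPath;
	}

	T get(int row) {
		Map.Entry<Integer, T> c = local.get();
		if(c != null && c.getKey() == row)
			return c.getValue(); // thread-local hit: no shared state touched
		// miss in level 1: consult, or populate, the shared memorizer
		T it = memorizer.computeIfAbsent(row, slowPath::apply);
		local.set(Map.entry(row, it));
		return it;
	}
}

The actual AOffset.getIterator(int row) in the diff below differs in the
details: it stores cloned AIterators, allocates its HashMap on first use, and
when the thread-local entry is stale but positioned before the queried row it
still reuses that iterator as a closer starting point for skipTo.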
Baunsgaard committed Dec 28, 2021
1 parent ccd6a36 commit 5873e09
Showing 7 changed files with 97 additions and 46 deletions.
@@ -72,16 +72,31 @@ protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int
 	@Override
 	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
 		double[] values) {
-		final int nCol = _colIndexes.length;
-		for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
-			final double[] c = db.values(offT);
-			final int off = db.pos(offT) + offC;
-			final int rowIndex = _data.getIndex(i) * nCol;
-			for(int j = 0; j < nCol; j++)
-				c[off + _colIndexes[j]] += values[rowIndex + j];
-		}
+		if(db.isContiguous() && _colIndexes.length == 1)
+			decompressToDenseBlockDenseDictSingleColContiguous(db, rl, ru, offR, offC, values);
+		else {
+			// generic
+			final int nCol = _colIndexes.length;
+			for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
+				final double[] c = db.values(offT);
+				final int off = db.pos(offT) + offC;
+				final int rowIndex = _data.getIndex(i) * nCol;
+				for(int j = 0; j < nCol; j++)
+					c[off + _colIndexes[j]] += values[rowIndex + j];
+			}
+		}
 	}

+	private void decompressToDenseBlockDenseDictSingleColContiguous(DenseBlock db, int rl, int ru, int offR, int offC,
+		double[] values) {
+		final double[] c = db.values(0);
+		final int nCols = db.getDim(1);
+		final int colOff = _colIndexes[0] + offC;
+		for(int i = rl, offT = (rl + offR) * nCols + colOff; i < ru; i++, offT += nCols)
+			c[offT] += values[_data.getIndex(i)];
+	}
+
 	@Override
 	protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
 		SparseBlock sb) {
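The hunk above adds a fast path for the common case of a single-column group
decompressing into one contiguous DenseBlock: instead of resolving
db.values(offT) and db.pos(offT) per row, the target cell is addressed by one
flat index that advances by the row stride nCols. A standalone sketch of that
indexing with made-up data (hypothetical demo code, not the SystemDS classes):

// Strided single-column writes into a contiguous row-major array.
public class StridedWriteDemo {
	public static void main(String[] args) {
		final int nCols = 4;                       // matrix width
		final int colOff = 2;                      // target column plus column offset
		final double[] c = new double[3 * nCols];  // 3 rows, contiguous storage
		final double[] dictValues = {10.0, 20.0};  // dictionary of distinct values
		final int[] mapping = {0, 1, 0};           // per-row index into the dictionary

		// one multiply to find the start, then a fixed stride of nCols per row
		for(int i = 0, offT = colOff; i < 3; i++, offT += nCols)
			c[offT] += dictValues[mapping[i]];

		// column 2 now holds 10, 20, 10; all other cells remain 0
		System.out.println(java.util.Arrays.toString(c));
	}
}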
@@ -123,8 +123,10 @@ else if(it != null && ru >= indexes.getOffsetToLast()) {
 		}
 		else if(it != null) {
 			while(r < ru) {
-				if(it.value() == r)
-					c[r] += preAgg[data.getIndex(it.getDataIndexAndIncrement())];
+				if(it.value() == r) {
+					c[r] += preAgg[data.getIndex(it.getDataIndex())];
+					it.next();
+				}
 				else
 					c[r] += def;
 				r++;
@@ -168,8 +170,10 @@ else if(it != null && ru >= indexes.getOffsetToLast()) {
 		}
 		else if(it != null) {
 			while(r < ru) {
-				if(it.value() == r)
-					c[r] = builtin.execute(c[r], vals[data.getIndex(it.getDataIndexAndIncrement())]);
+				if(it.value() == r) {
+					c[r] = builtin.execute(c[r], vals[data.getIndex(it.getDataIndex())]);
+					it.next();
+				}
 				else
 					c[r] = builtin.execute(c[r], def);
 				r++;
@@ -91,17 +91,6 @@ public int getOffsetsIndex() {
 		return index;
 	}

-	/**
-	 * Get the current data index and increment the pointers using the next operator.
-	 *
-	 * @return The current data index.
-	 */
-	public int getDataIndexAndIncrement() {
-		int x = dataIndex;
-		next();
-		return x;
-	}
-
 	/**
 	 * Skip values until index is achieved.
 	 *
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;

 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -44,13 +46,17 @@ public abstract class AOffset implements Serializable {

 	protected static final Log LOG = LogFactory.getLog(AOffset.class.getName());

+	/** Thread-local cache for a single recently used iterator; used for cache blocking. */
 	private ThreadLocal<OffsetCache> cacheRow = new ThreadLocal<OffsetCache>() {
 		@Override
 		protected OffsetCache initialValue() {
 			return null;
 		}
 	};

+	/** Memorizer for row-indexed iterators, used mainly when parallelizing across rows. */
+	private Map<Integer, AIterator> memorizer = null;
+
 	/**
 	 * Get an iterator of the offsets.
 	 *
@@ -73,19 +79,25 @@ else if(row > getOffsetToLast())
 		// try the cache first.
 		OffsetCache c = cacheRow.get();
 		if(c == null) {
+			if(memorizer != null && memorizer.containsKey(row))
+				return memorizer.get(row).clone();
 			AIterator it = getIterator();
 			it.skipTo(row);
 			cacheIterator(it.clone(), row);
+			memorizeIterator(it.clone(), row);
 			return it;
 		}
 		else if(c.row == row)
 			return c.it.clone();
 		else {
+			if(memorizer != null && memorizer.containsKey(row))
+				return memorizer.get(row).clone();
 			// Use the cached iterator if it is closer to the queried row.
 			AIterator it = c.row < row ? c.it.clone() : getIterator();
 			it.skipTo(row);
 			// cache this new iterator.
 			cacheIterator(it.clone(), row);
+			memorizeIterator(it.clone(), row);
 			return it;
 		}

@@ -103,6 +115,14 @@ public void cacheIterator(AIterator it, int row) {
 		cacheRow.set(new OffsetCache(it, row));
 	}

+	private void memorizeIterator(AIterator it, int row) {
+		if(it == null)
+			return;
+		else if(memorizer == null)
+			memorizer = new HashMap<>();
+		memorizer.put(row, it);
+	}
+
 	/**
 	 * Write the offsets to disk.
 	 *
@@ -35,6 +35,7 @@ public class OffsetByte extends AOffset {
 	private final int offsetToFirst;
 	private final int offsetToLast;
 	private final boolean noOverHalf;
+	private final boolean noZero;

 	public OffsetByte(int[] indexes) {
 		this(indexes, 0, indexes.length);
@@ -72,32 +73,39 @@ public OffsetByte(int[] indexes, int apos, int alen) {

 			ov = nv;
 		}
-		boolean noOverHalf = true;
-		for(byte b : offsets)
-			if(b < 0) {
-				noOverHalf = false;
-				break;
-			}
-		this.noOverHalf = noOverHalf;
+		this.noOverHalf = getNoOverHalf();
+		this.noZero = getNoZero();
 	}

 	protected OffsetByte(byte[] offsets, int offsetToFirst, int offsetToLast) {
 		this.offsets = offsets;
 		this.offsetToFirst = offsetToFirst;
 		this.offsetToLast = offsetToLast;
 		this.noOverHalf = getNoOverHalf();
+		this.noZero = getNoZero();
 	}

 	private boolean getNoOverHalf() {
 		boolean noOverHalf = true;
 		for(byte b : offsets)
-			if(b < 0) {
+			if(b < 1) {
 				noOverHalf = false;
 				break;
 			}
 		return noOverHalf;
 	}

+	private boolean getNoZero() {
+		boolean noZero = true;
+		for(byte b : offsets)
+			if(b == 0) {
+				noZero = false;
+				break;
+			}
+		return noZero;
+	}
+
 	@Override
 	public IterateByteOffset getIterator() {
 		return new IterateByteOffset();
@@ -172,7 +180,6 @@ public static OffsetByte readFields(DataInput in) throws IOException {
 	protected final void preAggregateDenseMapRowByte(double[] mV, int off, double[] preAV, int cu, int nVal, byte[] data,
 		AIterator it) {
 		IterateByteOffset itb = (IterateByteOffset) it;
-		final boolean noZero = offsets.length == data.length - 1;
 		if(cu < offsetToLast + 1) {
 			final boolean nvalHalf = nVal < 127;
 			if(noOverHalf && noZero && nvalHalf)
@@ -616,8 +623,20 @@ public void next() {

 	@Override
 	public int skipTo(int idx) {
-		while(offset < idx && index < offsets.length)
-			next();
+		if(noOverHalf) {
+			while(offset < idx && index < offsets.length) {
+				byte v = offsets[index];
+				offset += v;
+				index++;
+			}
+			dataIndex = index;
+		}
+		else if(idx < offsetToLast)
+			while(offset < idx)
+				next();
+		else
+			while(offset < idx && index < offsets.length)
+				next();
 		return offset;
 	}

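The new skipTo fast path exploits the noOverHalf flag: with the stricter
b < 1 check above, the flag now guarantees every stored delta lies in
[1, 127], so skipping can sum raw bytes without per-byte special handling of
non-positive values, and the data index simply tracks the array index. A
standalone sketch of the idea (hypothetical demo code, not the SystemDS
classes):

// Summing byte deltas directly when all of them fit in [1, 127].
public class ByteSkipDemo {
	public static void main(String[] args) {
		final byte[] deltas = {5, 5, 100, 3, 7}; // all in [1, 127] => "noOverHalf"
		final int target = 110;                  // row index to skip to

		int offset = 2, index = 0;               // offset of the first element
		while(offset < target && index < deltas.length)
			offset += deltas[index++];           // plain additions, no branching per byte

		// offsets visited: 2, 7, 12, 112 -> stops at 112 with index 3
		System.out.println("offset=" + offset + " index=" + index);
	}
}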
@@ -147,11 +147,8 @@ private static MatrixBlock decompressExecute(CompressedMatrixBlock cmb, int k) {
 			ret.allocateDenseBlock();
 		}

-		// final int block = (int) Math.ceil((double) (CompressionSettings.BITMAP_BLOCK_SZ) / nCols);
-		// final int blklen = Math.max(block, 64);
-		final int blklen = 32;
-
-		// final int blklen = block > 1000 ? block + 1000 - block % 1000 : Math.max(64, block);
-		// final int blklen = Math.max(nRows / (k * 2), 512);
+		final int blklen = Math.max(nRows / k, 512);

 		// check if we are using filtered groups, and if we are not force constV to null
 		if(groups == filteredGroups)
@@ -317,13 +314,19 @@ protected DecompressDenseTask(List<AColGroup> colGroups, MatrixBlock ret, double

 	@Override
 	public Long call() {
-		for(AColGroup grp : _colGroups)
-			grp.decompressToDenseBlock(_ret.getDenseBlock(), _rl, _ru);
-
-		if(_constV != null)
-			addVector(_ret, _constV, _eps, _rl, _ru);
-
-		return _overlapping ? 0 : _ret.recomputeNonZeros(_rl, _ru - 1);
+		final int blk = 1024;
+		long nnz = 0;
+		for(int b = _rl; b < _ru; b += blk) {
+			int e = Math.min(b + blk, _ru);
+			for(AColGroup grp : _colGroups)
+				grp.decompressToDenseBlock(_ret.getDenseBlock(), b, e);
+
+			if(_constV != null)
+				addVector(_ret, _constV, _eps, b, e);
+			nnz += _overlapping ? 0 : _ret.recomputeNonZeros(b, e - 1);
+		}
+
+		return nnz;
 	}
 }

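Two changes act together here: the driver now hands each thread one large row
block (blklen = max(nRows / k, 512)) instead of fixed 32-row tasks, and each
task internally walks 1024-row sub-blocks so the active output slice stays
cache-resident. With one task per thread, each thread misses the offset caches
roughly once, which is what keeps the memorizer near one iterator per thread.
A toy sizing demonstration (hypothetical names, not the SystemDS API):

import java.util.Map;
import java.util.concurrent.*;

// One task per thread-sized row block means one cache miss per block start,
// so the shared map ends up with ~k entries however many rows exist.
public class MemorizerSizingDemo {
	public static void main(String[] args) throws Exception {
		final int nRows = 100_000, k = 8;
		final int blklen = Math.max(nRows / k, 512); // as in decompressExecute
		final Map<Integer, String> memorizer = new ConcurrentHashMap<>();
		final ExecutorService pool = Executors.newFixedThreadPool(k);

		for(int rl = 0; rl < nRows; rl += blklen) {
			final int start = rl;
			// only the first request per block start pays the slow path
			pool.submit(() -> memorizer.computeIfAbsent(start, r -> "iterator@" + r));
		}
		pool.shutdown();
		pool.awaitTermination(1, TimeUnit.MINUTES);
		System.out.println(memorizer.size() + " memorized iterators for " + k + " blocks");
	}
}

With the removed blklen = 32, the same loop would submit thousands of small
tasks, each of whose first access could leave another iterator in the
memorizer — the degenerate case the commit message warns about.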
@@ -330,7 +330,7 @@ public void testAskForLastElement() {
 	}

 	@Test
-	public void testAskForLastElementP1IsNull(){
+	public void testAskForLastElementP1IsNull() {
 		if(data.length == 2)
 			assertTrue(o.getIterator(data[1] + 1) == null);
 	}
@@ -383,7 +383,8 @@ public void testGetDataAfterNextComb2() {
 	}

 	public void testGetDataAfterNextCombN(AIterator it) {
-		int d = it.getDataIndexAndIncrement();
+		int d = it.getDataIndex();
+		it.next();
 		assertEquals(d + 1, it.getDataIndex());
 	}

