remove triples file, better tests/asserts without sysouts
ate47 committed May 19, 2022
1 parent 5ee6368 commit 9409243
Showing 18 changed files with 530 additions and 145 deletions.
@@ -161,9 +161,9 @@ public long get(long position) {

@Override
public void set(long position, long value) {
-//if(value<0 || value>maxvalue) {
-//throw new IllegalArgumentException("Value exceeds the maximum for this data structure");
-//}
+if (value<0 || value>maxvalue) {
+throw new IllegalArgumentException("Value exceeds the maximum for this data structure");
+}
//System.out.println("numbits "+this.numbits);
setField(data, numbits, position, value);
}
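With the check re-enabled, an out-of-range write fails fast instead of being silently truncated to the low numbits bits by setField. A minimal usage sketch (hypothetical values, assuming maxvalue = BitUtil.maxVal(numbits)):

    // sequence created with numbits = 10, so maxvalue = 1023
    seq.set(0, 1023); // ok
    seq.set(0, 1024); // now throws IllegalArgumentException
    seq.set(0, -1);   // now throws IllegalArgumentException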
@@ -296,4 +296,4 @@ public void close() throws IOException {
}
data=null;
}
-}
+}
19 changes: 13 additions & 6 deletions hdt-java-core/src/main/java/org/rdfhdt/hdt/hdt/HDTManagerImpl.java
@@ -38,8 +38,8 @@
import org.rdfhdt.hdt.util.concurrent.TreeWorker;
import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
-import org.rdfhdt.hdt.util.io.compress.CompressTripleReader;
import org.rdfhdt.hdt.util.io.compress.MapCompressTripleMerger;
+import org.rdfhdt.hdt.util.io.compress.TripleGenerator;
import org.rdfhdt.hdt.util.listener.ListenerUtil;

import java.io.File;
@@ -257,8 +257,9 @@ public HDT doGenerateHDTDisk(Iterator<TripleString> iterator, String baseURI, HD
FileTripleIterator triplesFile = new FileTripleIterator(iterator, chunkSize / 3);

CompressionResult compressionResult;
-try (SectionCompressor sectionCompressor = new SectionCompressor(basePath, triplesFile, listener)) {
-compressionResult = sectionCompressor.compress(workers, compressMode);
+try {
+compressionResult = new SectionCompressor(basePath, triplesFile, listener)
+.compress(workers, compressMode);
} catch (TreeWorker.TreeWorkerException | InterruptedException e) {
throw new ParserException(e);
}
@@ -294,8 +294,14 @@ public HDT doGenerateHDTDisk(Iterator<TripleString> iterator, String baseURI, HD
TripleCompressionResult tripleCompressionResult;
TriplesPrivate triples = hdt.getTriples();
TripleComponentOrder order = triples.getOrder();
-try (CompressTripleReader tripleReader = new CompressTripleReader(compressionResult.getTriples().openInputStream(true))) {
-MapCompressTripleMerger tripleMapper = new MapCompressTripleMerger(basePath, new FileTripleIDIterator(tripleReader.asIterator(), chunkSize), mapper, listener, order);
+try {
+MapCompressTripleMerger tripleMapper = new MapCompressTripleMerger(
+basePath,
+new FileTripleIDIterator(new TripleGenerator(compressionResult.getTripleCount()), chunkSize),
+mapper,
+listener,
+order
+);
tripleCompressionResult = tripleMapper.merge(workers, compressMode);
} catch (TreeWorker.TreeWorkerException | InterruptedException e) {
throw new ParserException(e);
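The merger is no longer fed by re-reading the (now removed) compressed triples file; it consumes generated, position-based triple IDs built from compressionResult.getTripleCount(). A plausible sketch of what a TripleGenerator-style iterator does, assuming it simply yields one TripleID per position for CompressTripleMapper to remap later — the actual implementation may differ:

    // hypothetical sketch, not the actual TripleGenerator source
    class TripleGenerator implements Iterator<TripleID> {
        private final long triples;
        private long current = 1;

        TripleGenerator(long triples) { this.triples = triples; }

        @Override public boolean hasNext() { return current <= triples; }

        @Override public TripleID next() {
            long id = current++;
            return new TripleID(id, id, id); // the position stands in for S, P and O
        }
    }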
@@ -380,4 +387,4 @@ protected HDT doHDTDiffBit(String location, String hdtFileName, Bitmap deleteBit
hdt.diffBit(location, hdtOriginal, deleteBitmap, listener);
return hdt;
}
-}
+}
@@ -33,9 +33,9 @@ public CompressTripleMapper(CloseSuppressPath location, long tripleCount) {
locationPredicates = location.resolve("map_predicates");
locationObjects = location.resolve("map_objects");
int numbits = BitUtil.log2(tripleCount + 2) + CompressUtil.INDEX_SHIFT;
-subjects = new SequenceLog64BigDisk(locationSubjects.toAbsolutePath().toString(), numbits, tripleCount + 2);
-predicates = new SequenceLog64BigDisk(locationPredicates.toAbsolutePath().toString(), numbits, tripleCount + 2);
-objects = new SequenceLog64BigDisk(locationObjects.toAbsolutePath().toString(), numbits, tripleCount + 2);
+subjects = new SequenceLog64BigDisk(locationSubjects.toAbsolutePath().toString(), numbits, tripleCount + 2, true);
+predicates = new SequenceLog64BigDisk(locationPredicates.toAbsolutePath().toString(), numbits, tripleCount + 2, true);
+objects = new SequenceLog64BigDisk(locationObjects.toAbsolutePath().toString(), numbits, tripleCount + 2, true);
}
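A worked example of the sizing arithmetic above, assuming BitUtil.log2 returns the bit width of its argument and CompressUtil.INDEX_SHIFT reserves one extra tag bit: with tripleCount = 1,000,000, log2(1,000,002) = 20 (since 2^19 < 1,000,002 <= 2^20), so each of the three disk sequences is created with 21-bit slots and room for 1,000,002 entries.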

/**
@@ -56,16 +56,22 @@ public void delete() {

@Override
public void onSubject(long preMapId, long newMapId) {
+assert preMapId > 0;
+assert newMapId >= CompressUtil.getHeaderId(1);
subjects.set(preMapId, newMapId);
}

@Override
public void onPredicate(long preMapId, long newMapId) {
+assert preMapId > 0;
+assert newMapId >= CompressUtil.getHeaderId(1);
predicates.set(preMapId, newMapId);
}

@Override
public void onObject(long preMapId, long newMapId) {
+assert preMapId > 0;
+assert newMapId >= CompressUtil.getHeaderId(1);
objects.set(preMapId, newMapId);
}
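The new asserts pin down the ID contract: preMapId is a 1-based position, and newMapId is expected to be at least CompressUtil.getHeaderId(1), i.e. the smallest header-encoded ID. Assertions are skipped by the JVM unless enabled, so they cost nothing in production runs; to exercise them manually (hypothetical invocation):

    java -ea -cp hdt-java-core.jar ...

maven-surefire-plugin enables assertions by default during mvn test (its enableAssertions flag), so the improved tests pick these checks up automatically.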

@@ -24,10 +24,6 @@ public interface CompressionResult extends Closeable {
*/
String COMPRESSION_MODE_COMPLETE = HDTOptionsKeys.LOADER_DISK_COMPRESSION_MODE_VALUE_COMPLETE;

-/**
- * @return the triples file
- */
-CloseSuppressPath getTriples();
/**
* @return the number of triple
*/
@@ -2,7 +2,6 @@

import org.rdfhdt.hdt.iterator.utils.ExceptionIterator;
import org.rdfhdt.hdt.triples.IndexedNode;
-import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
import org.rdfhdt.hdt.util.io.compress.CompressNodeReader;

@@ -13,27 +12,20 @@
* @author Antoine Willerval
*/
public class CompressionResultFile implements CompressionResult {
-private final CloseSuppressPath triples;
private final long tripleCount;
private final CompressNodeReader subjects;
private final CompressNodeReader predicates;
private final CompressNodeReader objects;
private final SectionCompressor.TripleFile sections;

-public CompressionResultFile(CloseSuppressPath triples, long tripleCount, SectionCompressor.TripleFile sections) throws IOException {
-this.triples = triples;
+public CompressionResultFile(long tripleCount, SectionCompressor.TripleFile sections) throws IOException {
this.tripleCount = tripleCount;
this.subjects = new CompressNodeReader(sections.openRSubject());
this.predicates = new CompressNodeReader(sections.openRPredicate());
this.objects = new CompressNodeReader(sections.openRObject());
this.sections = sections;
}

-@Override
-public CloseSuppressPath getTriples() {
-return triples;
-}

@Override
public long getTripleCount() {
return tripleCount;
@@ -2,14 +2,12 @@

import org.rdfhdt.hdt.iterator.utils.ExceptionIterator;
import org.rdfhdt.hdt.triples.IndexedNode;
-import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
import org.rdfhdt.hdt.util.io.compress.CompressNodeMergeIterator;
import org.rdfhdt.hdt.util.io.compress.CompressNodeReader;

import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -21,18 +19,16 @@
*/
public class CompressionResultPartial implements CompressionResult {
private final List<CompressNodeReaderTriple> files;
-private final CloseSuppressPath triples;
private final long triplesCount;
private final ExceptionIterator<IndexedNode, IOException> subject;
private final ExceptionIterator<IndexedNode, IOException> predicate;
private final ExceptionIterator<IndexedNode, IOException> object;

-public CompressionResultPartial(List<SectionCompressor.TripleFile> files, CloseSuppressPath triples, long triplesCount) throws IOException {
+public CompressionResultPartial(List<SectionCompressor.TripleFile> files, long triplesCount) throws IOException {
this.files = new ArrayList<>(files.size());
for (SectionCompressor.TripleFile file : files) {
this.files.add(new CompressNodeReaderTriple(file));
}
-this.triples = triples;
this.triplesCount = triplesCount;

// building iterator trees
@@ -55,11 +51,6 @@ private ExceptionIterator<IndexedNode, IOException> createBTree(int start, int e
return new CompressNodeMergeIterator(left, right);
}
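For context, createBTree (partially shown above) assembles the per-chunk sorted node readers into a balanced tree of two-way merge iterators, so k sorted files are merged with O(log k) comparisons per element. The same recursion in generic form — a sketch only; MergeIterator here is a hypothetical two-way sorted-merge iterator, not a class from this codebase:

    static <T> Iterator<T> merge(List<? extends Iterator<T>> its, int start, int end, Comparator<T> cmp) {
        if (end - start == 0) return Collections.emptyIterator();
        if (end - start == 1) return its.get(start);
        int mid = (start + end) / 2; // split, merge each half, then merge the two results
        return new MergeIterator<>(merge(its, start, mid, cmp), merge(its, mid, end, cmp), cmp);
    }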

-@Override
-public CloseSuppressPath getTriples() {
-return triples;
-}

@Override
public long getTripleCount() {
return triplesCount;
@@ -9,7 +9,6 @@
import org.rdfhdt.hdt.util.concurrent.TreeWorker;
import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
-import org.rdfhdt.hdt.util.io.compress.CompressTripleWriter;
import org.rdfhdt.hdt.util.io.compress.CompressUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,15 +25,13 @@
* Tree worker object to compress the section of a triple stream into 3 sections (SPO) and a compress triple file
* @author Antoine Willerval
*/
-public class SectionCompressor implements Closeable, TreeWorker.TreeWorkerObject<SectionCompressor.TripleFile> {
+public class SectionCompressor implements TreeWorker.TreeWorkerObject<SectionCompressor.TripleFile> {
private static final AtomicInteger ID_INC = new AtomicInteger();
private static final Logger log = LoggerFactory.getLogger(SectionCompressor.class);

private final CloseSuppressPath baseFileName;
private final FileTripleIterator source;
-private final CompressTripleWriter writer;
private boolean done;
-private final CloseSuppressPath triplesOutput;
private final MultiThreadListener listener;
private long triples = 0;
private final IdFetcher subjectIdFetcher = new IdFetcher();
@@ -44,12 +41,10 @@ public class SectionCompressor implements Closeable, TreeWorker.TreeWorkerObject
private final ParallelSortableArrayList<IndexedNode> predicates = new ParallelSortableArrayList<>(IndexedNode[].class);
private final ParallelSortableArrayList<IndexedNode> objects = new ParallelSortableArrayList<>(IndexedNode[].class);

-public SectionCompressor(CloseSuppressPath baseFileName, FileTripleIterator source, MultiThreadListener listener) throws IOException {
+public SectionCompressor(CloseSuppressPath baseFileName, FileTripleIterator source, MultiThreadListener listener) {
this.source = source;
this.listener = listener;
this.baseFileName = baseFileName;
-this.triplesOutput = baseFileName.resolve("triple.raw");
-this.writer = new CompressTripleWriter(triplesOutput.openOutputStream(true));
}

/**
@@ -68,7 +63,6 @@ public SectionCompressor.TripleFile get() {
subjects.clear();
predicates.clear();
objects.clear();
-IndexedNode lastS = IndexedNode.UNKNOWN, lastP = IndexedNode.UNKNOWN, lastO = IndexedNode.UNKNOWN;
IndexedTriple triple = new IndexedTriple();
listener.notifyProgress(10, "reading triples " + triples);
while (source.hasNext()) {
@@ -78,34 +72,31 @@
continue;
}
TripleString next = source.next();

-// get indexed mapped char sequence
-CharSequence sc = convertSubject(next.getSubject());
-long s = subjectIdFetcher.getNodeId();
-if (s != lastS.getIndex()) {
-// create new node if not the same as the previous one
-subjects.add(lastS = new IndexedNode(sc, s));
-}
+IndexedNode subjectNode = new IndexedNode(
+convertSubject(next.getSubject()),
+subjectIdFetcher.getNodeId()
+);
+subjects.add(subjectNode);

-// get indexed mapped char sequence
-CharSequence pc = convertPredicate(next.getPredicate());
-long p = predicateIdFetcher.getNodeId();
-if (p != lastP.getIndex()) {
-// create new node if not the same as the previous one
-predicates.add(lastP = new IndexedNode(pc, p));
-}
+IndexedNode predicateNode = new IndexedNode(
+convertPredicate(next.getPredicate()),
+predicateIdFetcher.getNodeId()
+);
+predicates.add(predicateNode);

-// get indexed mapped char sequence
-CharSequence oc = convertObject(next.getObject());
-long o = objectIdFetcher.getNodeId();
-if (o != lastO.getIndex()) {
-// create new node if not the same as the previous one
-objects.add(lastO = new IndexedNode(oc, o));
-}
+IndexedNode objectNode = new IndexedNode(
+convertObject(next.getObject()),
+objectIdFetcher.getNodeId()
+);
+objects.add(objectNode);

-// load the map triple and write it in the writer
-triple.load(lastS, lastP, lastO);
+triple.load(subjectNode, predicateNode, objectNode);
triples++;
-writer.appendTriple(triple);

if (triples % 100_000 == 0) {
listener.notifyProgress(10, "reading triples " + triples);
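Note the shape of the new loop: every parsed triple now contributes exactly one IndexedNode per role, where the old code skipped consecutive repeats and streamed each IndexedTriple to a CompressTripleWriter. Read together with the rest of this commit, the point appears to be that component IDs now coincide with triple positions, so the triple stream no longer needs to be persisted (triple.raw is gone) and can be regenerated from the count alone by TripleGenerator; duplicate triples are filtered later (see the NoDuplicateTripleIDIterator change below).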
@@ -204,11 +195,6 @@ public void delete(SectionCompressor.TripleFile f) {
f.close();
}

-@Override
-public void close() throws IOException {
-writer.close();
-}

/*
* FIXME: create a factory and override these methods with the hdt spec
*/
@@ -261,7 +247,7 @@ public CompressionResult compressToFile(int workers) throws IOException, Interru
treeWorker.start();
// wait for the workers to merge the sections and create the triples
TripleFile sections = treeWorker.waitToComplete();
-return new CompressionResultFile(triplesOutput, triples, sections);
+return new CompressionResultFile(triples, sections);
}

/**
@@ -283,7 +269,7 @@ public CompressionResult compressPartial() throws IOException {
IOUtil.closeAll(files);
throw e;
}
-return new CompressionResultPartial(files, triplesOutput, triples);
+return new CompressionResultPartial(files, triples);
}

/**
@@ -394,7 +380,7 @@ public InputStream openRObject() throws IOException {
}

private static class IdFetcher {
-private long id = 1;
+private long id = 0;

public long getNodeId() {
return ++id;
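Since getNodeId() pre-increments, starting the counter at 0 means the first ID handed out is now 1 rather than 2, so per-section IDs run 1..n with no gap at the front — consistent with the positional scheme above.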
@@ -13,6 +13,7 @@
import org.rdfhdt.hdt.triples.TripleID;
import org.rdfhdt.hdt.triples.Triples;
import org.rdfhdt.hdt.util.io.CountInputStream;
+import org.rdfhdt.hdt.util.io.compress.NoDuplicateTripleIDIterator;

import java.io.File;
import java.io.IOException;
@@ -140,7 +141,7 @@ public TripleComponentOrder getOrder() {

@Override
public IteratorTripleID searchAll() {
-return iterator;
+return new NoDuplicateTripleIDIterator(iterator);
}
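Because the compressor upstream no longer deduplicates, the raw triple sequence can contain repeats; wrapping the iterator collapses them at query time. A minimal sketch of such a wrapper, assuming the underlying iterator is sorted so duplicates are adjacent (the real NoDuplicateTripleIDIterator may differ):

    // hedged sketch: skip adjacent duplicates in a sorted TripleID stream
    long ps = -1, pp = -1, po = -1;
    while (it.hasNext()) {
        TripleID next = it.next();
        if (next.getSubject() == ps && next.getPredicate() == pp && next.getObject() == po) {
            continue; // adjacent duplicate, drop it
        }
        ps = next.getSubject(); pp = next.getPredicate(); po = next.getObject();
        consume(next); // 'consume' stands in for the caller's logic
    }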

@Override
@@ -44,7 +44,7 @@ public static int log2(long n) {
}

public static long maxVal(int numbits) {
-return ~(~0L<<numbits);
+return numbits == Long.SIZE ? Long.MAX_VALUE : ~(~0L<<numbits);
}
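The old expression was wrong only at the boundary: Java masks long shift counts to six bits, so ~0L << 64 is ~0L, and maxVal(64) evaluated to ~(~0L) = 0. With the fix, 64-bit fields report Long.MAX_VALUE (2^63 - 1), the largest value a signed long can hold:

    BitUtil.maxVal(63); // 0x7fffffffffffffffL == Long.MAX_VALUE, unchanged
    BitUtil.maxVal(64); // was 0, now Long.MAX_VALUE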

public static long readLowerBitsByteAligned(long numbits, InputStream in) throws IOException {