Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K-HDTCat #181

Merged
merged 2 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add k-HDTCat to HDTCat-Tree with profiling
  • Loading branch information
ate47 committed Dec 1, 2022
commit eac943b6ef4ae65ab6e0711f6d115aafc0939cf6
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.DOUBLE, desc = "Memory fault factor for HDTCat tree method split")
public static final String LOADER_CATTREE_MEMORY_FAULT_FACTOR = "loader.cattree.memoryFaultFactor";
/**
 * Key for the k-merge factor of HDTCat in the {@link org.rdfhdt.hdt.hdt.HDTManager} catTree method.
 * Defaults to 2, which uses the default HDTCat implementation instead of K-HDTCat.
 */
@Key(type = Key.Type.NUMBER, desc = "Number of HDT to merge at the same time with K-HDTCat, by default it use the default HDTCat implementation")
public static final String LOADER_CATTREE_KCAT = "loader.cattree.kcat";

/**
* Key for the hdt supplier type, default to memory
Expand Down
1 change: 1 addition & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/util/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public void writeToDisk(Path outputPath) throws IOException {
/**
 * Returns the root profiling section, creating it lazily on first access.
 * <p>
 * On creation, {@code maxSize} is widened to cover the section name at the
 * current nesting depth (presumably used to align the printed report —
 * TODO confirm against the report writer).
 *
 * @return the main {@link Section} of this profiler, never {@code null}
 */
public Section getMainSection() {
	if (this.mainSection != null) {
		return this.mainSection;
	}
	this.mainSection = new Section(name);
	maxSize = Math.max(name.length() + deep * 2, maxSize);
	return this.mainSection;
}
Expand Down
5 changes: 5 additions & 0 deletions hdt-java-cli/bin/hdtCat.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
@echo off
rem hdtCat.bat - Windows launcher for the HDTCat command line tool.

rem javaenv.bat (located next to this script, %~dp0) is expected to set
rem JAVACMD and JAVAOPTIONS for the invocation below.
call "%~dp0\javaenv.bat"

rem Run org.rdfhdt.hdt.tools.HDTCat with every jar in ..\lib on the
rem classpath, forwarding all script arguments (%*) to the tool.
"%JAVACMD%" %JAVAOPTIONS% -classpath %~dp0\..\lib\* org.rdfhdt.hdt.tools.HDTCat %*
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public SequenceLog64Map(CountInputStream in, File f) throws IOException {
this(in, f, false);
}

@SuppressWarnings("resource")
private SequenceLog64Map(CountInputStream in, File f, boolean closeInput) throws IOException {
CRCInputStream crcin = new CRCInputStream(in, new CRC8());

Expand Down Expand Up @@ -189,7 +188,7 @@ private long getWord(long w) {
@Override
public long get(long index) {
if(index<0 || index>=numentries) {
throw new IndexOutOfBoundsException();
throw new IndexOutOfBoundsException(index + " < 0 || " + index + ">= " + numentries);
}
if(numbits==0) return 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public TripleID next() {
}

private static long firstSubjectTripleId(HDT hdt) {
if (hdt.getDictionary().getSubjects().getNumberOfElements() == 0) {
// no subjects
return -1;
}
IteratorTripleID it = hdt.getTriples().search(new TripleID(
hdt.getDictionary().getNshared() + 1,
0,
Expand All @@ -107,21 +111,31 @@ public static Iterator<TripleID> fromHDTs(KCatMerger merger, HDT[] hdts) {
// extract hdt elements for this index
HDT hdt = hdts[hdtIndex];

if (hdt.getTriples().getNumberOfElements() == 0) {
// no triples
return ExceptionIterator.<TripleID, RuntimeException>empty();
}
// get the first subject triple id
long firstSubjectTripleId = firstSubjectTripleId(hdt);

// create a subject iterator, mapped to the new IDs
IteratorTripleID subjectIterator = hdt.getTriples().searchAll();
subjectIterator.goTo(firstSubjectTripleId);
ExceptionIterator<TripleID, RuntimeException> subjectIteratorMapped = ExceptionIterator.of(
new SharedOnlyIterator(
new MapIterator<>(subjectIterator, (tid) -> {
assert inHDT(tid, hdts[hdtIndex]);
return merger.extractMapped(hdtIndex, tid);
}),
shared
)
);
ExceptionIterator<TripleID, RuntimeException> subjectIteratorMapped;
if (firstSubjectTripleId == -1) {
// no triples
subjectIteratorMapped = ExceptionIterator.empty();
} else {
// create a subject iterator, mapped to the new IDs
IteratorTripleID subjectIterator = hdt.getTriples().searchAll();
subjectIterator.goTo(firstSubjectTripleId);
subjectIteratorMapped = ExceptionIterator.of(
new SharedOnlyIterator(
new MapIterator<>(subjectIterator, (tid) -> {
assert inHDT(tid, hdts[hdtIndex]);
return merger.extractMapped(hdtIndex, tid);
}),
shared
)
);
}

if (shared == 0) {
return subjectIteratorMapped;
Expand All @@ -147,6 +161,10 @@ public static Iterator<TripleID> fromHDTs(KCatMerger merger, HDT[] hdts) {
// get the first subject triple id
long firstSubjectTripleId = firstSubjectTripleId(hdt);

if (firstSubjectTripleId == -1) {
return ExceptionIterator.<TripleID, RuntimeException>empty();
}

// create a subject iterator, mapped to the new IDs
IteratorTripleID subjectIterator = hdt.getTriples().searchAll();
subjectIterator.goTo(firstSubjectTripleId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.rdfhdt.hdt.hdt.impl.HDTBase;
import org.rdfhdt.hdt.hdt.impl.WriteHDTImpl;
import org.rdfhdt.hdt.header.HeaderFactory;
import org.rdfhdt.hdt.listener.MultiThreadListener;
import org.rdfhdt.hdt.listener.ProgressListener;
import org.rdfhdt.hdt.options.HDTOptions;
import org.rdfhdt.hdt.options.HDTOptionsKeys;
Expand All @@ -17,6 +18,7 @@
import org.rdfhdt.hdt.triples.impl.BitmapTriples;
import org.rdfhdt.hdt.triples.impl.OneReadTempTriples;
import org.rdfhdt.hdt.triples.impl.WriteBitmapTriples;
import org.rdfhdt.hdt.util.Profiler;
import org.rdfhdt.hdt.util.io.CloseSuppressPath;
import org.rdfhdt.hdt.util.io.IOUtil;
import org.rdfhdt.hdt.util.listener.IntermediateListener;
Expand Down Expand Up @@ -54,12 +56,13 @@ private static TripleComponentOrder getOrder(HDT hdt) {
private final Path futureLocation;
private final boolean futureMap;
private final boolean clearLocation;
private final ProgressListener listener;
private final MultiThreadListener listener;
private final String dictionaryType;
private final int bufferSize;
private final HDTOptions hdtFormat;
private final TripleComponentOrder order;
private final long rawSize;
private final Profiler profiler;

/**
* Create implementation
Expand All @@ -82,6 +85,8 @@ public KCatImpl(List<String> hdtFileNames, HDTOptions hdtFormat, ProgressListene
bufferSize = (int) bufferSizeLong;
}

profiler = Profiler.createOrLoadSubSection("doHDTCatk", hdtFormat, true);

try {
ListIterator<String> it = hdtFileNames.listIterator();

Expand Down Expand Up @@ -147,10 +152,14 @@ public KCatImpl(List<String> hdtFileNames, HDTOptions hdtFormat, ProgressListene

location.closeWithDeleteRecurse();
} catch (Throwable t) {
for (HDT hdt : hdts) {
IOUtil.closeQuietly(hdt);
try {
throw t;
} finally {
for (HDT hdt : hdts) {
IOUtil.closeQuietly(hdt);
}
profiler.close();
}
throw t;
}
}

Expand All @@ -172,40 +181,53 @@ public HDT cat() throws IOException {
il.setRange(0, 40);
il.setPrefix("Merge Dict: ");
try (KCatMerger merger = createMerger(il)) {
profiler.pushSection("dict");
// create the dictionary
try (DictionaryPrivate dictionary = merger.buildDictionary()) {
profiler.popSection();
assert merger.assertReadCorrectly();
listener.unregisterAllThreads();
profiler.pushSection("triples");
// create a GROUP BY subject iterator to get the new ordered stream
Iterator<TripleID> tripleIterator = GroupBySubjectMapIterator.fromHDTs(merger, hdts);
try (WriteBitmapTriples triples = new WriteBitmapTriples(hdtFormat, location.resolve("triples"), bufferSize)) {
long count = Arrays.stream(hdts).mapToLong(h -> h.getTriples().getNumberOfElements()).sum();

il.setRange(40, 80);
il.setPrefix("Merge triples: ");
il.notifyProgress(0, "start");
triples.load(new OneReadTempTriples(tripleIterator, order, count), il);
profiler.popSection();

WriteHDTImpl writeHDT = new WriteHDTImpl(hdtFormat, location, dictionary, triples, HeaderFactory.createHeader(hdtFormat));
profiler.pushSection("header");
writeHDT.populateHeaderStructure(baseURI);
// add a raw size from the previous values (if available)
if (rawSize != -1) {
writeHDT.getHeader().insert("_:statistics", HDTVocabulary.ORIGINAL_SIZE, String.valueOf(rawSize));
}
profiler.popSection();

profiler.pushSection("save");
il.setRange(80, 90);
il.setPrefix("Save HDT: ");
il.notifyProgress(0, "save to " + futureLocationStr);
writeHDT.saveToHDT(futureLocationStr, il);
profiler.popSection();
}
}

listener.unregisterAllThreads();
} catch (InterruptedException e) {
throw new IOException("Interruption", e);
}

il.setRange(90, 100);
HDT hdt;
if (futureMap) {
il.notifyProgress(0, "map hdt");
hdt = HDTManager.mapHDT(futureLocationStr, il);
} else {
il.notifyProgress(0, "load hdt");
hdt = HDTManager.loadHDT(futureLocationStr, il);
Files.deleteIfExists(futureLocation);
}
Expand All @@ -216,7 +238,16 @@ public HDT cat() throws IOException {
@Override
public void close() throws IOException {
try {
IOUtil.closeAll(hdts);
try {
try {
profiler.stop();
profiler.writeProfiling();
} finally {
profiler.close();
}
} finally {
IOUtil.closeAll(hdts);
}
} finally {
if (clearLocation) {
location.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public class KCatMerger implements AutoCloseable {
private final PipedCopyIterator<DuplicateBuffer> subjectPipe = new PipedCopyIterator<>();
private final PipedCopyIterator<DuplicateBuffer> objectPipe = new PipedCopyIterator<>();
private final PipedCopyIterator<BiDuplicateBuffer> sharedPipe = new PipedCopyIterator<>();
private final DuplicateBufferIterator<RuntimeException> sortedSubject;
private final DuplicateBufferIterator<RuntimeException> sortedObject;
private final DuplicateBufferIterator<RuntimeException> sortedPredicates;
private final Map<ByteString, DuplicateBufferIterator<RuntimeException>> sortedSubSections;
private final ExceptionIterator<DuplicateBuffer, RuntimeException> sortedSubject;
private final ExceptionIterator<DuplicateBuffer, RuntimeException> sortedObject;
private final ExceptionIterator<DuplicateBuffer, RuntimeException> sortedPredicates;
private final Map<ByteString, ExceptionIterator<DuplicateBuffer, RuntimeException>> sortedSubSections;

private final long estimatedSizeP;
private final AtomicLong countTyped = new AtomicLong();
Expand Down Expand Up @@ -107,6 +107,7 @@ public KCatMerger(HDT[] hdts, CloseSuppressPath location, ProgressListener liste
long sizeS = 0;
long sizeP = 0;
long sizeO = 0;
long sizeONoTyped = 0;
long sizeShared = 0;

Map<ByteString, PreIndexSection[]> subSections = new TreeMap<>();
Expand All @@ -118,6 +119,7 @@ public KCatMerger(HDT[] hdts, CloseSuppressPath location, ProgressListener liste
sizeS += cat.countSubjects();
sizeP += cat.countPredicates();
sizeO += cat.countObjects();
sizeONoTyped += cat.getObjectSection().getNumberOfElements();
sizeShared += cat.countShared();

long start = 1L + cat.countShared();
Expand Down Expand Up @@ -165,7 +167,8 @@ public KCatMerger(HDT[] hdts, CloseSuppressPath location, ProgressListener liste
c.getSharedSection().getSortedEntries(),
c.countShared()
)
);
).notif(sizeS, 20, "Merge subjects", listener);

sortedObject = mergeSection(
cats,
(hdtIndex, c) -> createMergeIt(
Expand All @@ -174,20 +177,20 @@ public KCatMerger(HDT[] hdts, CloseSuppressPath location, ProgressListener liste
c.getSharedSection().getSortedEntries(),
c.objectShift()
)
);
).notif(sizeONoTyped, 20, "Merge objects", listener);

// merge the other sections
sortedPredicates = mergeSection(cats, (hdtIndex, c) -> {
ExceptionIterator<? extends CharSequence, RuntimeException> of = ExceptionIterator.of(c.getPredicateSection().getSortedEntries());
return of.map(((element, index) -> new LocatedIndexedNode(hdtIndex, index + 1, ByteString.of(element))));
});
}).notif(sizeP, 20, "Merge predicates", listener);

sortedSubSections = new TreeMap<>();
// create a merge section for each section
subSections.forEach((key, sections) -> sortedSubSections.put(key, mergeSection(sections, (hdtIndex, pre) -> {
ExceptionIterator<? extends CharSequence, RuntimeException> of = ExceptionIterator.of(pre.getSortedEntries());
return of.map(((element, index) -> new LocatedIndexedNode(hdtIndex, pre.getStart() + index, ByteString.of(element))));
})));
}).notif(Arrays.stream(sections).mapToLong(s -> s == null || s.section == null ? 0 : s.section.getNumberOfElements()).sum(), 20, "Merge typed objects", listener)));

// convert the dupe buffer streams to byte string streams

Expand Down Expand Up @@ -288,7 +291,7 @@ public static <T> DuplicateBufferIterator<RuntimeException> mergeSection(T[] sec
return mapper.apply(hdtIndex, e);
},
LocatedIndexedNode::compareTo,
List.of(sections),
Arrays.asList(sections),
0,
sections.length
),
Expand Down Expand Up @@ -471,7 +474,7 @@ private void runSubSectionCompute() {
ByteString key = e.getKey();
WriteDictionarySection section = e.getValue();

DuplicateBufferIterator<RuntimeException> bufferIterator = sortedSubSections.get(key);
ExceptionIterator<DuplicateBuffer, RuntimeException> bufferIterator = sortedSubSections.get(key);

final long currentShift = shift;
section.load(new OneReadDictionarySection(bufferIterator.map((db, id) -> {
Expand Down Expand Up @@ -505,7 +508,7 @@ public void close() throws IOException {
throw new RuntimeException(e);
} finally {
Closer.of(sectionSubject, sectionPredicate, sectionObject, sectionShared)
.with(sectionSub.values())
.with(sectionSub == null ? List.of() : sectionSub.values())
.with(subjectsMaps)
.with(predicatesMaps)
.with(objectsMaps)
Expand Down
Loading