Async CatTree and dir parser #186

Merged: merged 3 commits on Jan 16, 2023

Changes from 1 commit
Commit: Add async CatTree using k-HDTCatDiff and reduce memory usage in HDT indexing
ate47 committed Jan 4, 2023
commit 72a9d9aa238e740bb559fcaf5c139c4bf5ea1393
21 changes: 21 additions & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/options/HDTOptionsKeys.java
@@ -122,6 +122,12 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.PATH, desc = "Path of the CatTree generation")
public static final String LOADER_CATTREE_LOCATION_KEY = "loader.cattree.location";
/**
* Key to use the async version of the {@link org.rdfhdt.hdt.hdt.HDTManager} catTree methods; runs the k-HDTCAT
* algorithm. Boolean value, defaults to false.
*/
@Key(type = Key.Type.BOOLEAN, desc = "Use async version")
public static final String LOADER_CATTREE_ASYNC_KEY = "loader.cattree.async";
/**
* Same as {@link #LOADER_TYPE_KEY} for loader in the CATTREE method
*/
@@ -202,6 +208,16 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.PATH, desc = "Profiler output file")
public static final String PROFILER_OUTPUT_KEY = "profiler.output";
/**
* Key for enabling the profiler (if implemented) for async bi tasks, defaults to false. Boolean value
*/
@Key(type = Key.Type.BOOLEAN, desc = "Run a second profiler for async bi tasks")
public static final String PROFILER_ASYNC_KEY = "profiler.async";
/**
* Key for the profiler output (if implemented). File value
*/
@Key(type = Key.Type.PATH, desc = "Profiler output file")
public static final String PROFILER_ASYNC_OUTPUT_KEY = "profiler.async.output";
/**
* Key for enabling the canonical NTriple file simple parser, default to false. Boolean value
*/
@@ -293,6 +309,11 @@ public class HDTOptionsKeys {
@Key(type = Key.Type.BOOLEAN, desc = "Delete the HDTCat temp files directory after HDTCat, default to true")
public static final String HDTCAT_DELETE_LOCATION = "hdtcat.deleteLocation";

@Key(type = Key.Type.BOOLEAN, desc = "Use disk implementation to generate the hdt sub-index")
public static final String BITMAPTRIPLES_SEQUENCE_DISK = "bitmaptriples.sequence.disk";

@Key(type = Key.Type.BOOLEAN, desc = "Disk location for the " + BITMAPTRIPLES_SEQUENCE_DISK + " option")
public static final String BITMAPTRIPLES_SEQUENCE_DISK_LOCATION = "bitmaptriples.sequence.disk.location";
// use tree-map to have a better order
private static final Map<String, Option> OPTION_MAP = new TreeMap<>();

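For context, a minimal sketch of how the keys added in this file might be combined, assuming the existing rdfhdt public API (`HDTManager.generateHDT`, `HDTSpecification`) and the pre-existing `LOADER_TYPE_VALUE_CAT` constant; the file names and base URI are placeholders:

```java
import org.rdfhdt.hdt.enums.RDFNotation;
import org.rdfhdt.hdt.hdt.HDT;
import org.rdfhdt.hdt.hdt.HDTManager;
import org.rdfhdt.hdt.options.HDTOptions;
import org.rdfhdt.hdt.options.HDTOptionsKeys;
import org.rdfhdt.hdt.options.HDTSpecification;

public class AsyncCatTreeExample {
	public static void main(String[] args) throws Exception {
		HDTOptions spec = new HDTSpecification();
		// select the cat-tree loader (pre-existing option; value constant assumed)
		spec.set(HDTOptionsKeys.LOADER_TYPE_KEY, HDTOptionsKeys.LOADER_TYPE_VALUE_CAT);
		// new in this commit: run the async k-HDTCat version of the catTree methods
		spec.set(HDTOptionsKeys.LOADER_CATTREE_ASYNC_KEY, "true");
		// new in this commit: separate profiler for the async tasks, with its own output file
		spec.set(HDTOptionsKeys.PROFILER_ASYNC_KEY, "true");
		spec.set(HDTOptionsKeys.PROFILER_ASYNC_OUTPUT_KEY, "async-profile.opt");

		// dataset.nt and the base URI are placeholders
		try (HDT hdt = HDTManager.generateHDT("dataset.nt", "http://example.org/#",
				RDFNotation.NTRIPLES, spec, (level, message) -> {})) {
			hdt.saveToHDT("dataset.hdt", null);
		}
	}
}
```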
48 changes: 41 additions & 7 deletions hdt-api/src/main/java/org/rdfhdt/hdt/util/Profiler.java
@@ -111,11 +111,24 @@ private static byte[] readBuffer(InputStream input, int length) throws IOExcepti
* @return profiler
*/
public static Profiler createOrLoadSubSection(String name, HDTOptions options, boolean setId) {
return createOrLoadSubSection(name, options, setId, false);
}

/**
* create or load a profiler from the options into a subsection
*
* @param name name
* @param options options
* @param setId set the id after loading (if required)
* @param async use async profiler
* @return profiler
*/
public static Profiler createOrLoadSubSection(String name, HDTOptions options, boolean setId, boolean async) {
// no options, we can't create
if (options == null) {
return new Profiler(name, null);
return new Profiler(name, null, async);
}
String profiler = options.get(HDTOptionsKeys.PROFILER_KEY);
String profiler = options.get(async ? HDTOptionsKeys.PROFILER_ASYNC_KEY : HDTOptionsKeys.PROFILER_KEY);
if (profiler != null && profiler.length() != 0 && profiler.charAt(0) == '!') {
Profiler prof = getProfilerById(Long.parseLong(profiler.substring(1)));
if (prof != null) {
@@ -125,9 +138,9 @@ public static Profiler createOrLoadSubSection(String name, HDTOptions options, b
}
}
// no id, not an id
Profiler prof = new Profiler(name, options);
Profiler prof = new Profiler(name, options, async);
if (setId) {
options.set(HDTOptionsKeys.PROFILER_KEY, prof);
options.set(async ? HDTOptionsKeys.PROFILER_ASYNC_KEY : HDTOptionsKeys.PROFILER_KEY, prof);
}
return prof;
}
@@ -147,7 +160,17 @@ public static Profiler createOrLoadSubSection(String name, HDTOptions options, b
* @param name the profiler name
*/
public Profiler(String name) {
this(name, null);
this(name, false);
}

/**
* create a disabled profiler
*
* @param name the profiler name
* @param async async profiler
*/
public Profiler(String name, boolean async) {
this(name, null, async);
}

/**
@@ -157,13 +180,24 @@ public Profiler(String name) {
* @param spec spec (nullable)
*/
public Profiler(String name, HDTOptions spec) {
this(name, spec, false);
}

/**
* create a profiler from specifications
*
* @param name profiler name
* @param spec spec (nullable)
* @param async async profiler
*/
public Profiler(String name, HDTOptions spec, boolean async) {
this.id = PROFILER_IDS.incrementAndGet();
PROFILER.put(this.id, this);
this.name = Objects.requireNonNull(name, "name can't be null!");
if (spec != null) {
String b = spec.get(HDTOptionsKeys.PROFILER_KEY);
String b = spec.get(async ? HDTOptionsKeys.PROFILER_ASYNC_KEY : HDTOptionsKeys.PROFILER_KEY);
disabled = b == null || b.length() == 0 || !(b.charAt(0) == '!' || "true".equalsIgnoreCase(b));
String profilerOutputLocation = spec.get(HDTOptionsKeys.PROFILER_OUTPUT_KEY);
String profilerOutputLocation = spec.get(async ? HDTOptionsKeys.PROFILER_ASYNC_OUTPUT_KEY : HDTOptionsKeys.PROFILER_OUTPUT_KEY);
if (profilerOutputLocation != null && !profilerOutputLocation.isEmpty()) {
outputPath = Path.of(profilerOutputLocation);
}
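To illustrate the new async flag, a short sketch assuming the Profiler API otherwise behaves as before (it is already used with try-with-resources elsewhere in this PR); the section names are placeholders:

```java
import java.io.IOException;

import org.rdfhdt.hdt.options.HDTOptions;
import org.rdfhdt.hdt.options.HDTOptionsKeys;
import org.rdfhdt.hdt.options.HDTSpecification;
import org.rdfhdt.hdt.util.Profiler;

public class AsyncProfilerExample {
	public static void main(String[] args) throws IOException {
		HDTOptions spec = new HDTSpecification();
		spec.set(HDTOptionsKeys.PROFILER_ASYNC_KEY, "true");
		spec.set(HDTOptionsKeys.PROFILER_ASYNC_OUTPUT_KEY, "async-profile.opt");

		// async=true makes the factory read profiler.async / profiler.async.output
		// instead of profiler / profiler.output; setId=true stores the profiler id
		// back into the options so nested calls reuse the same instance
		try (Profiler profiler = Profiler.createOrLoadSubSection("asyncTask", spec, true, true)) {
			profiler.pushSection("work");
			// ... measured work ...
			profiler.popSection();
			profiler.stop();
			profiler.writeProfiling();
		}
	}
}
```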
187 changes: 5 additions & 182 deletions hdt-java-core/src/main/java/org/rdfhdt/hdt/hdt/HDTManagerImpl.java
@@ -11,6 +11,7 @@
import org.rdfhdt.hdt.hdt.impl.HDTImpl;
import org.rdfhdt.hdt.hdt.impl.TempHDTImporterOnePass;
import org.rdfhdt.hdt.hdt.impl.TempHDTImporterTwoPass;
import org.rdfhdt.hdt.hdt.impl.diskimport.CatTreeImpl;
import org.rdfhdt.hdt.hdt.writer.TripleWriterHDT;
import org.rdfhdt.hdt.header.HeaderUtil;
import org.rdfhdt.hdt.iterator.utils.FluxStopTripleStringIterator;
@@ -131,7 +132,8 @@ private RDFFluxStop readFluxStopOrSizeLimit(HDTOptions spec) {

if (!HDTOptionsKeys.LOADER_TYPE_VALUE_DISK.equals(loaderType)) {
// memory based implementation, we can only store the NT file
return RDFFluxStop.sizeLimit(getMaxChunkSize());
// divide by 4 because the in-memory implementation uses a lot of memory
return RDFFluxStop.sizeLimit(getMaxChunkSize() / 4);
}

// disk based implementation, we only have to reduce the fault-factor of the map files
@@ -384,189 +386,10 @@ protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, InputStre
}
}

/**
* get the previous HDTs to merge with current
*
* @param nextFile if we can create a new HDT after this one
* @param files hdt files to merge
* @param current current created HDT
* @param maxFiles max files to merge
* @return list of HDTs to merge with current
*/
private List<HDTFile> getNextHDTs(boolean nextFile, List<HDTFile> files, HDTFile current, int maxFiles) {
if (files.isEmpty()) {
return List.of();
}
List<HDTFile> next = new ArrayList<>();
if (nextFile || files.size() > maxFiles) {
for (int i = 1; i < maxFiles && i <= files.size(); i++) {
HDTFile old = files.get(files.size() - i);

// check if the chunks are matching
if (nextFile && old.getChunks() > current.getChunks()) {
break;
}

next.add(old);
}
if (!nextFile || next.size() == maxFiles - 1) {
// we have all the elements, or we have enough file
// we remove the elements from the files
for (int i = 0; i < next.size(); i++) {
files.remove(files.size() - 1);
}
} else {
return List.of();
}
} else {
next.addAll(files);
files.clear();
}
next.add(current);
return next;
}

@Override
protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, Iterator<TripleString> iterator, String baseURI, HDTOptions hdtFormat, ProgressListener listener) throws IOException, ParserException {
Path basePath;

long khdtCatOpt = hdtFormat.getInt(HDTOptionsKeys.LOADER_CATTREE_KCAT, 1);

int kHDTCat;

if (khdtCatOpt > 0 && khdtCatOpt < Integer.MAX_VALUE - 6) {
kHDTCat = (int) khdtCatOpt;
} else {
throw new IllegalArgumentException("Invalid kcat value: " + khdtCatOpt);
}

String baseNameOpt = hdtFormat.get(HDTOptionsKeys.LOADER_CATTREE_LOCATION_KEY);

if (baseNameOpt == null || baseNameOpt.isEmpty()) {
basePath = Files.createTempDirectory("hdt-java-cat-tree");
} else {
basePath = Path.of(baseNameOpt);
}

// hide the loader type to avoid infinite recursion
hdtFormat = new HideHDTOptions(hdtFormat, key -> HDTOptionsKeys.LOADER_TYPE_KEY.equals(key) ? HDTOptionsKeys.LOADER_CATTREE_LOADERTYPE_KEY : key);

Path futureHDTLocation = Optional.ofNullable(hdtFormat.get(HDTOptionsKeys.LOADER_CATTREE_FUTURE_HDT_LOCATION_KEY)).map(Path::of).orElse(null);

try (Profiler profiler = Profiler.createOrLoadSubSection("doHDTCatTree", hdtFormat, true)) {
FluxStopTripleStringIterator it = new FluxStopTripleStringIterator(iterator, fluxStop);

List<HDTFile> files = new ArrayList<>();

long gen = 0;
long cat = 0;

Path hdtStore = basePath.resolve("hdt-store");
Path hdtCatLocationPath = basePath.resolve("cat");
String hdtCatLocation = hdtCatLocationPath.toAbsolutePath().toString();

Files.createDirectories(hdtStore);
Files.createDirectories(hdtCatLocationPath);

boolean nextFile;
do {
// generate the hdt
gen++;
profiler.pushSection("generateHDT #" + gen);
PrefixListener il = PrefixListener.of("gen#" + gen, listener);
Path hdtLocation = hdtStore.resolve("hdt-" + gen + ".hdt");
// help memory flooding algorithm
System.gc();
supplier.doGenerateHDT(it, baseURI, hdtFormat, il, hdtLocation);
il.clearThreads();

nextFile = it.hasNextFlux();
HDTFile hdtFile = new HDTFile(hdtLocation, 1);
profiler.popSection();

// merge the generated hdt with each block with enough size
if (kHDTCat == 1) { // default impl
while (!files.isEmpty() && (!nextFile || (files.get(files.size() - 1)).getChunks() <= hdtFile.getChunks())) {
HDTFile lastHDTFile = files.remove(files.size() - 1);
cat++;
profiler.pushSection("catHDT #" + cat);
PrefixListener ilc = PrefixListener.of("cat#" + cat, listener);
Path hdtCatFileLocation = hdtStore.resolve("hdtcat-" + cat + ".hdt");
try (HDT abcat = HDTManager.catHDT(
hdtCatLocation,
lastHDTFile.getHdtFile().toAbsolutePath().toString(),
hdtFile.getHdtFile().toAbsolutePath().toString(),
hdtFormat, ilc)) {
abcat.saveToHDT(hdtCatFileLocation.toAbsolutePath().toString(), ilc);
}
ilc.clearThreads();
// delete previous chunks
Files.delete(lastHDTFile.getHdtFile());
Files.delete(hdtFile.getHdtFile());
// note the new hdt file and the number of chunks
hdtFile = new HDTFile(hdtCatFileLocation, lastHDTFile.getChunks() + hdtFile.getChunks());

profiler.popSection();
}
} else { // kcat
List<HDTFile> nextHDTs;

while (!(nextHDTs = getNextHDTs(nextFile, files, hdtFile, kHDTCat)).isEmpty()) {
// merge all the files
cat++;
profiler.pushSection("catHDT #" + cat);
PrefixListener ilc = PrefixListener.of("cat#" + cat, listener);
Path hdtCatFileLocation = hdtStore.resolve("hdtcat-" + cat + ".hdt");

assert nextHDTs.size() > 1;

try (HDT abcat = HDTManager.catHDT(
nextHDTs.stream().map(f -> f.getHdtFile().toAbsolutePath().toString()).collect(Collectors.toList()),
hdtFormat,
ilc)) {
abcat.saveToHDT(hdtCatFileLocation.toAbsolutePath().toString(), ilc);
}
ilc.clearThreads();

// delete previous chunks
for (HDTFile nextHDT : nextHDTs) {
Files.delete(nextHDT.getHdtFile());
}
// note the new hdt file and the number of chunks
long chunks = nextHDTs.stream().mapToLong(HDTFile::getChunks).sum();
hdtFile = new HDTFile(hdtCatFileLocation, chunks);

profiler.popSection();
}
}
assert nextFile || files.isEmpty() : "no data remaining, but contains files";
files.add(hdtFile);
} while (nextFile);

listener.notifyProgress(100, "done, loading HDT");

Path hdtFile = files.get(0).hdtFile;

assert files.size() == 1 : "more than 1 file: " + files;
assert cat < gen : "more cat than gen";
assert files.get(0).getChunks() == gen : "gen size isn't the same as expected: " + files.get(0).getChunks() + " != " + gen;

try {
// if a future HDT location has been asked, move to it and map the HDT
if (futureHDTLocation != null) {
Files.createDirectories(futureHDTLocation.toAbsolutePath().getParent());
Files.deleteIfExists(futureHDTLocation);
Files.move(hdtFile, futureHDTLocation);
return HDTManager.mapHDT(futureHDTLocation.toAbsolutePath().toString());
}

// if no future location has been asked, load the HDT and delete it after
return HDTManager.loadHDT(hdtFile.toAbsolutePath().toString());
} finally {
Files.deleteIfExists(hdtFile);
profiler.stop();
profiler.writeProfiling();
}
try (CatTreeImpl tree = new CatTreeImpl(hdtFormat)) {
return tree.doGeneration(fluxStop, supplier, iterator, baseURI, listener);
}
}

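The large doHDTCatTree body removed above now lives in the new CatTreeImpl (hdt-java-core, org/rdfhdt/hdt/hdt/impl/diskimport/CatTreeImpl.java). Its merge scheduling is worth spelling out: each freshly generated HDT counts as one chunk, and it is merged with the most recent file on the stack whenever that file holds no more chunks than the current one, so merges cascade like carries in a binary counter and the tree stays balanced. Below is a standalone sketch of just that policy (no HDT I/O; HdtFile here is a hypothetical stand-in, and the real loop also forces a final merge once the input is exhausted):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CatTreeMergePolicy {
	// hypothetical stand-in for an HDT file and its chunk count
	static final class HdtFile {
		final String name;
		final long chunks;
		HdtFile(String name, long chunks) { this.name = name; this.chunks = chunks; }
		@Override public String toString() { return name + "(" + chunks + " chunks)"; }
	}

	public static void main(String[] args) {
		Deque<HdtFile> files = new ArrayDeque<>();
		int cat = 0;
		for (int gen = 1; gen <= 8; gen++) {
			// each generated HDT starts as a single chunk
			HdtFile current = new HdtFile("hdt-" + gen, 1);
			// mirrors: files.get(files.size() - 1).getChunks() <= hdtFile.getChunks()
			while (!files.isEmpty() && files.peek().chunks <= current.chunks) {
				HdtFile last = files.pop();
				cat++;
				current = new HdtFile("hdtcat-" + cat, last.chunks + current.chunks);
			}
			files.push(current);
		}
		// 8 equal-sized generations collapse into one 8-chunk file after 7 cats
		System.out.println(files); // [hdtcat-7(8 chunks)]
	}
}
```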