Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async CatTree and dir parser #186

Merged
merged 3 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add async dir parser
  • Loading branch information
ate47 committed Jan 9, 2023
commit 83353667eb9feffcbb7a429796045dbb53091744
8 changes: 6 additions & 2 deletions hdt-api/src/main/java/org/rdfhdt/hdt/enums/RDFNotation.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,12 @@ public static RDFNotation guess(String fileName) throws IllegalArgumentException

throw new IllegalArgumentException("Could not guess the format for "+fileName);
}

public static RDFNotation guess(File fileName) throws IllegalArgumentException {
return guess(fileName.getName());
return guess(fileName.getAbsolutePath());
}

public static RDFNotation guess(Path fileName) throws IllegalArgumentException {
return guess(fileName.toAbsolutePath().toString());
}
}
54 changes: 54 additions & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/options/HDTOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.rdfhdt.hdt.rdf.RDFFluxStop;
import org.rdfhdt.hdt.util.Profiler;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.DoubleSupplier;
Expand All @@ -42,6 +44,58 @@
* @author mario.arias
*/
public interface HDTOptions {
/**
* empty option, can be used to set values
*/
HDTOptions EMPTY = new HDTOptions() {
@Override
public void clear() {
// already empty
}

@Override
public String get(String key) {
// no value for key
return null;
}

@Override
public void set(String key, String value) {
throw new NotImplementedException("set");
}
};

/**
* @return create empty, modifiable options
*/
static HDTOptions of() {
return of(Map.of());
}

/**
* create modifiable options starting from the copy of the data map
* @param data data map
* @return options
*/
static HDTOptions of(Map<String, String> data) {
Map<String, String> map = new HashMap<>(data);
return new HDTOptions() {
@Override
public void clear() {
map.clear();
}

@Override
public String get(String key) {
return map.get(key);
}

@Override
public void set(String key, String value) {
map.put(key, value);
}
};
}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.BOOLEAN, desc = "Use the canonical NT file parser, removing checks")
public static final String NT_SIMPLE_PARSER_KEY = "parser.ntSimpleParser";
/**
* Key for setting the maximum amount of file loaded with the directory parser, 1 for no async parsing, 0
* for the number of processors, default 1. Number value
*/
@Key(type = Key.Type.NUMBER, desc = "Use async dir parser")
public static final String ASYNC_DIR_PARSER_KEY = "parser.dir.async";
/**
* Key for setting the triple order. see {@link org.rdfhdt.hdt.enums.TripleComponentOrder}'s names to have the values
* default to {@link org.rdfhdt.hdt.enums.TripleComponentOrder#SPO}
Expand Down
11 changes: 11 additions & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/rdf/RDFParserCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public interface RDFParserCallback {
@FunctionalInterface
interface RDFCallback {
void processTriple(TripleString triple, long pos);

/**
* @return an async version of this callback
*/
default RDFCallback async() {
return ((triple, pos) -> {
synchronized (this) {
this.processTriple(triple, pos);
}
});
}
}

void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) throws ParserException;
Expand Down
15 changes: 5 additions & 10 deletions hdt-java-core/src/main/java/org/rdfhdt/hdt/hdt/HDTManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@
public class HDTManagerImpl extends HDTManager {
private static final Logger logger = LoggerFactory.getLogger(HDTManagerImpl.class);

private boolean useSimple(HDTOptions spec) {
String value = spec.get(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY);
return value != null && !value.isEmpty() && !value.equals("false");
}

@Override
public HDTOptions doReadOptions(String file) throws IOException {
return new HDTSpecification(file);
Expand Down Expand Up @@ -161,13 +156,13 @@ public HDT doGenerateHDT(String rdfFileName, String baseURI, RDFNotation rdfNota
} else if (HDTOptionsKeys.LOADER_TYPE_VALUE_CAT.equals(loaderType)) {
return doHDTCatTree(readFluxStopOrSizeLimit(spec), HDTSupplier.fromSpec(spec), rdfFileName, baseURI, rdfNotation, spec, listener);
} else if (HDTOptionsKeys.LOADER_TYPE_VALUE_TWO_PASS.equals(loaderType)) {
loader = new TempHDTImporterTwoPass(useSimple(spec));
loader = new TempHDTImporterTwoPass(spec);
} else {
if (loaderType != null && !HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS.equals(loaderType)) {
logger.warn("Used the option {} with value {}, which isn't recognize, using default value {}",
HDTOptionsKeys.LOADER_TYPE_KEY, loaderType, HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS);
}
loader = new TempHDTImporterOnePass(useSimple(spec));
loader = new TempHDTImporterOnePass(spec);
}

// Create TempHDT
Expand Down Expand Up @@ -229,7 +224,7 @@ public HDT doGenerateHDT(Iterator<TripleString> triples, String baseURI, HDTOpti
HDTOptionsKeys.LOADER_TYPE_KEY, loaderType, HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS);
}
}
loader = new TempHDTImporterOnePass(useSimple(spec));
loader = new TempHDTImporterOnePass(spec);
}

// Create TempHDT
Expand Down Expand Up @@ -264,7 +259,7 @@ public HDT doGenerateHDTDisk(InputStream fileStream, String baseURI, RDFNotation
// uncompress the stream if required
fileStream = IOUtil.asUncompressed(fileStream, compressionType);
// create a parser for this rdf stream
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, useSimple(hdtFormat));
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, hdtFormat);
// read the stream as triples
try (PipedCopyIterator<TripleString> iterator = RDFParserFactory.readAsIterator(parser, fileStream, baseURI, true, rdfNotation)) {
return doGenerateHDTDisk0(iterator, true, baseURI, hdtFormat, listener);
Expand Down Expand Up @@ -380,7 +375,7 @@ protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, String fi

@Override
protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, InputStream stream, String baseURI, RDFNotation rdfNotation, HDTOptions hdtFormat, ProgressListener listener) throws IOException, ParserException {
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, useSimple(hdtFormat));
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, hdtFormat);
try (PipedCopyIterator<TripleString> iterator = RDFParserFactory.readAsIterator(parser, stream, baseURI, true, rdfNotation)) {
return doHDTCatTree(fluxStop, supplier, iterator, baseURI, hdtFormat, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ public void processTriple(TripleString triple, long pos) {
}
}

private final boolean useSimple;
private final HDTOptions spec;

public TempHDTImporterOnePass(boolean useSimple) {
this.useSimple = useSimple;
public TempHDTImporterOnePass(HDTOptions spec) {
this.spec = spec;
}

@Override
public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RDFNotation notation, ProgressListener listener)
throws ParserException {

RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, useSimple);
RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, spec);

// Create Modifiable Instance
TempHDT modHDT = new TempHDTImpl(specs, baseUri, ModeOfLoading.ONE_PASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ public void processTriple(TripleString triple, long pos) {
}
}

private final boolean useSimple;
private final HDTOptions spec;

public TempHDTImporterTwoPass(boolean useSimple) {
this.useSimple = useSimple;
public TempHDTImporterTwoPass(HDTOptions spec) {
this.spec = spec;
}

@Override
public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RDFNotation notation, ProgressListener listener)
throws ParserException {

RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, useSimple);
RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, spec);

// Create Modifiable Instance and parser
TempHDT modHDT = new TempHDTImpl(specs, baseUri, ModeOfLoading.TWO_PASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.rdfhdt.hdt.enums.RDFNotation;
import org.rdfhdt.hdt.exceptions.NotImplementedException;
import org.rdfhdt.hdt.iterator.utils.PipedCopyIterator;
import org.rdfhdt.hdt.options.HDTOptions;
import org.rdfhdt.hdt.options.HDTOptionsKeys;
import org.rdfhdt.hdt.rdf.parsers.RDFParserDir;
import org.rdfhdt.hdt.rdf.parsers.RDFParserHDT;
import org.rdfhdt.hdt.rdf.parsers.RDFParserList;
Expand All @@ -40,23 +42,23 @@
import org.rdfhdt.hdt.rdf.parsers.RDFParserZip;
import org.rdfhdt.hdt.triples.TripleString;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;

/**
* @author mario.arias
*
*/
public class RDFParserFactory {
public static boolean useSimple(HDTOptions options) {
return options != null && options.getBoolean(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY, false);
}
public static RDFParserCallback getParserCallback(RDFNotation notation) {
return getParserCallback(notation, false);
return getParserCallback(notation, HDTOptions.EMPTY);
}
public static RDFParserCallback getParserCallback(RDFNotation notation, boolean useSimple) {
public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptions spec) {
switch(notation) {
case NTRIPLES:
if (useSimple) {
if (useSimple(spec)) {
return new RDFParserSimple();
}
case NQUAD:
Expand All @@ -65,15 +67,15 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, boolean
case RDFXML:
return new RDFParserRIOT();
case DIR:
return new RDFParserDir(useSimple);
return new RDFParserDir(spec);
case LIST:
return new RDFParserList();
return new RDFParserList(spec);
case ZIP:
return new RDFParserZip(useSimple);
return new RDFParserZip(spec);
case TAR:
return new RDFParserTar(useSimple);
return new RDFParserTar(spec);
case RAR:
return new RDFParserRAR(useSimple);
return new RDFParserRAR(spec);
case HDT:
return new RDFParserHDT();
case JSONLD:
Expand Down
Loading