Skip to content

Commit

Permalink
update to new js-runnre
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Jan 4, 2024
1 parent 8845914 commit fad208a
Show file tree
Hide file tree
Showing 9 changed files with 812 additions and 140 deletions.
521 changes: 510 additions & 11 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"license": "ISC",
"dependencies": {
"@rdfjs/types": "^1.1.0",
"@treecg/connector-types": "^1.1.11",
"@treecg/sds-storage-writer-mongo": "^0.0.5",
"@treecg/types": "^0.4.5",
"@types/n3": "^1.10.4",
Expand All @@ -29,6 +28,7 @@
"sds-processors": "^0.0.2"
},
"devDependencies": {
"@ajuvercr/js-runner": "^0.1.12",
"@types/node": "^18.11.15",
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
Expand Down
71 changes: 38 additions & 33 deletions pipeline.ttl
Original file line number Diff line number Diff line change
@@ -1,77 +1,82 @@
@prefix js: <https://w3id.org/conn/js#> .
@prefix ws: <https://w3id.org/conn/ws#> .
@prefix : <https://w3id.org/conn#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix rml: <https://w3id.org/conn/rml#> .
@prefix ql: <http://semweb.mmlab.be/ns/ql#> .

<> owl:imports <https://raw.githubusercontent.com/TREEcg/connector-architecture/main/channel/http.ttl>. <> owl:imports <https://raw.githubusercontent.com/TREEcg/connector-architecture/main/channel/file.ttl>.
@prefix rml: <http://w3id.org/rml/>.
@prefix js: <https://w3id.org/conn/js#>.
@prefix ws: <https://w3id.org/conn/ws#>.
@prefix : <https://w3id.org/conn#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>.
@prefix rml: <https://w3id.org/conn/rml#>.
@prefix ql: <http://semweb.mmlab.be/ns/ql#>.

<> owl:imports <https://raw.githubusercontent.com/TREEcg/connector-architecture/main/channel/http.ttl>.
<> owl:imports <https://raw.githubusercontent.com/TREEcg/connector-architecture/main/channel/file.ttl>.
<> owl:imports <https://raw.githubusercontent.com/TREEcg/connector-architecture/main/channel/kafka.ttl>.

<> owl:imports <https://raw.githubusercontent.com/ajuvercr/rml-runner/master/ontology.ttl>.
<> owl:imports <./proc/fetch.ttl>.
<> owl:imports <./proc/mumo_mapper.ttl>.
<> owl:imports <./node_modules/sds-processors/configs/2_bucketstep.ttl>.
<> owl:imports <./node_modules/sds-processors/configs/sdsify.ttl>.
<> owl:imports <./node_modules/@treecg/sds-storage-writer-mongo/step.ttl>.


<json/writer> a :JsWriterChannel.
<json/reader> a :JsReaderChannel.
[] a js:JsChannel; :reader <json/reader>; :writer <json/writer>.

[ ] a js:JsChannel;
:reader <json/reader>;
:writer <json/writer>.

<ttl/writer> a :JsWriterChannel.
<ttl/reader> a :JsReaderChannel.
[] a js:JsChannel; :reader <ttl/reader>; :writer <ttl/writer>.

[ ] a js:JsChannel;
:reader <ttl/reader>;
:writer <ttl/writer>.

<sds-raw/writer> a :JsWriterChannel.
<sds-raw/reader> a :JsReaderChannel.
[] a js:JsChannel; :reader <sds-raw/reader>; :writer <sds-raw/writer>.
[ ] a js:JsChannel;
:reader <sds-raw/reader>;
:writer <sds-raw/writer>.

<raw/buckets/writer> a :JsWriterChannel.
<raw/buckets/reader> a :JsReaderChannel.
[] a js:JsChannel; :reader <raw/buckets/reader>; :writer <raw/buckets/writer>.
[ ] a js:JsChannel;
:reader <raw/buckets/reader>;
:writer <raw/buckets/writer>.

<meta/buckets/writer> a :JsWriterChannel.
<meta/buckets/reader> a :JsReaderChannel.
[] a js:JsChannel; :reader <meta/buckets/reader>; :writer <meta/buckets/writer>.
[ ] a js:JsChannel;
:reader <meta/buckets/reader>;
:writer <meta/buckets/writer>.


[] a js:MumoFetch;
[ ] a js:MumoFetch;
js:startUrl "https://mumo.ilabt.imec.be/ingest/?key=mumo-is-cool&index=0";
js:savePath <save>;
js:intervalMs 1000;
js:dataOutput <json/writer>.

[] a js:MumoMapper;
[ ] a js:MumoMapper;
js:dataInput <json/reader>;
js:dataOutput <ttl/writer>.

[] a js:Sdsify;
[ ] a js:Sdsify;
js:input <ttl/reader>;
js:output <sds-raw/writer>;
js:objectType <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation>;
js:stream <https://w3id.org/sds#Stream>.


<metadata/in> a :FileReaderChannel;
:fileReadFirstContent "true";
:fileOnReplace "true";
:filePath <./metadataIn.ttl>.

[] a js:Bucketize;
js:dataInput <sds-raw/reader>;
js:metadataInput <metadata/in>;
js:dataOutput <raw/buckets/writer>;
js:metadataOutput <meta/buckets/writer>;
js:bucketizeStrategy <./bucketizeStrategy.ttl>;
js:inputStreamId <https://w3id.org/sds#Stream>;
js:outputStreamId <http://example.com/test>.
[ ] a js:Bucketize;
js:dataInput <sds-raw/reader>;
js:metadataInput <metadata/in>;
js:dataOutput <raw/buckets/writer>;
js:metadataOutput <meta/buckets/writer>;
js:bucketizeStrategy <./bucketizeStrategy.ttl>;
js:inputStreamId <https://w3id.org/sds#Stream>;
js:outputStreamId <http://example.com/test>.

[] a js:Ingest;
[ ] a js:Ingest;
js:dataInput <raw/buckets/reader>;
js:metadataInput <meta/buckets/reader>;
js:metadataCollection "MUMO_META";
Expand Down
34 changes: 19 additions & 15 deletions proc/fetch.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .

<> owl:imports <https://raw.githubusercontent.com/ajuvercr/js-runner/master/ontology.ttl>.


js:MumoFetch a js:JsProcess;
js:file <../lib/fetch.js>;
Expand All @@ -17,19 +15,19 @@ js:MumoFetch a js:JsProcess;
a fno:Mapping;
fno:parameterMapping [
a fnom:PositionParameterMapping ;
fnom:functionParameter js:startUrl;
fnom:implementationParameterPosition "1"^^xsd:int
fnom:functionParameter "Data output";
fnom:implementationParameterPosition "0"^^xsd:int
],[
a fnom:PositionParameterMapping ;
fnom:functionParameter js:dataOutput;
fnom:implementationParameterPosition "0"^^xsd:int
fnom:functionParameter "Mumo start url";
fnom:implementationParameterPosition "1"^^xsd:int
],[
a fnom:PositionParameterMapping ;
fnom:functionParameter js:savePath;
fnom:functionParameter "Save path";
fnom:implementationParameterPosition "2"^^xsd:int
],[
a fnom:PositionParameterMapping ;
fnom:functionParameter js:intervalMs;
fnom:functionParameter "Interval";
fnom:implementationParameterPosition "3"^^xsd:int
]
].
Expand All @@ -39,19 +37,25 @@ js:MumoFetch a js:JsProcess;
sh:property [
sh:class :WriterChannel;
sh:path js:dataOutput;
sh:name "Data output channel";
sh:name "Data output";
sh:minCount 1;
sh:maxCount 1;
], [
sh:dataType xsd:string;
sh:datatype xsd:string;
sh:path js:startUrl;
sh:name "Start consuming Mumo ingested data from this url";
sh:name "Mumo start url";
sh:minCount 1;
sh:maxCount 1;
], [
sh:dataType xsd:string;
sh:datatype xsd:string;
sh:path js:savePath;
sh:name "Save last consumed URL at this location (optional)";
sh:name "Save path";
sh:maxCount 1;
], [
sh:dataType xsd:integer;
sh:datatype xsd:integer;
sh:path js:intervalMs;
sh:name "Wait intervalMs millisecond before retrying the current URL.";
sh:name "Interval";
sh:maxCount 1;
].


10 changes: 4 additions & 6 deletions proc/mumo_mapper.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .

<> owl:imports <https://raw.githubusercontent.com/ajuvercr/js-runner/master/ontology.ttl>.


js:MumoMapper a js:JsProcess;
js:file <../lib/mapper.js>;
Expand All @@ -17,11 +15,11 @@ js:MumoMapper a js:JsProcess;
a fno:Mapping;
fno:parameterMapping [
a fnom:PositionParameterMapping ;
fnom:functionParameter js:dataInput;
fnom:functionParameter "data input";
fnom:implementationParameterPosition "0"^^xsd:int
],[
a fnom:PositionParameterMapping ;
fnom:functionParameter js:dataOutput;
fnom:functionParameter "data output";
fnom:implementationParameterPosition "1"^^xsd:int
]
].
Expand All @@ -31,11 +29,11 @@ js:MumoMapper a js:JsProcess;
sh:property [
sh:class :WriterChannel;
sh:path js:dataOutput;
sh:name "Data output channel";
sh:name "data output";
], [
sh:class :ReaderChannel;
sh:path js:dataInput;
sh:name "Data input channel";
sh:name "data input";
].


38 changes: 28 additions & 10 deletions src/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Writer } from "@treecg/connector-types";
import { readFileSync, writeFileSync } from "fs";
import fetch, { Headers } from "node-fetch";
import type { Writer } from "@ajuvercr/js-runner";

interface Link {
target: string;
Expand Down Expand Up @@ -31,11 +31,17 @@ function extract_links(headers: Headers): Link[] {
return out;
}

async function findNextUrl(url: string, interval_ms: number, maybeLinks?: Link[]): Promise<string> {
let links = !!maybeLinks ? maybeLinks : await fetch(url).then(resp => extract_links(resp.headers));
async function findNextUrl(
url: string,
interval_ms: number,
maybeLinks?: Link[],
): Promise<string> {
let links = !!maybeLinks
? maybeLinks
: await fetch(url).then((resp) => extract_links(resp.headers));

while (true) {
const next = links.find(x => x.target === "next");
const next = links.find((x) => x.target === "next");
if (next) {
// Found next url
const url_url = new URL(url);
Expand All @@ -44,25 +50,32 @@ async function findNextUrl(url: string, interval_ms: number, maybeLinks?: Link[]

console.log("waiting");
// Wait and refetch and look for headers
await new Promise(res => setTimeout(res, interval_ms));
await new Promise((res) => setTimeout(res, interval_ms));
const resp = await fetch(url);
links = extract_links(resp.headers);
}
}

async function start(writer: Writer<string>, start_url: string, interval_ms: number, save_path?: string) {
async function start(
writer: Writer<string>,
start_url: string,
interval_ms: number,
save_path?: string,
) {
console.log("Start url", start_url)
console.log("Save path", save_path)
const save = (url: string) => {
if (save_path)
if (save_path) {
writeFileSync(save_path, url, { encoding: "utf8" });
}
};

let url = start_url;
if (save_path) {
try {
url = readFileSync(save_path, { encoding: "utf8" });
url = await findNextUrl(url, interval_ms);
} catch (ex: any) {
}
} catch (ex: any) {}
}

while (true) {
Expand All @@ -80,6 +93,11 @@ async function start(writer: Writer<string>, start_url: string, interval_ms: num
}
}

export function fetcher(writer: Writer<string>, start_url: string, save_path?: string, interval_ms = 1000) {
export function fetcher(
writer: Writer<string>,
start_url: string,
save_path?: string,
interval_ms = 1000,
) {
start(writer, start_url, interval_ms, save_path);
}
Loading

0 comments on commit fad208a

Please sign in to comment.