Skip to content

Commit

Permalink
Feature delay argument for iterator (#32)
Browse files Browse the repository at this point in the history
* Added feature to delay iterator's endpoint requests (including generator's construct query requests)

---------

Co-authored-by: Laurens Rietveld <[email protected]>
  • Loading branch information
philipperenzen and LaurensRietveld authored Jan 8, 2024
1 parent 85f3d7b commit 937f3e3
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 42 deletions.
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"millify": "^6.1.0",
"n3": "^1.17.2",
"ora": "^7.0.1",
"parse-duration": "^1.1.0",
"pretty-ms": "^8.0.0",
"sparqljs": "^3.7.1"
}
Expand Down
1 change: 1 addition & 0 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,5 @@ class Generator extends EventEmitter {
}
}


export default Generator
92 changes: 50 additions & 42 deletions src/lib/Iterator.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import getEndpoint from "../utils/getEndpoint.js";
import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from "../utils/getEngine.js";
import getEngineSource from "../utils/getEngineSource.js";
import parse from 'parse-duration'

const DEFAULT_LIMIT = 10;
declare interface Iterator {
Expand All @@ -26,6 +27,7 @@ class Iterator extends EventEmitter {
private readonly query: SelectQuery;
public readonly endpoint: Endpoint;
private readonly engine: QueryEngine;
private readonly delay: number | undefined
private source: string = "";
private $offset = 0;
private totalResults = 0;
Expand All @@ -39,52 +41,58 @@ class Iterator extends EventEmitter {
DEFAULT_LIMIT;
this.endpoint = getEndpoint(stage);
this.engine = getEngine(this.endpoint);
if (stage.configuration.iterator.delay !== undefined){
const delay = parse(stage.configuration.iterator.delay)
if (delay === undefined) throw new Error(`Error in stage \`${stage.configuration.name}\`: incorrect delay format was provided.`)
this.delay = delay
}
}

public run(): void {
let resultsPerPage = 0;
if (this.source === "") this.source = getEngineSource(this.endpoint);
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
const error = (e: any): Error => new Error(
`The Iterator did not run succesfully, it could not get the results from the endpoint ${this.source} (offset: ${this.$offset}, limit ${this.query.limit}): ${(e as Error).message}`
)
this.engine
.queryBindings(queryString, {
sources: [this.source],
})
.then((stream) => {
stream.on("data", (binding: Bindings) => {
resultsPerPage++;
if (!binding.has("this"))
throw new Error("Missing binding $this in the Iterator result.");
const $this = binding.get("this")!;
if ($this.termType !== "NamedNode") {
throw new Error(
`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`
);
} else {
this.emit("data", $this);
}
});

stream.on("end", () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
if (resultsPerPage < this.query.limit!) {
this.emit("end", this.totalResults);
} else {
this.run();
}
});

stream.on('error', (e) => {
this.emit("error", error(e))
setTimeout(() => {
let resultsPerPage = 0;
if (this.source === "") this.source = getEngineSource(this.endpoint);
this.query.offset = this.$offset;
const queryString = getSPARQLQueryString(this.query);
const error = (e: any): Error => new Error(
`The Iterator did not run succesfully, it could not get the results from the endpoint ${this.source} (offset: ${this.$offset}, limit ${this.query.limit}): ${(e as Error).message}`
)
this.engine
.queryBindings(queryString, {
sources: [this.source],
})
.then((stream) => {
stream.on("data", (binding: Bindings) => {
resultsPerPage++;
if (!binding.has("this"))
throw new Error("Missing binding $this in the Iterator result.");
const $this = binding.get("this")!;
if ($this.termType !== "NamedNode") {
throw new Error(
`Binding $this in the Iterator result must be an Iri/NamedNode, but it is of type ${$this.termType}.`
);
} else {
this.emit("data", $this);
}
});
stream.on("end", () => {
this.totalResults += resultsPerPage;
this.$offset += this.query.limit!;
if (resultsPerPage < this.query.limit!) {
this.emit("end", this.totalResults);
} else {
this.run();
}
});

stream.on('error', (e) => {
this.emit("error", error(e))
})
})
})
.catch((e) => {
this.emit("error", error(e))
});
.catch((e) => {
this.emit("error", error(e))
});
}, this.delay ?? 0)
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/lib/LDWorkbenchConfiguration.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export interface LDWorkbenchConfiguration {
* Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limit's in your query.
*/
batchSize?: number;
/**
* Human readable time delay for the iterator's SPARQL endpoint requests (e.g. '5ms', '100 milliseconds', '1s').
*/
delay?: string;
};
/**
* @minItems 1
Expand Down Expand Up @@ -117,6 +121,10 @@ export interface LDWorkbenchConfiguration {
* Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limit's in your query.
*/
batchSize?: number;
/**
* Human readable time delay for the iterator's SPARQL endpoint requests (e.g. '5ms', '100 milliseconds', '1s').
*/
delay?: string;
};
/**
* @minItems 1
Expand Down
1 change: 1 addition & 0 deletions static/example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ stages:
iterator:
query: file://static/example/iterator-stage-1.rq
endpoint: https://api.triplydb.com/datasets/Triply/iris/services/demo-service/sparql
delay: "50ms"
generator:
# First generator
- query: file://static/example/generator-stage-1-1.rq
Expand Down
4 changes: 4 additions & 0 deletions static/ld-workbench.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
"type": "number",
"minimum": 1,
"description": "Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limit's in your query."
},
"delay": {
"type": "string",
"description": "Human readable time delay for the iterator's SPARQL endpoint requests (e.g. '5ms', '100 milliseconds', '1s'). "
}
}
},
Expand Down

0 comments on commit 937f3e3

Please sign in to comment.