From e1ac3d0be9ed5a50cb90187d3bcbf499b652a7dc Mon Sep 17 00:00:00 2001 From: philipperenzen <112689808+philipperenzen@users.noreply.github.com> Date: Tue, 2 Jan 2024 14:07:52 +0100 Subject: [PATCH] Fixes #3 batch processing for generator (#23) * Implement batchprocessing for generator using UNION with bind for $this * added + tests * Update github-ci.yml - local files instead of remote endpoint --------- Co-authored-by: Mark Lindeman --- .github/workflows/github-ci.yml | 4 +- src/lib/Generator.class.ts | 102 ++++++++++++++++++-------- src/lib/Stage.class.ts | 17 ++--- src/lib/tests/Generator.class.test.ts | 42 ++++++++++- static/ld-workbench.schema.json | 2 + 5 files changed, 124 insertions(+), 43 deletions(-) diff --git a/.github/workflows/github-ci.yml b/.github/workflows/github-ci.yml index 447d08d..106878a 100644 --- a/.github/workflows/github-ci.yml +++ b/.github/workflows/github-ci.yml @@ -41,7 +41,7 @@ jobs: - name: Build run: npm run build - name: LDWorkbench init & test run - run: rm -rf ./pipelines && npx ld-workbench --init && npx ld-workbench + run: rm -rf ./pipelines && npx ld-workbench --init && npx ld-workbench -c "src/utils/tests/static/single/conf.yml" - name: Test run: npm run test - - run: echo "Job status ${{ job.status }}." \ No newline at end of file + - run: echo "Job status ${{ job.status }}." diff --git a/src/lib/Generator.class.ts b/src/lib/Generator.class.ts index 87eac4e..5765a18 100644 --- a/src/lib/Generator.class.ts +++ b/src/lib/Generator.class.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/method-signature-style */ -import type { ConstructQuery } from "sparqljs"; +import type { BindPattern, ConstructQuery, GroupPattern, UnionPattern } from "sparqljs"; import type Stage from "./Stage.class.js"; import getSPARQLQuery from "../utils/getSPARQLQuery.js"; import type { Quad, NamedNode } from "@rdfjs/types"; @@ -9,63 +9,107 @@ import type { Endpoint, QueryEngine } from "./types.js"; import getEngine from '../utils/getEngine.js'; import getEngineSource from '../utils/getEngineSource.js'; import EventEmitter from 'node:events'; +import { DataFactory } from 'n3'; + +const DEFAULT_BATCH_SIZE = 10 declare interface Generator { on(event: "data", listener: (statement: Quad) => void): this; - on(event: "end", listener: (numResults: number) => void): this; + on(event: "end", listener: (iterations: number, statements: number) => void): this; on(event: "error", listener: (e: Error) => void): this; emit(event: "data", statement: Quad): boolean; - emit(event: "end", numResults: number): boolean; + emit(event: "end", iterations: number, statements: number): boolean; emit(event: "error", e: Error): boolean; } class Generator extends EventEmitter { private readonly query: ConstructQuery; private readonly engine: QueryEngine; + private iterationsProcessed: number = 0 + private iterationsIncoming?: number + private statements: number = 0 private source: string = '' + private readonly $thisList: NamedNode[] = [] private readonly endpoint: Endpoint; - public constructor(stage: Stage) { + public constructor(private readonly stage: Stage) { super() this.query = getSPARQLQuery( stage.configuration.generator.query, "construct" ); - + this.endpoint = stage.configuration.generator.endpoint === undefined ? stage.iterator.endpoint : getEndpoint(stage, "generator"); this.engine = getEngine(this.endpoint) + + stage.iterator.on('end', count => { + this.iterationsIncoming = count + for (const $this of this.$thisList) { + this.run($this, this.$thisList.length) + + } + }) + } - public run($this: NamedNode): void { - // Prebinding, see https://www.w3.org/TR/shacl/#pre-binding - // we know the query is safe to use replacement since we checked it before - const queryString = getSPARQLQueryString(this.query) - .replaceAll( - /[?$]\bthis\b/g, - `<${$this.value}>` - ); + private get batchSize(): number { + return this.stage.configuration.generator.batchSize ?? DEFAULT_BATCH_SIZE + } + + private $thisToBind($this: NamedNode): BindPattern { + return { + type: 'bind', + expression: { + type:"operation", + operator:"", + args: [ + $this + ] + }, + variable: DataFactory.variable('this') + + } + } + + + public run($this?: NamedNode, batchSize?: number): void { + if ($this !== undefined) this.$thisList.push($this) const error = (e: any): Error => new Error(`The Generator did not run succesfully, it could not get the results from the endpoint ${this.source}: ${(e as Error).message}`) - if (this.source === '') this.source = getEngineSource(this.endpoint) - let numberOfStatements = 0 - this.engine.queryQuads(queryString, { - sources: [this.source] - }).then(stream => { - stream.on('data', (quad: Quad) => { - numberOfStatements ++ - this.emit('data', quad) - }) - stream.on('end', () => { - this.emit('end', numberOfStatements) - }) - stream.on('error', (e) => { + if (this.$thisList.length >= (batchSize ?? this.batchSize)) { + if (this.source === '') this.source = getEngineSource(this.endpoint) + const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct"); + const union: UnionPattern = { type: 'union', patterns: [] } + for (const $this of this.$thisList) { + this.iterationsProcessed++ + const group: GroupPattern = { type: 'group', patterns: [...unionQuery.where ?? []] } + group.patterns.unshift(this.$thisToBind($this)) + union.patterns.push(group) + } + unionQuery.where = [union] + + this.engine.queryQuads(getSPARQLQueryString(unionQuery), { + sources: [this.source] + }).then(stream => { + stream.on('data', (quad: Quad) => { + this.statements++ + this.emit('data', quad) + }) + stream.on('error', (e) => { + this.emit("error", error(e)) + }) + stream.on('end', () => { + if (this.iterationsIncoming !== undefined && this.iterationsProcessed >= this.iterationsIncoming) { + this.emit('end', this.iterationsIncoming, this.statements) + } + }) + }).catch(e => { this.emit("error", error(e)) }) - }).catch(e => { - this.emit("error", error(e)) - }) + this.$thisList.length = 0 + } } } diff --git a/src/lib/Stage.class.ts b/src/lib/Stage.class.ts index 77d1d6f..bba5490 100644 --- a/src/lib/Stage.class.ts +++ b/src/lib/Stage.class.ts @@ -56,27 +56,24 @@ class Stage extends EventEmitter { public run(): void { let quadCount = 0 - let iteratorCount = 0 - let generatorCount = 0 + // let iteratorCount = 0 const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' }) + this.generator.on('data', quad => { writer.addQuad(quad) quadCount ++ this.emit('generatorResult', quadCount) }) - this.generator.on('end', _ => { - generatorCount++ - if (generatorCount === iteratorCount) { - this.emit('end', iteratorCount, quadCount) - } + + this.generator.on('error', e => { + this.emit('error', e) }) + this.iterator.on('data', $this => { this.generator.run($this) this.emit('iteratorResult', $this) }) - this.iterator.on('end', count => { - iteratorCount = count - }) + this.iterator.on('error', e => { this.emit('error', e) }) diff --git a/src/lib/tests/Generator.class.test.ts b/src/lib/tests/Generator.class.test.ts index 71dfa90..5919eb6 100644 --- a/src/lib/tests/Generator.class.test.ts +++ b/src/lib/tests/Generator.class.test.ts @@ -5,6 +5,7 @@ import Pipeline from "../Pipeline.class.js"; import * as chai from 'chai' import chaiAsPromised from 'chai-as-promised' import { NamedNode } from "n3"; +import * as fs from "fs" import type { LDWorkbenchConfiguration } from "../LDWorkbenchConfiguration.js"; chai.use(chaiAsPromised) const expect = chai.expect @@ -52,8 +53,45 @@ describe('Generator Class', () => { }); }); // BUG when both the generator and iterator tests are running, it seems the iterator will never terminate - describe.skip('run', () => { - it('should emit "data" and "end" events with the correct number of statements', async () => { + describe('run', () => { + it('Should work in batchSize for pipeline\'s generator', async function () { + const filePath = 'pipelines/data/example-pipelineBatch.nt'; + + + const batchConfiguration: LDWorkbenchConfiguration = { + name: 'Example Pipeline Batch', + description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n', + destination: "file://" + filePath, + stages: [ + { + name: 'Stage 1', + iterator: { + query: 'file://static/example/iterator-stage-1.rq', + endpoint: 'file://static/tests/iris.nt' + }, + generator: { + query: 'file://static/example/generator-stage-1.rq', + // adjust batchsize for test here + batchSize: 7 + } + } + ] + } + const pipelineBatch = new Pipeline(batchConfiguration, {silent: true}) + pipelineBatch.validate() + pipelineBatch.run().then(() => { + // read file after pipeline has finished + const file = fs.readFileSync(filePath, {encoding: "utf-8"}) + const fileLines = file.split("\n").sort() + expect(fileLines.length).to.equal(460) + expect(fileLines[0]).to.equal('') + expect(fileLines[1]).to.equal(' .') + expect(fileLines[fileLines.length - 1]).to.equal(' "Instance 150 of the Iris Virginica"@en .') + }).catch(error => {throw error}); + + + }) + it.skip('should emit "data" and "end" events with the correct number of statements', async () => { const configuration : LDWorkbenchConfiguration = { name: 'Example Pipeline', description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n', diff --git a/static/ld-workbench.schema.json b/static/ld-workbench.schema.json index 6f813de..2a07c0e 100644 --- a/static/ld-workbench.schema.json +++ b/static/ld-workbench.schema.json @@ -45,6 +45,7 @@ }, "batchSize": { "type": "number", + "minimum": 1, "description": "Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limit's in your query." } } @@ -64,6 +65,7 @@ }, "batchSize": { "type": "number", + "minimum": 1, "description": "Overrule the generator's behaviour of fetching results for 10 bindings of $this per request." } }