Skip to content

Commit

Permalink
Fixes #3 batch processing for generator (#23)
Browse files Browse the repository at this point in the history
* Implement batchprocessing for generator using UNION with bind for $this

*  added + tests

* Update github-ci.yml - local files instead of remote endpoint

---------

Co-authored-by: Mark Lindeman <[email protected]>
  • Loading branch information
philipperenzen and mightymax authored Jan 2, 2024
1 parent 3674469 commit e1ac3d0
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 43 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
- name: Build
run: npm run build
- name: LDWorkbench init & test run
run: rm -rf ./pipelines && npx ld-workbench --init && npx ld-workbench
run: rm -rf ./pipelines && npx ld-workbench --init && npx ld-workbench -c "src/utils/tests/static/single/conf.yml"
- name: Test
run: npm run test
- run: echo "Job status ${{ job.status }}."
- run: echo "Job status ${{ job.status }}."
102 changes: 73 additions & 29 deletions src/lib/Generator.class.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/method-signature-style */
import type { ConstructQuery } from "sparqljs";
import type { BindPattern, ConstructQuery, GroupPattern, UnionPattern } from "sparqljs";
import type Stage from "./Stage.class.js";
import getSPARQLQuery from "../utils/getSPARQLQuery.js";
import type { Quad, NamedNode } from "@rdfjs/types";
Expand All @@ -9,63 +9,107 @@ import type { Endpoint, QueryEngine } from "./types.js";
import getEngine from '../utils/getEngine.js';
import getEngineSource from '../utils/getEngineSource.js';
import EventEmitter from 'node:events';
import { DataFactory } from 'n3';

const DEFAULT_BATCH_SIZE = 10

declare interface Generator {
on(event: "data", listener: (statement: Quad) => void): this;
on(event: "end", listener: (numResults: number) => void): this;
on(event: "end", listener: (iterations: number, statements: number) => void): this;
on(event: "error", listener: (e: Error) => void): this;

emit(event: "data", statement: Quad): boolean;
emit(event: "end", numResults: number): boolean;
emit(event: "end", iterations: number, statements: number): boolean;
emit(event: "error", e: Error): boolean;
}
class Generator extends EventEmitter {
private readonly query: ConstructQuery;
private readonly engine: QueryEngine;
private iterationsProcessed: number = 0
private iterationsIncoming?: number
private statements: number = 0
private source: string = ''
private readonly $thisList: NamedNode[] = []
private readonly endpoint: Endpoint;
public constructor(stage: Stage) {
public constructor(private readonly stage: Stage) {
super()
this.query = getSPARQLQuery(
stage.configuration.generator.query,
"construct"
);

this.endpoint =
stage.configuration.generator.endpoint === undefined
? stage.iterator.endpoint
: getEndpoint(stage, "generator");

this.engine = getEngine(this.endpoint)

stage.iterator.on('end', count => {
this.iterationsIncoming = count
for (const $this of this.$thisList) {
this.run($this, this.$thisList.length)

}
})

}

public run($this: NamedNode): void {
// Prebinding, see https://www.w3.org/TR/shacl/#pre-binding
// we know the query is safe to use replacement since we checked it before
const queryString = getSPARQLQueryString(this.query)
.replaceAll(
/[?$]\bthis\b/g,
`<${$this.value}>`
);
/**
 * Number of `$this` bindings to collect before a batched query is sent.
 *
 * Reads `generator.batchSize` from the stage configuration and falls back to
 * `DEFAULT_BATCH_SIZE` when that value is `null` or `undefined`.
 */
private get batchSize(): number {
  const configured = this.stage.configuration.generator.batchSize
  return configured ?? DEFAULT_BATCH_SIZE
}

/**
 * Builds a sparqljs `bind` pattern that assigns the given node to the
 * variable `this`.
 *
 * The node is passed as the only argument of an operation expression whose
 * operator is the empty string.
 * NOTE(review): presumably the sparqljs generator serialises this as the
 * (parenthesised) IRI itself, i.e. `BIND((<iri>) AS ?this)` — confirm against
 * the generated query string.
 *
 * @param $this the node to bind to `?this`
 * @returns the bind pattern, ready to be placed in a group's pattern list
 */
private $thisToBind($this: NamedNode): BindPattern {
  // Operation with an empty operator: just a wrapper around the single term.
  const wrappedTerm: BindPattern['expression'] = {
    type: "operation",
    operator: "",
    args: [$this]
  }
  const bindPattern: BindPattern = {
    type: 'bind',
    expression: wrappedTerm,
    variable: DataFactory.variable('this')
  }
  return bindPattern
}


public run($this?: NamedNode, batchSize?: number): void {
if ($this !== undefined) this.$thisList.push($this)
const error = (e: any): Error => new Error(`The Generator did not run succesfully, it could not get the results from the endpoint ${this.source}: ${(e as Error).message}`)
if (this.source === '') this.source = getEngineSource(this.endpoint)
let numberOfStatements = 0
this.engine.queryQuads(queryString, {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
numberOfStatements ++
this.emit('data', quad)
})
stream.on('end', () => {
this.emit('end', numberOfStatements)
})
stream.on('error', (e) => {
if (this.$thisList.length >= (batchSize ?? this.batchSize)) {
if (this.source === '') this.source = getEngineSource(this.endpoint)
const unionQuery = getSPARQLQuery(getSPARQLQueryString(this.query), "construct");
const union: UnionPattern = { type: 'union', patterns: [] }
for (const $this of this.$thisList) {
this.iterationsProcessed++
const group: GroupPattern = { type: 'group', patterns: [...unionQuery.where ?? []] }
group.patterns.unshift(this.$thisToBind($this))
union.patterns.push(group)
}
unionQuery.where = [union]

this.engine.queryQuads(getSPARQLQueryString(unionQuery), {
sources: [this.source]
}).then(stream => {
stream.on('data', (quad: Quad) => {
this.statements++
this.emit('data', quad)
})
stream.on('error', (e) => {
this.emit("error", error(e))
})
stream.on('end', () => {
if (this.iterationsIncoming !== undefined && this.iterationsProcessed >= this.iterationsIncoming) {
this.emit('end', this.iterationsIncoming, this.statements)
}
})
}).catch(e => {
this.emit("error", error(e))
})
}).catch(e => {
this.emit("error", error(e))
})
this.$thisList.length = 0
}
}
}

Expand Down
17 changes: 7 additions & 10 deletions src/lib/Stage.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,24 @@ class Stage extends EventEmitter {

public run(): void {
let quadCount = 0
let iteratorCount = 0
let generatorCount = 0
// let iteratorCount = 0
const writer = new Writer(this.destination(), { end: false, format: 'N-Triples' })

this.generator.on('data', quad => {
writer.addQuad(quad)
quadCount ++
this.emit('generatorResult', quadCount)
})
this.generator.on('end', _ => {
generatorCount++
if (generatorCount === iteratorCount) {
this.emit('end', iteratorCount, quadCount)
}

this.generator.on('error', e => {
this.emit('error', e)
})

this.iterator.on('data', $this => {
this.generator.run($this)
this.emit('iteratorResult', $this)
})
this.iterator.on('end', count => {
iteratorCount = count
})

this.iterator.on('error', e => {
this.emit('error', e)
})
Expand Down
42 changes: 40 additions & 2 deletions src/lib/tests/Generator.class.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Pipeline from "../Pipeline.class.js";
import * as chai from 'chai'
import chaiAsPromised from 'chai-as-promised'
import { NamedNode } from "n3";
import * as fs from "fs"
import type { LDWorkbenchConfiguration } from "../LDWorkbenchConfiguration.js";
chai.use(chaiAsPromised)
const expect = chai.expect
Expand Down Expand Up @@ -52,8 +53,45 @@ describe('Generator Class', () => {
});
});
// BUG when both the generator and iterator tests are running, it seems the iterator will never terminate
describe.skip('run', () => {
it('should emit "data" and "end" events with the correct number of statements', async () => {
describe('run', () => {
it('Should work in batchSize for pipeline\'s generator', async function () {
  // Output file the pipeline writes to; it is read back below to verify the result.
  const filePath = 'pipelines/data/example-pipelineBatch.nt';

  const batchConfiguration: LDWorkbenchConfiguration = {
    name: 'Example Pipeline Batch',
    description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
    destination: "file://" + filePath,
    stages: [
      {
        name: 'Stage 1',
        iterator: {
          query: 'file://static/example/iterator-stage-1.rq',
          endpoint: 'file://static/tests/iris.nt'
        },
        generator: {
          query: 'file://static/example/generator-stage-1.rq',
          // adjust the batch size for the test here
          batchSize: 7
        }
      }
    ]
  }
  const pipelineBatch = new Pipeline(batchConfiguration, {silent: true})
  pipelineBatch.validate()

  // Await the run so that the assertions below are part of the test's own
  // promise. With a detached `.then(...)` the test would already be reported
  // as passed before any expectation executes, and a failing expectation
  // would only surface as an unhandled rejection.
  // NOTE(review): relies on run() resolving only after the pipeline has
  // finished writing, as the original code assumed as well.
  await pipelineBatch.run()

  // Read the file after the pipeline has finished; sorting makes the
  // comparison independent of the order in which statements were written.
  const file = fs.readFileSync(filePath, {encoding: "utf-8"})
  const fileLines = file.split("\n").sort()
  expect(fileLines.length).to.equal(460)
  // The trailing newline yields one empty string, which sorts first.
  expect(fileLines[0]).to.equal('')
  expect(fileLines[1]).to.equal('<http://dbpedia.org/resource/Iris_setosa> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Thing> .')
  expect(fileLines[fileLines.length - 1]).to.equal('<https://triplydb.com/triply/iris/id/floweringPlant/00150> <https://schema.org/name> "Instance 150 of the Iris Virginica"@en .')
})
it.skip('should emit "data" and "end" events with the correct number of statements', async () => {
const configuration : LDWorkbenchConfiguration = {
name: 'Example Pipeline',
description: 'This is an example pipeline. It uses files that are available in this repository and SPARQL endpoints that should work.\n',
Expand Down
2 changes: 2 additions & 0 deletions static/ld-workbench.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
},
"batchSize": {
"type": "number",
"minimum": 1,
"description": "Overrule the iterator's behaviour of fetching 10 results per request, regardless of any limits in your query."
}
}
Expand All @@ -64,6 +65,7 @@
},
"batchSize": {
"type": "number",
"minimum": 1,
"description": "Overrule the generator's behaviour of fetching results for 10 bindings of $this per request."
}
}
Expand Down

0 comments on commit e1ac3d0

Please sign in to comment.