From e563ccda4df574b75a76d92a95f413419879a408 Mon Sep 17 00:00:00 2001 From: Julian Rojas Date: Fri, 5 Jan 2024 23:20:09 +0100 Subject: [PATCH] Optionally sort bulked members in SDS processing --- src/sdsify.ts | 28 ++++++++++++------ test/sdsify.test.ts | 72 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/sdsify.ts b/src/sdsify.ts index 63528da..27f10d7 100644 --- a/src/sdsify.ts +++ b/src/sdsify.ts @@ -26,9 +26,9 @@ function extractMainNodeShape(store: Store): Quad_Subject { if (nodeShapes && nodeShapes.length > 0) { for (const ns of nodeShapes) { const isNotReferenced = store.getSubjects(null, ns, null).length === 0; - - if(isNotReferenced) { - if(!mainNodeShape) { + + if (isNotReferenced) { + if (!mainNodeShape) { mainNodeShape = ns; } else { throw new Error("There are multiple main node shapes in a given shape. Unrelated shapes must be given as separate shape filters"); @@ -49,12 +49,13 @@ export function sdsify( input: Stream, output: Writer, streamNode: Term, + timestampPath?: string, shapeFilters?: string[], ) { input.data(async (input) => { const dataStore = new Store(maybe_parse(input)); console.log("[sdsify] Got input with", dataStore.size, "quads"); - const members: { [id: string]: Quad[] } = {}; + const members: Array<{ [id: string]: Quad[] }> = []; if (shapeFilters) { console.log("[sdsify] Extracting SDS members based on given shape(s)"); @@ -72,9 +73,8 @@ export function sdsify( // Execute the CBDShapeExtractor over every targeted instance of the given shape for (const entity of dataStore.getSubjects(RDF.type, targetClass, null)) { - members[entity.value] = await shapeExtractor.extract(dataStore, entity, mainNodeShape); + members.push({[entity.value]: await shapeExtractor.extract(dataStore, entity, mainNodeShape)}); } - } } else { // Extract members based on a Concise Bound Description (CBD) @@ -82,11 +82,20 @@ export function sdsify( for (const sub of dataStore.getSubjects(null, null, null)) { if (sub instanceof NamedNode) { - members[sub.value] = await cbdExtractor.extract(dataStore, sub); + members.push({[sub.value]: await cbdExtractor.extract(dataStore, sub)}); } } } + // Sort members based on the given timestamp value (if any) to avoid out of order writing issues downstream + if (timestampPath) { + members.sort((a, b) => { + const ta = new Date(dataStore.getObjects(Object.keys(a)[0], timestampPath, null)[0].value).getTime(); + const tb = new Date(dataStore.getObjects(Object.keys(b)[0], timestampPath, null)[0].value).getTime(); + return ta - tb; + }); + } + let membersCount = 0; // Create a unique transaction ID based on the data content and the current system time @@ -95,8 +104,9 @@ export function sdsify( .quadsToString(dataStore.getQuads(null, null, null, null))) .digest("hex") + "_" + new Date().toISOString(); - for (const key of Object.keys(members)) { - const quads = members[key]; + for (const obj of members) { + const key = Object.keys(obj)[0]; + const quads = obj[key]; const blank = blankNode(); quads.push( diff --git a/test/sdsify.test.ts b/test/sdsify.test.ts index 3d5760b..084c5f7 100644 --- a/test/sdsify.test.ts +++ b/test/sdsify.test.ts @@ -34,6 +34,22 @@ describe("Functional tests for the sdsify function", () => { a ex:SomeOtherClass; ex:prop3 "another value". `; + const INPUT_3 = ` + @prefix ex: . + @prefix xsd: . + + a ex:SomeClass; + ex:prop1 "some value A"; + ex:timestamp "2024-01-05T09:00:00.000Z"^^xsd:dateTime. + + a ex:SomeClass; + ex:prop1 "some value B"; + ex:timestamp "2024-01-05T10:00:00.000Z"^^xsd:dateTime. + + a ex:SomeClass; + ex:prop1 "some value C"; + ex:timestamp "2024-01-05T07:00:00.000Z"^^xsd:dateTime. + `; const SHAPE_1 = ` @prefix sh: . @prefix ex: . @@ -78,6 +94,14 @@ describe("Functional tests for the sdsify function", () => { ] ]. `; + const SHAPE_4 = ` + @prefix sh: . + @prefix ex: . + @prefix rdf: . + + [ ] a sh:NodeShape; + sh:targetClass ex:SomeClass. + `; const BAD_SHAPE_1 = ` @prefix sh: . @prefix ex: . @@ -139,7 +163,7 @@ describe("Functional tests for the sdsify function", () => { }); // Execute function - sdsify(input, output, STREAM_ID, [SHAPE_1]); + sdsify(input, output, STREAM_ID, undefined, [SHAPE_1]); // Push some data in await input.push(INPUT_1); @@ -163,7 +187,7 @@ describe("Functional tests for the sdsify function", () => { }); // Execute function - sdsify(input, output, STREAM_ID, [SHAPE_2]); + sdsify(input, output, STREAM_ID, undefined, [SHAPE_2]); // Push some data in await input.push(INPUT_1); @@ -190,7 +214,7 @@ describe("Functional tests for the sdsify function", () => { }); // Execute function - sdsify(input, output, STREAM_ID, [SHAPE_3]); + sdsify(input, output, STREAM_ID, undefined, [SHAPE_3]); // Push some data in await input.push(INPUT_2); @@ -216,7 +240,7 @@ describe("Functional tests for the sdsify function", () => { }); // Execute function - sdsify(input, output, STREAM_ID, [SHAPE_1, SHAPE_2]); + sdsify(input, output, STREAM_ID, undefined, [SHAPE_1, SHAPE_2]); // Push some data in await input.push(INPUT_1); @@ -233,7 +257,7 @@ describe("Functional tests for the sdsify function", () => { }).on("end", () => { }); // Execute function - sdsify(input, output, STREAM_ID, [BAD_SHAPE_1]); + sdsify(input, output, STREAM_ID, undefined, [BAD_SHAPE_1]); try { // Push some data in expect(await input.push(INPUT_1)).toThrow(Error); @@ -241,4 +265,42 @@ describe("Functional tests for the sdsify function", () => { expect(err.message).toBe("There are multiple main node shapes in a given shape. Unrelated shapes must be given as separate shape filters"); } }); + + test("Time stamp-based ordering of extraction SHACL-based extraction", async () => { + const input = new SimpleStream(); + const output = new SimpleStream(); + + const store = new Store(); + const timestamps: string[] = []; + + output.data(data => { + const quads = new Parser().parse(data); + const subj = quads[0].subject; + store.addQuads(quads); + timestamps.push(store.getObjects(subj, "http://ex.org/timestamp", null)[0].value); + }).on("end", () => { + // Check there number of members + expect(store.getObjects(null, SDS.payload, null).length).toBe(3); + + // Check all properties are extracted for members + expect(store.getQuads(null, "http://ex.org/prop1", literal("some value A"), null).length).toBe(1); + expect(store.getQuads(null, "http://ex.org/prop1", literal("some value B"), null).length).toBe(1); + expect(store.getQuads(null, "http://ex.org/prop1", literal("some value C"), null).length).toBe(1); + expect(store.getQuads(null, "http://ex.org/timestamp", null, null).length).toBe(3); + + let currT = 0; + for (const ts of timestamps) { + const tsv = new Date(ts).getTime(); + expect(tsv).toBeGreaterThanOrEqual(currT); + currT = tsv; + } + }); + + // Execute function + sdsify(input, output, STREAM_ID, "http://ex.org/timestamp", [SHAPE_4]); + + // Push some data in + await input.push(INPUT_3); + await input.end(); + }); }); \ No newline at end of file