Skip to content

Commit

Permalink
Optionally sort bulked members in SDS processing
Browse files Browse the repository at this point in the history
  • Loading branch information
julianrojas87 committed Jan 5, 2024
1 parent 8380e6a commit e563ccd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 14 deletions.
28 changes: 19 additions & 9 deletions src/sdsify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ function extractMainNodeShape(store: Store): Quad_Subject {
if (nodeShapes && nodeShapes.length > 0) {
for (const ns of nodeShapes) {
const isNotReferenced = store.getSubjects(null, ns, null).length === 0;
if(isNotReferenced) {
if(!mainNodeShape) {

if (isNotReferenced) {
if (!mainNodeShape) {
mainNodeShape = ns;
} else {
throw new Error("There are multiple main node shapes in a given shape. Unrelated shapes must be given as separate shape filters");
Expand All @@ -49,12 +49,13 @@ export function sdsify(
input: Stream<string | Quad[]>,
output: Writer<string>,
streamNode: Term,
timestampPath?: string,
shapeFilters?: string[],
) {
input.data(async (input) => {
const dataStore = new Store(maybe_parse(input));
console.log("[sdsify] Got input with", dataStore.size, "quads");
const members: { [id: string]: Quad[] } = {};
const members: Array<{ [id: string]: Quad[] }> = [];

if (shapeFilters) {
console.log("[sdsify] Extracting SDS members based on given shape(s)");
Expand All @@ -72,21 +73,29 @@ export function sdsify(

// Execute the CBDShapeExtractor over every targeted instance of the given shape
for (const entity of dataStore.getSubjects(RDF.type, targetClass, null)) {
members[entity.value] = await shapeExtractor.extract(dataStore, entity, mainNodeShape);
members.push({[entity.value]: await shapeExtractor.extract(dataStore, entity, mainNodeShape)});
}

}
} else {
// Extract members based on a Concise Bound Description (CBD)
const cbdExtractor = new CBDShapeExtractor();

for (const sub of dataStore.getSubjects(null, null, null)) {
if (sub instanceof NamedNode) {
members[sub.value] = await cbdExtractor.extract(dataStore, sub);
members.push({[sub.value]: await cbdExtractor.extract(dataStore, sub)});
}
}
}

// Sort members based on the given timestamp value (if any) to avoid out of order writing issues downstream
if (timestampPath) {
members.sort((a, b) => {
const ta = new Date(dataStore.getObjects(Object.keys(a)[0], timestampPath, null)[0].value).getTime();
const tb = new Date(dataStore.getObjects(Object.keys(b)[0], timestampPath, null)[0].value).getTime();
return ta - tb;
});
}

let membersCount = 0;

// Create a unique transaction ID based on the data content and the current system time
Expand All @@ -95,8 +104,9 @@ export function sdsify(
.quadsToString(dataStore.getQuads(null, null, null, null)))
.digest("hex") + "_" + new Date().toISOString();

for (const key of Object.keys(members)) {
const quads = members[key];
for (const obj of members) {
const key = Object.keys(obj)[0];
const quads = obj[key];
const blank = blankNode();

quads.push(
Expand Down
72 changes: 67 additions & 5 deletions test/sdsify.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ describe("Functional tests for the sdsify function", () => {
<B> a ex:SomeOtherClass;
ex:prop3 "another value".
`;
const INPUT_3 = `
@prefix ex: <http://ex.org/>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
<A> a ex:SomeClass;
ex:prop1 "some value A";
ex:timestamp "2024-01-05T09:00:00.000Z"^^xsd:dateTime.
<B> a ex:SomeClass;
ex:prop1 "some value B";
ex:timestamp "2024-01-05T10:00:00.000Z"^^xsd:dateTime.
<C> a ex:SomeClass;
ex:prop1 "some value C";
ex:timestamp "2024-01-05T07:00:00.000Z"^^xsd:dateTime.
`;
const SHAPE_1 = `
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix ex: <http://ex.org/>.
Expand Down Expand Up @@ -78,6 +94,14 @@ describe("Functional tests for the sdsify function", () => {
]
].
`;
const SHAPE_4 = `
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix ex: <http://ex.org/>.
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>.
[ ] a sh:NodeShape;
sh:targetClass ex:SomeClass.
`;
const BAD_SHAPE_1 = `
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix ex: <http://ex.org/>.
Expand Down Expand Up @@ -139,7 +163,7 @@ describe("Functional tests for the sdsify function", () => {
});

// Execute function
sdsify(input, output, STREAM_ID, [SHAPE_1]);
sdsify(input, output, STREAM_ID, undefined, [SHAPE_1]);

// Push some data in
await input.push(INPUT_1);
Expand All @@ -163,7 +187,7 @@ describe("Functional tests for the sdsify function", () => {
});

// Execute function
sdsify(input, output, STREAM_ID, [SHAPE_2]);
sdsify(input, output, STREAM_ID, undefined, [SHAPE_2]);

// Push some data in
await input.push(INPUT_1);
Expand All @@ -190,7 +214,7 @@ describe("Functional tests for the sdsify function", () => {
});

// Execute function
sdsify(input, output, STREAM_ID, [SHAPE_3]);
sdsify(input, output, STREAM_ID, undefined, [SHAPE_3]);

// Push some data in
await input.push(INPUT_2);
Expand All @@ -216,7 +240,7 @@ describe("Functional tests for the sdsify function", () => {
});

// Execute function
sdsify(input, output, STREAM_ID, [SHAPE_1, SHAPE_2]);
sdsify(input, output, STREAM_ID, undefined, [SHAPE_1, SHAPE_2]);

// Push some data in
await input.push(INPUT_1);
Expand All @@ -233,12 +257,50 @@ describe("Functional tests for the sdsify function", () => {
}).on("end", () => { });

// Execute function
sdsify(input, output, STREAM_ID, [BAD_SHAPE_1]);
sdsify(input, output, STREAM_ID, undefined, [BAD_SHAPE_1]);
try {
// Push some data in
expect(await input.push(INPUT_1)).toThrow(Error);
} catch (err) {
expect(err.message).toBe("There are multiple main node shapes in a given shape. Unrelated shapes must be given as separate shape filters");
}
});

test("Time stamp-based ordering of extraction SHACL-based extraction", async () => {
const input = new SimpleStream<string>();
const output = new SimpleStream<string>();

const store = new Store();
const timestamps: string[] = [];

output.data(data => {
const quads = new Parser().parse(data);
const subj = quads[0].subject;
store.addQuads(quads);
timestamps.push(store.getObjects(subj, "http://ex.org/timestamp", null)[0].value);
}).on("end", () => {
// Check there number of members
expect(store.getObjects(null, SDS.payload, null).length).toBe(3);

// Check all properties are extracted for members
expect(store.getQuads(null, "http://ex.org/prop1", literal("some value A"), null).length).toBe(1);
expect(store.getQuads(null, "http://ex.org/prop1", literal("some value B"), null).length).toBe(1);
expect(store.getQuads(null, "http://ex.org/prop1", literal("some value C"), null).length).toBe(1);
expect(store.getQuads(null, "http://ex.org/timestamp", null, null).length).toBe(3);

let currT = 0;
for (const ts of timestamps) {
const tsv = new Date(ts).getTime();
expect(tsv).toBeGreaterThanOrEqual(currT);
currT = tsv;
}
});

// Execute function
sdsify(input, output, STREAM_ID, "http://ex.org/timestamp", [SHAPE_4]);

// Push some data in
await input.push(INPUT_3);
await input.end();
});
});

0 comments on commit e563ccd

Please sign in to comment.