Skip to content

Commit

Permalink
Merge pull request #17 from rdf-connect/fix/check-types-defined
Browse files Browse the repository at this point in the history
fix: Correctly check if no types were configured
  • Loading branch information
smessie authored Jun 25, 2024
2 parents 1806b80 + 8728aa4 commit 974369c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 27 deletions.
21 changes: 21 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"@types/jsonld": "^1.5.14",
"@types/n3": "^1.16.4",
"@types/node": "^20.14.2",
"@types/rdf-js": "^4.0.2",
"@typescript-eslint/eslint-plugin": "^7.13.0",
"@typescript-eslint/parser": "^7.13.0",
"@vitest/coverage-v8": "^1.6.0",
Expand Down
93 changes: 66 additions & 27 deletions src/sdsify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ function maybeParse(data: Quad[] | string): Quad[] {
// We determine this by assuming that the main node shape
// is not referenced by any other shape description.
// If more than one is found an exception is thrown.
async function extractMainNodeShape(store: RdfStore | null): Promise<Quad_Subject | undefined> {
async function extractMainNodeShape(
store: RdfStore | null,
): Promise<Quad_Subject | undefined> {
if (store) {
const nodeShapes = await getSubjects(
store,
Expand All @@ -48,15 +50,17 @@ async function extractMainNodeShape(store: RdfStore | null): Promise<Quad_Subjec
} else {
throw new Error(
"There are multiple main node shapes in a given shape. " +
"Use sh:xone or sh:or to provide multiple unrelated shapes together."
"Use sh:xone or sh:or to provide multiple unrelated shapes together.",
);
}
}
}
if (mainNodeShape) {
return <Quad_Subject>mainNodeShape;
} else {
throw new Error("No main SHACL Node Shapes found in given shape filter");
throw new Error(
"No main SHACL Node Shapes found in given shape filter",
);
}
} else {
throw new Error("No SHACL Node Shapes found in given shape filter");
Expand All @@ -69,9 +73,10 @@ function getExtractor(shapeStore: RdfStore | null): CBDShapeExtractor {
return new CBDShapeExtractor();
} else {
return new CBDShapeExtractor(shapeStore, undefined, {
fetch: async () => new Response("", {
headers: { "content-type": "text/turtle" }
}),
fetch: async () =>
new Response("", {
headers: { "content-type": "text/turtle" },
}),
});
}
}
Expand All @@ -87,7 +92,9 @@ export function sdsify(
// Setup member extractor
const shapeStore = shape ? RdfStore.createDefault() : null;
if (shape) {
maybeParse(shape).forEach((x) => { if (shapeStore) shapeStore.addQuad(x); });
maybeParse(shape).forEach((x) => {
if (shapeStore) shapeStore.addQuad(x);
});
}
const extractor = getExtractor(shapeStore);

Expand All @@ -99,30 +106,48 @@ export function sdsify(
const members: { [id: string]: SDSMember } = {};
const t0 = new Date();
// Get shape Id (if any)
const shapeId = shape ? await extractMainNodeShape(shapeStore) : undefined;
const shapeId = shape
? await extractMainNodeShape(shapeStore)
: undefined;
const subjects = [];

if (types) {
if (types?.length) {
for (const t of types) {
// Group quads based on given member type
subjects.push(...(await getSubjects(dataStore, RDF.terms.type, t)));
subjects.push(
...(await getSubjects(dataStore, RDF.terms.type, t)),
);
}
} else {
subjects.push(...(await getSubjects(dataStore)));
}

// Extract members from received quads
await Promise.all(subjects.map(async subject => {
if (subject.termType === "NamedNode" && !members[subject.value]) {
const membQuads = await extractor.extract(dataStore, subject, shapeId);
members[subject.value] = {
quads: membQuads,
timestamp: timestampPath ? dataStore.getQuads(subject, timestampPath)[0].object : undefined
};
}
}));
await Promise.all(
subjects.map(async (subject) => {
if (
subject.termType === "NamedNode" &&
!members[subject.value]
) {
const membQuads = await extractor.extract(
dataStore,
subject,
shapeId,
);
members[subject.value] = {
quads: membQuads,
timestamp: timestampPath
? dataStore.getQuads(subject, timestampPath)[0]
.object
: undefined,
};
}
}),
);

console.log(`[sdsify] Members extracted in ${new Date().getTime() - t0.getTime()} ms`);
console.log(
`[sdsify] Members extracted in ${new Date().getTime() - t0.getTime()} ms`,
);

// Sort members based on the given timestamp value (if any) to avoid out of order writing issues downstream
const orderedMembersIds = Object.keys(members);
Expand All @@ -140,21 +165,33 @@ export function sdsify(
const TRANSACTION_ID =
hash
.update(new NWriter().quadsToString(dataStore.getQuads()))
.digest("hex") + "_" + new Date().toISOString();
.digest("hex") +
"_" +
new Date().toISOString();

for (const sub of orderedMembersIds) {
const quads = members[sub].quads;
const blank = df.blankNode();

quads.push(
df.quad(blank, SDS.terms.payload, <Quad_Object>df.namedNode(sub), SDS.terms.custom("DataDescription")),
df.quad(blank, SDS.terms.stream, <Quad_Object>streamNode, SDS.terms.custom("DataDescription")),
df.quad(
blank,
SDS.terms.payload,
<Quad_Object>df.namedNode(sub),
SDS.terms.custom("DataDescription"),
),
df.quad(
blank,
SDS.terms.stream,
<Quad_Object>streamNode,
SDS.terms.custom("DataDescription"),
),
// This is not standardized (yet)
df.quad(
blank,
LDES.terms.custom("transactionId"),
df.literal(TRANSACTION_ID),
SDS.terms.custom("DataDescription")
SDS.terms.custom("DataDescription"),
),
);

Expand All @@ -166,7 +203,7 @@ export function sdsify(
blank,
LDES.terms.custom("isLastOfTransaction"),
df.literal("true", XSD.terms.custom("boolean")),
SDS.terms.custom("DataDescription")
SDS.terms.custom("DataDescription"),
),
);
}
Expand All @@ -175,11 +212,13 @@ export function sdsify(
membersCount += 1;
}

console.log(`[sdsify] successfully pushed ${membersCount} members in ${new Date().getTime() - t0.getTime()} ms`);
console.log(
`[sdsify] successfully pushed ${membersCount} members in ${new Date().getTime() - t0.getTime()} ms`,
);
});

input.on("end", async () => {
console.log("[sdsify] input channel was closed down");
await output.end();
});
}
}

0 comments on commit 974369c

Please sign in to comment.