Skip to content

Commit

Permalink
Add onQuery run listener
Browse files Browse the repository at this point in the history
  • Loading branch information
rubensworks committed Jul 2, 2024
1 parent 31a4582 commit ebffa57
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
26 changes: 21 additions & 5 deletions lib/SparqlBenchmarkRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class SparqlBenchmarkRunner {
protected readonly logger?: (message: string) => void;
protected readonly resultAggregator: IResultAggregator;
protected readonly availabilityCheckTimeout: number;
protected readonly endpointFetcher: SparqlEndpointFetcher;
public readonly endpointFetcher: SparqlEndpointFetcher;

public constructor(options: ISparqlBenchmarkRunnerArgs) {
this.logger = options.logger;
Expand All @@ -47,15 +47,15 @@ export class SparqlBenchmarkRunner {
public async run(options: IRunOptions = {}): Promise<IAggregateResult[]> {
// Execute queries in warmup
if (this.warmup > 0) {
await this.executeAllQueries(this.warmup, true);
await this.executeAllQueries(this.warmup, true, options.onQuery);
}

// Execute queries
if (options.onStart) {
await options.onStart();
}

const results = await this.executeAllQueries(this.replication);
const results = await this.executeAllQueries(this.replication, false, options.onQuery);

if (options.onStop) {
await options.onStop();
Expand All @@ -70,9 +70,14 @@ export class SparqlBenchmarkRunner {
* Executes all queries from the runner's query sets, outputting the results.
* @param replication The number of executions per individual query.
* @param warmup Whether the executions are intended for warmup purposes only.
* @param onQuery Callback for when a query is about to be executed.
* @returns The query reults, unless warmup is specified.
*/
public async executeAllQueries(replication: number, warmup = false): Promise<IResult[]> {
public async executeAllQueries(
replication: number,
warmup: boolean,
onQuery?: (queryString: string) => Promise<void>,
): Promise<IResult[]> {
const totalQuerySets = Object.keys(this.querySets).length;
const totalQueries = Object.values(this.querySets).map(qs => qs.length).reduce((acc, qsl) => acc + qsl);
const startTime = Date.now();
Expand All @@ -95,6 +100,9 @@ export class SparqlBenchmarkRunner {
if (this.requestDelay) {
await this.sleep(this.requestDelay);
}
if (onQuery) {
await onQuery(queryString);
}
const result = await this.executeQuery(name, id.toString(), queryString);
if (!warmup) {
results.push(result);
Expand Down Expand Up @@ -225,7 +233,10 @@ export class SparqlBenchmarkRunner {
await this.sleep(1_000);
this.log(`Endpoint not available yet, waited for ${elapsed()} seconds...`);
}
this.log(`Endpoint available after ${elapsed()} seconds`);
const seconds = elapsed();
if (seconds > 0) {
this.log(`Endpoint available after ${seconds} seconds`);
}
}

/**
Expand Down Expand Up @@ -303,4 +314,9 @@ export interface IRunOptions {
* A listener for when the actual query executions have stopped.
*/
onStop?: () => Promise<void>;
/**
* A listener for when a query is about to be executed.
* @param queryString A query string.
*/
onQuery?: (queryString: string) => Promise<void>;
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"version": "manual-git-changelog onversion"
},
"dependencies": {
"fetch-sparql-endpoint": "^5.0.0",
"fetch-sparql-endpoint": "^5.1.0",
"rdf-string": "^1.0.0",
"yargs": "^17.0.0"
},
Expand Down
26 changes: 21 additions & 5 deletions test/SparqlBenchmarkRunner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ describe('SparqlBenchmarkRunner', () => {
it('runs the whole query set and invokes listeners', async() => {
const onStart = jest.fn();
const onStop = jest.fn();
await runner.run({ onStart, onStop });
const onQuery = jest.fn();
await runner.run({ onStart, onStop, onQuery });
expect(onStart).toHaveBeenCalledTimes(1);
expect(onStop).toHaveBeenCalledTimes(1);
expect(onQuery).toHaveBeenCalledTimes(12);
expect(onQuery).toHaveBeenCalledWith('Q1');
});
});

Expand Down Expand Up @@ -960,8 +963,7 @@ describe('SparqlBenchmarkRunner', () => {

expect(logger).toHaveBeenNthCalledWith(1, 'Executing 2 query sets, containing 4 queries, with replication of 2');
expect(logger).toHaveBeenNthCalledWith(2, 'Execute: 1 / 8 <a#0>');
expect(logger).toHaveBeenNthCalledWith(3, 'Endpoint available after 0 seconds');
expect(logger).toHaveBeenNthCalledWith(4, `${expectedError.name}: ${expectedError.message}`);
expect(logger).toHaveBeenNthCalledWith(3, `${expectedError.name}: ${expectedError.message}`);
});

it('logs error for throwing query in stream and marks it as errored', async() => {
Expand Down Expand Up @@ -1070,8 +1072,7 @@ describe('SparqlBenchmarkRunner', () => {

expect(logger).toHaveBeenNthCalledWith(1, 'Executing 2 query sets, containing 4 queries, with replication of 2');
expect(logger).toHaveBeenNthCalledWith(2, 'Execute: 1 / 8 <a#0>');
expect(logger).toHaveBeenNthCalledWith(3, 'Endpoint available after 0 seconds');
expect(logger).toHaveBeenNthCalledWith(4, `${expectedError.name}: ${expectedError.message}`);
expect(logger).toHaveBeenNthCalledWith(3, `${expectedError.name}: ${expectedError.message}`);
});
});

Expand Down Expand Up @@ -1253,6 +1254,21 @@ describe('SparqlBenchmarkRunner', () => {
});
});

describe('waitForEndpoint', () => {
it('only prints if endpoint is not immediately available', async() => {
let call = 0;
process.hrtime = <any> (() => [ 1, 1_000_000 ]);
jest.spyOn(runner, 'endpointAvailable').mockImplementation(async(): Promise<boolean> => {
return call++ !== 0;
});
jest.spyOn(runner, 'sleep').mockResolvedValue();
await runner.waitForEndpoint();

expect(logger).toHaveBeenNthCalledWith(1, `Endpoint not available yet, waited for 1 seconds...`);
expect(logger).toHaveBeenNthCalledWith(2, `Endpoint available after 1 seconds`);
});
});

describe('bindingsToString', () => {
const bindings: Record<string, RDF.Term> = {
a: DF.namedNode('ex:a'),
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2537,10 +2537,10 @@ fb-watchman@^2.0.0:
dependencies:
bser "2.1.1"

fetch-sparql-endpoint@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/fetch-sparql-endpoint/-/fetch-sparql-endpoint-5.0.0.tgz#89b4338752b5a537625458006d89f842697f9948"
integrity sha512-I22MKV+A02I1uK5vnDfUxp/dIWwXySgam7FMpbaSvACl3l8FkNzaDg6eolC6WLV+gfLt//I9DnrL0Eqn2DGxwA==
fetch-sparql-endpoint@^5.1.0:
version "5.1.0"
resolved "https://registry.yarnpkg.com/fetch-sparql-endpoint/-/fetch-sparql-endpoint-5.1.0.tgz#95000b48aca1cb601ebd345af24ddbcd173d665c"
integrity sha512-ylROBEdVOVzaGdngq3hSGuA/cDtmRiMmPMU75dsu9xatdKEtU39vRp3HbVxdgzqDDX4HU0FnTBQ/+ciMaEBdbA==
dependencies:
"@rdfjs/types" "*"
"@types/n3" "^1.0.0"
Expand Down

0 comments on commit ebffa57

Please sign in to comment.