Skip to content

Commit

Permalink
Include timestamps of for timed-out queries
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenEschauzier authored and rubensworks committed Jun 5, 2024
1 parent a6b2ace commit 902ca9f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 47 deletions.
106 changes: 65 additions & 41 deletions lib/ResultAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,83 +31,101 @@ export class ResultAggregator implements IResultAggregator {
name: resultGroup[0].name,
id: resultGroup[0].id,
time: 0,
timeMax: 0,
timeMin: 0,
timeMax: Number.NEGATIVE_INFINITY,
timeMin: Number.POSITIVE_INFINITY,
failures: 0,
replication: resultGroup.length,
results: 0,
resultsMax: 0,
resultsMin: 0,
resultsMax: Number.NEGATIVE_INFINITY,
resultsMin: Number.POSITIVE_INFINITY,
hash: '',
timestamps: [],
timestampsMax: [],
timestampsMin: [],
};
let inconsistentResults = false;
let successfulExecutions = 0;
const timestampDivisors: number[] = [];
const timestampsAll: number[][] = [];
// Track max number of timestamps for averaging of timestamps later
let maxNumTimestamp = 0;
for (const result of resultGroup) {
if (result.error) {
aggregate.error = result.error;
aggregate.failures++;
} else if (aggregate.hash.length === 0) {
// Update the aggregate based on the first successful result
successfulExecutions++;
aggregate.time = result.time;
aggregate.timeMax = result.time;
aggregate.timeMin = result.time;
aggregate.results = result.results;
aggregate.resultsMax = result.results;
aggregate.resultsMin = result.results;
aggregate.hash = result.hash;
for (const ts of result.timestamps) {
timestampDivisors.push(1);
aggregate.timestamps.push(ts);
aggregate.timestampsMax.push(ts);
aggregate.timestampsMin.push(ts);
// If no results and error we don't register
if (result.timestamps.length === 0) {
continue;
}
} else {
successfulExecutions++;
aggregate.time += result.time;
aggregate.timeMax = Math.max(aggregate.timeMax, result.time);
aggregate.timeMin = Math.min(aggregate.timeMin, result.time);
aggregate.results += result.results;
aggregate.resultsMax = Math.max(aggregate.resultsMax, result.results);
aggregate.resultsMin = Math.min(aggregate.resultsMin, result.results);
if (aggregate.hash !== result.hash && !aggregate.error) {
aggregate.timeMax = Math.max(aggregate.timeMax, result.time);
aggregate.timeMin = Math.min(aggregate.timeMin, result.time);

// If we haven't registered hash, we do so for full query result
if (aggregate.hash.length === 0) {
aggregate.hash = result.hash;
} else if (aggregate.hash !== result.hash) {
inconsistentResults = true;
aggregate.failures++;
}
for (const [ index, timestamp ] of result.timestamps.entries()) {
if (timestampDivisors.length > index) {
timestampDivisors[index] += 1;
aggregate.timestamps[index] += timestamp;
aggregate.timestampsMax[index] = Math.max(aggregate.timestampsMax[index], timestamp);
aggregate.timestampsMin[index] = Math.min(aggregate.timestampsMin[index], timestamp);
} else {
timestampDivisors.push(1);
aggregate.timestamps.push(timestamp);
aggregate.timestampsMax.push(timestamp);
aggregate.timestampsMin.push(timestamp);
}
}
}
timestampsAll.push(result.timestamps);
if (result.timestamps.length > maxNumTimestamp) {
maxNumTimestamp = result.timestamps.length;
}
}
if (inconsistentResults && !aggregate.error) {
aggregate.error = new Error('Result hash inconsistency');
}
if (successfulExecutions > 0) {
aggregate.time /= successfulExecutions;
aggregate.results /= successfulExecutions;
for (const [ index, timestampDivisor ] of timestampDivisors.entries()) {
aggregate.timestamps[index] /= timestampDivisor;

if (timestampsAll.length > 0) {
if (successfulExecutions > 0) {
aggregate.time /= successfulExecutions;
aggregate.results /= successfulExecutions;
}

const timestampsProcessed = this.averageTimeStamps(timestampsAll, maxNumTimestamp);
aggregate.timestamps = timestampsProcessed.timestampsAverage;
aggregate.timestampsMin = timestampsProcessed.timestampsMin;
aggregate.timestampsMax = timestampsProcessed.timestampsMax;
}

// Convert all possible leftover infinity / -infinity back to 0 for backward compatibility
aggregate.resultsMin = Number.isFinite(aggregate.resultsMin) ? aggregate.resultsMin : 0;
aggregate.resultsMax = Number.isFinite(aggregate.resultsMax) ? aggregate.resultsMax : 0;
aggregate.timeMin = Number.isFinite(aggregate.timeMin) ? aggregate.timeMin : 0;
aggregate.timeMax = Number.isFinite(aggregate.timeMax) ? aggregate.timeMax : 0;

aggregates.push(aggregate);
}
return aggregates;
}

public averageTimeStamps(timestampsAll: number[][], maxNumTimestamps: number): IProcessedTimestamps {
const timestampsSum: number[] = <number[]> Array.from({ length: maxNumTimestamps }).fill(0);
const timestampsMax: number[] = <number[]> Array.from({ length: maxNumTimestamps }).fill(Number.NEGATIVE_INFINITY);
const timestampsMin: number[] = <number[]> Array.from({ length: maxNumTimestamps }).fill(Number.POSITIVE_INFINITY);
const nObsTimestamp: number[] = <number[]> Array.from({ length: maxNumTimestamps }).fill(0);

for (const timestamps of timestampsAll) {
for (const [ j, ts ] of timestamps.entries()) {
timestampsSum[j] += ts;
timestampsMax[j] = Math.max(timestampsMax[j], ts);
timestampsMin[j] = Math.min(timestampsMin[j], ts);
nObsTimestamp[j]++;
}
}
return {
timestampsMax,
timestampsMin,
timestampsAverage: timestampsSum.map((ts, i) => ts / nObsTimestamp[i]),
};
}

/**
* Produce aggregated query results from a set of single execution results.
* @param results Individual query execution results.
Expand All @@ -123,3 +141,9 @@ export class ResultAggregator implements IResultAggregator {
export interface IResultAggregator {
aggregateResults: (results: IResult[]) => IAggregateResult[];
}

export interface IProcessedTimestamps {
timestampsMax: number[];
timestampsMin: number[];
timestampsAverage: number[];
}
48 changes: 45 additions & 3 deletions test/ResultAggregator-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ describe('ResultAggregator', () => {
const hashError = new Error('Result hash inconsistency');
const aggregator = new ResultAggregator();
let results: IResult[];
let noResults: IResult[];

beforeEach(() => {
results = [
Expand All @@ -32,7 +33,7 @@ describe('ResultAggregator', () => {
error: exampleError,
results: 1,
hash: 'b',
timestamps: [ 50 ],
timestamps: [ 30 ],
},
{
name: 'a',
Expand All @@ -51,6 +52,26 @@ describe('ResultAggregator', () => {
timestamps: [ 10, 20, 30 ],
},
];
noResults = [
{
name: 'a',
id: '0',
time: 0,
error: exampleError,
results: 0,
hash: 'a',
timestamps: [ ],
},
{
name: 'a',
id: '0',
time: 0,
error: exampleError,
results: 0,
hash: 'a',
timestamps: [ ],
},
];
});

it('produces the aggregate across one result', () => {
Expand Down Expand Up @@ -93,6 +114,27 @@ describe('ResultAggregator', () => {
expect(aggregator.aggregateResults(results.slice(0, 2))).toEqual(expected);
});

it('produces the aggregate across multiple results with no produced results and timeout', () => {
const expected: IAggregateResult[] = [{
name: 'a',
id: '0',
error: exampleError,
time: 0,
timeMax: 0,
timeMin: 0,
failures: 2,
replication: 2,
results: 0,
resultsMax: 0,
resultsMin: 0,
hash: '',
timestamps: [ ],
timestampsMax: [ ],
timestampsMin: [ ],
}];
expect(aggregator.aggregateResults(noResults)).toEqual(expected);
});

it('produces the aggregate across multiple results with errors', () => {
const expected: IAggregateResult[] = [{
name: 'a',
Expand All @@ -107,8 +149,8 @@ describe('ResultAggregator', () => {
resultsMax: 3,
resultsMin: 3,
hash: 'a',
timestamps: [ 15, 25, 35 ],
timestampsMax: [ 20, 30, 40 ],
timestamps: [ 20, 25, 35 ],
timestampsMax: [ 30, 30, 40 ],
timestampsMin: [ 10, 20, 30 ],
}];
expect(aggregator.aggregateResults(results.slice(0, 3))).toEqual(expected);
Expand Down
6 changes: 3 additions & 3 deletions test/ResultAggregatorComunica-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('ResultAggregatorComunica', () => {
error: exampleError,
results: 1,
hash: 'b',
timestamps: [ 50 ],
timestamps: [ 30 ],
httpRequests: 6,
},
{
Expand Down Expand Up @@ -118,8 +118,8 @@ describe('ResultAggregatorComunica', () => {
resultsMax: 3,
resultsMin: 3,
hash: 'a',
timestamps: [ 15, 25, 35 ],
timestampsMax: [ 20, 30, 40 ],
timestamps: [ 20, 25, 35 ],
timestampsMax: [ 30, 30, 40 ],
timestampsMin: [ 10, 20, 30 ],
httpRequests: 15,
httpRequestsMax: 20,
Expand Down

0 comments on commit 902ca9f

Please sign in to comment.