Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more info for ndjson errors #157

Merged
merged 8 commits into from
Sep 5, 2024
3 changes: 2 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
coverage
coverage
ecqm-content-r4-2021
9 changes: 7 additions & 2 deletions src/database/dbOperations.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,13 @@ const pushBulkFailedOutcomes = async (clientId, failedOutcomes) => {
* @param {String} fileUrl The url for the resource ndjson
* @param {Array} failedOutcomes An array of strings with messages detailing why the resource failed import
*/
const pushNdjsonFailedOutcomes = async (clientId, fileUrl, failedOutcomes) => {
const pushNdjsonFailedOutcomes = async (clientId, fileUrl, failedOutcomes, successCount) => {
const collection = db.collection('ndjsonStatuses');
await collection.insertOne({ id: clientId + fileUrl, failedOutcomes: failedOutcomes });
await collection.insertOne({
id: clientId + fileUrl,
failedOutcomes: failedOutcomes,
successCount: successCount
});
return clientId;
};

Expand Down Expand Up @@ -288,6 +292,7 @@ const decrementBulkFileCount = async (clientId, resourceCount) => {

// Complete import request when file count reaches 0
if (value.exportedFileCount === 0) {
logger.info(`Completed Import Request for: ${clientId}`);
await completeBulkImportRequest(clientId);
}
};
Expand Down
41 changes: 33 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ const { buildConfig } = require('./config/profileConfig');
const { initialize } = require('./server/server');
const childProcess = require('child_process');
const os = require('os');
const ndjsonQueue = require('./queue/ndjsonProcessQueue');

const app = express();
app.use(express.json({ limit: '50mb', type: 'application/json+fhir' }));
app.use(express.json({ limit: '50mb', type: 'application/fhir+json' }));

const config = buildConfig();
const server = initialize(config, app);
const workerProcesses = [];

const workerTotal =
parseInt(process.env.IMPORT_WORKERS) + parseInt(process.env.NDJSON_WORKERS) + parseInt(process.env.EXEC_WORKERS);
Expand All @@ -23,22 +25,26 @@ if (workerTotal > os.cpus().length) {
}

for (let i = 0; i < process.env.IMPORT_WORKERS; i++) {
childProcess.fork('./src/server/importWorker.js');
workerProcesses.push(childProcess.fork('./src/server/importWorker.js'));
}

for (let i = 0; i < process.env.EXEC_WORKERS; i++) {
childProcess.fork('./src/server/execWorker.js');
workerProcesses.push(childProcess.fork('./src/server/execWorker.js'));
}

for (let i = 0; i < process.env.NDJSON_WORKERS; i++) {
const child = childProcess.fork('./src/server/ndjsonWorker.js');

// Database updates need to happen from the main process to avoid race conditions
child.on('message', async ({ clientId, resourceCount, successCount }) => {
workerProcesses.push(childProcess.fork('./src/server/ndjsonWorker.js'));
}
// Database updates need to happen from the main process to avoid race conditions for bulk status update
ndjsonQueue.on('job succeeded', async (jobId, { clientId, resourceCount, successCount }) => {
logger.debug(`ndjson job ${jobId} succeeded`);
try {
await decrementBulkFileCount(clientId, resourceCount);
await updateSuccessfulImportCount(clientId, successCount);
});
}
} catch (e) {
logger.info(`Error processing ndjson-worker message: ${e.message}`);
}
});

const port = process.env.SERVER_PORT || 3000;

Expand All @@ -51,3 +57,22 @@ server.listen(port, async () => {
logger.info('Added qicore profiles to validator server');
}
});

// Gracefully wind down all forked worker processes when the main process is
// asked to stop (Ctrl+C, kill, or normal exit).
process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);

// Guard so the handler body runs only once even though it is registered for
// multiple events/signals that may fire together.
let stopping = false;

/**
 * Shutdown handler: forwards SIGTERM to every forked worker in
 * `workerProcesses`, then exits the main process after a short grace period.
 * NOTE(review): when triggered via the 'exit' event the setTimeout callback
 * cannot run (the event loop is already gone) — confirm intended behavior.
 */
function exitHandler() {
  if (stopping) {
    return;
  }
  stopping = true;
  logger.info('Shutting down...'); // fixed typo: was 'Shuting down...'
  workerProcesses.forEach(worker => {
    worker.kill('SIGTERM');
  });
  // Give workers 2 seconds to terminate cleanly before exiting.
  setTimeout(() => {
    logger.info('Workers stopped... Goodbye!');
    process.exit();
  }, 2000);
}
13 changes: 13 additions & 0 deletions src/server/execWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,17 @@
return res;
});

process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);

let stopping = false;
function exitHandler() {

Check warning on line 120 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🕹️ Function is not covered

Warning! Not covered function
if (!stopping) {
stopping = true;

Check warning on line 122 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
logger.info(`exec-worker-${process.pid}: Execution Worker Stopping`);

Check warning on line 123 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
process.exit();

Check warning on line 124 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
}

Check warning on line 125 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 125 in src/server/execWorker.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
}

module.exports = execQueue;
13 changes: 13 additions & 0 deletions src/server/importWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,16 @@ const executeImportWorkflow = async (clientEntryId, inputUrls) => {
return false;
}
};

// Ensure the import worker logs a shutdown message exactly once, whether it
// is stopped by a signal or by normal process exit.
process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);

// One-shot latch: the handler is wired to several events that can all fire.
let stopping = false;

/**
 * Shutdown handler for the import worker: logs once and terminates the
 * process immediately.
 */
function exitHandler() {
  if (stopping) {
    return;
  }
  stopping = true;
  logger.info(`import-worker-${process.pid}: Import Worker Stopping`);
  process.exit();
}
69 changes: 46 additions & 23 deletions src/server/ndjsonWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,54 +50,77 @@ ndjsonWorker.process(async job => {
await pushBulkFailedOutcomes(clientId, outcome);
process.send({ clientId, resourceCount: 0, successCount: 0 });
logger.info(`ndjson-worker-${process.pid}: failed to fetch ${fileName}`);
return;
return { clientId, resourceCount: 0, successCount: 0 };
}

const ndjsonLines = ndjsonResources.trim().split(/\n/);
if (ndjsonLines.length > 0 && JSON.parse(ndjsonLines[0]).resourceType === 'Parameters') {
// check first line for a Parameters header and remove if necessary
ndjsonLines.shift();
}
const insertions = ndjsonLines.map(async resourceStr => {
let data;
try {
data = JSON.parse(resourceStr);
} catch (e) {
throw new Error(`Error parsing JSON: ${resourceStr}`);
const ndjsonLines = ndjsonResources.split(/\n/);

// keep track of when we hit the first non-empty line
let firstNonEmptyLine = null;

const insertions = ndjsonLines.map(async (resourceStr, index) => {
resourceStr = resourceStr.trim();

// if line is empty skip
if (resourceStr === '') {
return null;
}

// if this is the first non empty line then mark that we found it
if (firstNonEmptyLine === null) {
firstNonEmptyLine = index;
}

// attempt to parse the line
try {
const data = JSON.parse(resourceStr);

// check if first non empty line is a Parameters header and skip it
if (firstNonEmptyLine === index && data.resourceType === 'Parameters') {
return null;
}

checkSupportedResource(data.resourceType);
return updateResource(data.id, data, data.resourceType);
} catch (e) {
// Here, the location of the error message varies between standard error and ServerError
lmd59 marked this conversation as resolved.
Show resolved Hide resolved
// The former path finds the message for a ServerError, the latter for a standard error
throw new Error(
`${data.resourceType}/${data.id} failed import with the following message: ${
e.issue?.[0]?.details?.text ?? e.message
}`
);
// Rethrow the error with info on the line number. This fails the async promise and will be collected later.
throw new Error(`Failed to process entry at row ${index + 1}: ${e.issue?.[0]?.details?.text ?? e.message}`);
}
});

const outcomes = await Promise.allSettled(insertions);

const failedOutcomes = outcomes.filter(outcome => outcome.status === 'rejected');
const successfulOutcomes = outcomes.filter(outcome => outcome.status === 'fulfilled');
const successfulOutcomes = outcomes.filter(outcome => outcome.status === 'fulfilled' && outcome.value?.id);

const outcomeData = [];

failedOutcomes.forEach(out => {
outcomeData.push(out.reason.message);
});
const successCount = successfulOutcomes.length;

// keep track of failed outcomes for individual ndjson files
await pushNdjsonFailedOutcomes(clientId, fileUrl, outcomeData);
await pushNdjsonFailedOutcomes(clientId, fileUrl, outcomeData, successCount);

await pushBulkFailedOutcomes(clientId, outcomeData);
const successCount = successfulOutcomes.length;
logger.info(`ndjson-worker-${process.pid}: processed ${fileName}`);

logger.info(`ndjson-worker-${process.pid}: processed ${fileName}`);
process.send({ clientId, resourceCount, successCount });

await mongoUtil.client.close();
return { clientId, resourceCount, successCount };
});

// Ensure the ndjson worker logs a shutdown message exactly once, whether it
// is stopped by a signal or by normal process exit.
process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);

// One-shot latch: the handler is wired to several events that can all fire.
let stopping = false;

/**
 * Shutdown handler for the ndjson worker: logs once and terminates the
 * process immediately.
 */
function exitHandler() {
  if (stopping) {
    return;
  }
  stopping = true;
  logger.info(`ndjson-worker-${process.pid}: ndjson Worker Stopping`);
  process.exit();
}
51 changes: 51 additions & 0 deletions src/services/bulkstatus.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,28 @@
};
response.entry[0].resource.parameter.push(inputResult);
});
if (ndjsonStatus.successCount) {
const successCountResult = {
name: 'outcome',
part: [
{ name: 'associatedInputUrl', valueUrl: url.valueUrl },
{
name: 'operationOutcome',
resource: {
resourceType: 'OperationOutcome',
issue: [
{
severity: 'information',
code: 'informational',
details: { text: `Successfully processed ${ndjsonStatus.successCount} rows.` }
}
]
}
}
]
};
response.entry[0].resource.parameter.push(successCountResult);
}
}
}
}
Expand All @@ -156,6 +178,35 @@
}
]
});
// create success counts for every file
for (const parameter of bulkStatus.importManifest.parameter) {
if (parameter.name === 'input') {
const url = parameter.part.find(p => p.name === 'url');

Check warning on line 184 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 184 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 184 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🕹️ Function is not covered

Warning! Not covered function
const ndjsonStatus = await getNdjsonFileStatus(clientId, url.valueUrl);

Check warning on line 185 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
if (ndjsonStatus?.successCount) {
const successCountResult = {
name: 'outcome',
part: [
{ name: 'associatedInputUrl', valueUrl: url.valueUrl },
{
name: 'operationOutcome',
resource: {
resourceType: 'OperationOutcome',
issue: [
{
severity: 'information',
code: 'informational',
details: { text: `Successfully processed ${ndjsonStatus.successCount} rows.` }
elsaperelli marked this conversation as resolved.
Show resolved Hide resolved
}
]
}
}
]
};

Check warning on line 205 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
response.entry[0].resource.parameter.push(successCountResult);

Check warning on line 206 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
}

Check warning on line 207 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 207 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
}

Check warning on line 208 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
}
}

return response;
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/testBulkStatus.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@
{
"name": "requestIdentity",
"valueString": "request-identity-example"
},
{
"name": "input",
"part": [
{
"name": "url",
"valueUrl": "http://localhost:3001/Condition.ndjson"
},
{
"name": "inputDetails",
"part": [
{
"name": "resourceType",
"valueCode": "Condition"
}
]
}
]
}
]
}
Expand Down
7 changes: 7 additions & 0 deletions test/fixtures/testNdjsonStatus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"id": "COMPLETED_REQUEST_WITH_RESOURCE_ERRORShttp://localhost:3001/Condition.ndjson",
"failedOutcomes": ["Test error message"],
"successCount": 3
}
]
5 changes: 5 additions & 0 deletions test/populateTestData.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const { db, client } = require('../src/database/connection');
const testStatuses = require('./fixtures/testBulkStatus.json');
const testNdjsonStatuses = require('./fixtures/testNdjsonStatus.json');
const testOperationOutcome = require('./fixtures/fhir-resources/testOperationOutcome.json');
const importQueue = require('../src/queue/importQueue');
const { execQueue } = require('../src/queue/execQueue');
Expand Down Expand Up @@ -44,7 +45,11 @@ const bulkStatusSetup = async () => {
const promises = testStatuses.map(async status => {
await createTestResource(status, 'bulkImportStatuses');
});
const ndjsonStatus = testNdjsonStatuses.map(async ndjsonStatus => {
await createTestResource(ndjsonStatus, 'ndjsonStatuses');
});
await Promise.all(promises);
await Promise.all(ndjsonStatus);
};

const clientFileSetup = async () => {
Expand Down
19 changes: 16 additions & 3 deletions test/services/bulkstatus.service.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,31 @@ describe('bulkstatus.service', () => {
expect(response.headers['content-type']).toEqual('application/json; charset=utf-8');
expect(response.body).toBeDefined();
expect(response.body.entry[0].response.status).toEqual('200');
// add check for All OK OperationOutcome ?
expect(response.body.entry[0].resource.parameter[1].part[0].resource.issue[0].details.text).toEqual('All OK');
});

it('returns 200 status and a batch-response bundle with 400 status when $import failed', async () => {
const response = await supertest(server.app).get('/4_0_1/bulkstatus/ERROR_REQUEST').expect(200);
expect(response.headers['content-type']).toEqual('application/json; charset=utf-8');
expect(response.body).toBeDefined();
expect(response.body.entry[0].response.status).toEqual('400');
// add check for fatal OperationOutcome ?
expect(response.body.entry[0].response.outcome.issue[0].severity).toEqual('fatal');
});

// TODO: Add tests for when a 200 status is returned but there were failed outcomes
it('returns 200 status for completed request but with one failed outcome', async () => {
const response = await supertest(server.app)
.get('/4_0_1/bulkstatus/COMPLETED_REQUEST_WITH_RESOURCE_ERRORS')
.expect(200);
expect(response.headers['content-type']).toEqual('application/json; charset=utf-8');
expect(response.body).toBeDefined();
expect(response.body.entry[0].response.status).toEqual('200');
expect(response.body.entry[0].resource.parameter[1].part[1].resource.issue[0].details.text).toEqual(
'Test error message'
);
expect(response.body.entry[0].resource.parameter[2].part[1].resource.issue[0].details.text).toEqual(
'Successfully processed 3 rows.'
);
});

it('returns 404 status for request with unknown ID', async () => {
await supertest(server.app)
Expand Down
Loading