Skip to content

Commit

Permalink
HARMONY-1996: Add work-item status 'warning' and substatus work items
Browse files Browse the repository at this point in the history
and level/category to job_errors to support warnings as well as errors
  • Loading branch information
indiejames committed Jan 24, 2025
1 parent 4fe4a84 commit 8551b21
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 19 deletions.
18 changes: 17 additions & 1 deletion db/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ CREATE TABLE `job_errors` (
`jobID` char(36) not null,
`url` varchar(4096) not null,
`message` varchar(4096) not null,
`level` varchar(255) check (
`level` in (
'error',
'warning'
)
) not null default 'error',
`category` varchar(255) check (
`category` in (
'generic',
'no-data'
)
) not null default 'generic',
`createdAt` datetime not null,
`updatedAt` datetime not null,
FOREIGN KEY(jobID) REFERENCES jobs(jobID)
Expand Down Expand Up @@ -80,7 +92,8 @@ CREATE TABLE `work_items` (
`workflowStepIndex` integer not null,
`scrollID` varchar(4096),
`serviceID` varchar(255) not null,
`status` text check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'no-data')) not null,
`status` varchar(255) check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')) not null,
`subStatus` varchar(255) check (`subStatus` in ('no-data')),
`stacCatalogLocation` varchar(255),
`totalItemsSize` double precision not null default 0,
`outputItemSizesJson` text,
Expand Down Expand Up @@ -183,9 +196,12 @@ CREATE INDEX jobs_username_idx ON jobs(jobID, username);
CREATE INDEX job_links_jobID_idx ON job_links(jobID);
CREATE INDEX job_links_jobID_id_idx ON job_links(jobID, id);
CREATE INDEX job_errors_jobID_idx ON job_errors(jobID);
CREATE INDEX job_errors_level_idx ON job_errors(level);
CREATE INDEX job_errors_category_idx ON job_errors(category);
CREATE INDEX work_items_jobID_idx ON work_items(jobID);
CREATE INDEX work_items_serviceID_idx ON work_items(serviceID);
CREATE INDEX work_items_status_idx ON work_items(status);
CREATE INDEX work_items_subStatus_idx ON work_items(subStatus);
CREATE INDEX workflow_steps_jobID_idx ON workflow_steps(jobID);
CREATE INDEX workflow_steps_jobID_StepIndex_idx ON workflow_steps(jobID, stepIndex);
CREATE INDEX workflow_steps_serviceID_idx ON workflow_steps(serviceID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ exports.up = function (knex, Promise) {
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_status_check",
ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'no-data'))
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')),
ADD COLUMN "sub_status" VARCHAR(255),
ADD CONSTRAINT "work_items_sub_status_check"
CHECK (sub_status IN (null, 'no-data'));
CREATE INDEX work_items_sub_status ON work_items (sub_status)
`);
};

Expand All @@ -18,6 +23,8 @@ exports.up = function (knex, Promise) {
exports.down = function (knex) {
return knex.schema.raw(`
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_sub_status_check",
DROP COLUMN "sub_status"),
DROP CONSTRAINT "work_items_status_check",
ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled'))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex, Promise) {
return knex.schema.raw(`
ALTER TABLE "job_errors"
ADD COLUMN "level" VARCHAR(255) DEFAULT 'error' NOT NULL,
ADD CONSTRAINT "job_errors_level_check"
CHECK (level IN ('error', 'warning')),
ADD COLUMN "category" VARCHAR(255) DEFAULT 'generic' NOT NULL,
ADD CONSTRAINT "job_errors_category_check"
CHECK (category IN ('generic', 'no-data'));
CREATE INDEX job_errors_level ON job_errors (level);
CREATE INDEX job_errors_category ON job_errors (category)
`);
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.raw(`
DROP INDEX job_errors_category;
DROP_INDEX job_errors_level;
ALTER TABLE "job_errors"
DROP CONSTRAINT "job_errors_category_check",
DROP COLUMN "category",
DROP CONSTRAINT "job_errors_level_check",
DROP COLUMN "level"
`);
};
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ export async function preprocessWorkItem(
let catalogItems;
try {
// TODO fix this in HARMONY-1995
if ([WorkItemStatus.SUCCESSFUL, WorkItemStatus.NO_DATA].includes(status) && !nextWorkflowStep) {
if ([WorkItemStatus.SUCCESSFUL, WorkItemStatus.WARNING].includes(status) && !nextWorkflowStep) {
// if we are CREATING STAC CATALOGS for the last step in the chain we should read the catalog items
// since they are needed for generating the output links we will save
catalogItems = await readCatalogsItems(results);
Expand Down Expand Up @@ -609,13 +609,13 @@ export async function processWorkItem(
): Promise<void> {
const { jobID } = job;
const { status, errorMessage, catalogItems, outputItemSizes } = preprocessResult;
const { workItemID, hits, results, scrollID } = update;
const { workItemID, hits, results, scrollID, subStatus } = update;
const startTime = new Date().getTime();
let durationMs;
let jobSaveStartTime;
let didCreateWorkItem = false;
if (status === WorkItemStatus.SUCCESSFUL) {
logger.info(`Updating work item ${workItemID} to ${status}`);
if (status === WorkItemStatus.SUCCESSFUL || status === WorkItemStatus.WARNING) {
logger.info(`Updating work item ${workItemID} to ${status} | ${subStatus}`);
}

try {
Expand Down Expand Up @@ -701,6 +701,7 @@ export async function processWorkItem(
tx,
workItemID,
status,
subStatus,
duration,
totalItemsSize,
outputItemSizes);
Expand All @@ -712,6 +713,7 @@ export async function processWorkItem(
logger.info(`Updated work item. Duration (ms) was: ${duration}`);

workItem.status = status;
workItem.subStatus = subStatus;

let allWorkItemsForStepComplete = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const { id } = req.params;
const {
status,
subStatus,
hits,
results,
scrollID,
Expand All @@ -116,6 +117,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const update = {
workItemID: parseInt(id),
status,
subStatus,
hits,
results,
scrollID,
Expand Down
12 changes: 10 additions & 2 deletions services/harmony/app/models/work-item-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ export enum WorkItemStatus {
SUCCESSFUL = 'successful',
FAILED = 'failed',
CANCELED = 'canceled',
NO_DATA = 'no-data',
WARNING = 'warning',
};

// additional information about the status - currently only relevant for WARNING status
export enum WorkItemSubStatus {
NO_DATA = 'no-data', // the service responded with no data
}

export const COMPLETED_WORK_ITEM_STATUSES = [
WorkItemStatus.SUCCESSFUL,
WorkItemStatus.FAILED,
WorkItemStatus.CANCELED,
WorkItemStatus.NO_DATA,
WorkItemStatus.WARNING,
];

export interface WorkItemRecord {
Expand All @@ -38,6 +43,9 @@ export interface WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;

// error message if status === FAILED
errorMessage?: string;

Expand Down
5 changes: 4 additions & 1 deletion services/harmony/app/models/work-item-update.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _ from 'lodash';
import { WorkItemStatus } from './work-item-interface';
import { WorkItemStatus, WorkItemSubStatus } from './work-item-interface';

/**
*
Expand All @@ -14,6 +14,9 @@ export default interface WorkItemUpdate {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;

// The ID of the scroll session (only used for the query cmr service)
scrollID?: string;

Expand Down
17 changes: 12 additions & 5 deletions services/harmony/app/models/work-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import env from '../util/env';
import { Job, JobStatus } from './job';
import Record from './record';
import WorkflowStep from './workflow-steps';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery } from './work-item-interface';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery, WorkItemSubStatus } from './work-item-interface';
import { eventEmitter } from '../events';
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';

Expand Down Expand Up @@ -51,6 +51,9 @@ export default class WorkItem extends Record implements WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;

// error message if status === FAILED
errorMessage?: string;

Expand Down Expand Up @@ -297,6 +300,7 @@ export async function getWorkItemStatus(
* @param tx - the transaction to use for querying
* @param id - the id of the WorkItem
* @param status - the status to set for the WorkItem
* @param subStatus - the sub-status to set for the WorkItem
* @param duration - how long the work item took to process
* @param totalItemsSize - the combined sizes of all the input granules for this work item
* @param outputItemSizes - the separate size of each granule in the output for this work item
Expand All @@ -305,6 +309,7 @@ export async function updateWorkItemStatus(
tx: Transaction,
id: number,
status: WorkItemStatus,
subStatus: WorkItemSubStatus,
duration: number,
totalItemsSize: number,
outputItemSizes: number[],
Expand All @@ -313,11 +318,11 @@ export async function updateWorkItemStatus(
const outputItemSizesJson = JSON.stringify(outputItemSizes);
try {
await tx(WorkItem.table)
.update({ status, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.update({ status, subStatus, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.where({ id });
logger.debug(`Status for work item ${id} set to ${status}`);
logger.debug(`Status for work item ${id} set to ${status} | ${subStatus}`);
} catch (e) {
logger.error(`Failed to update work item ${id} status to ${status}`);
logger.error(`Failed to update work item ${id} status to ${status} | ${subStatus}`);
logger.error(e);
throw e;
}
Expand All @@ -328,14 +333,16 @@ export async function updateWorkItemStatus(
* @param tx - the transaction to use for querying
* @param ids - the ids of the WorkItems
* @param status - the status to set for the WorkItems
* @param subStatus - the sub-status to set for the WorkItems
*/
export async function updateWorkItemStatuses(
tx: Transaction,
ids: number[],
status: WorkItemStatus,
subStatus?: WorkItemSubStatus,
): Promise<void> {
const now = new Date();
let update = { status, updatedAt: now };
let update = { status, subStatus, updatedAt: now };
// if we are setting the status to running, also set the startedAt time
if (status === WorkItemStatus.RUNNING) {
update = { ...update, ...{ startedAt: now } };
Expand Down
12 changes: 7 additions & 5 deletions services/harmony/test/work-items/work-backends.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkItemStatus, getStacLocation, WorkItemRecord } from './../../app/models/work-item-interface';
import { WorkItemStatus, getStacLocation, WorkItemRecord, WorkItemSubStatus } from './../../app/models/work-item-interface';
import { Job, JobRecord, JobStatus, terminalStates } from './../../app/models/job';
import { describe, it } from 'mocha';
import * as sinon from 'sinon';
Expand Down Expand Up @@ -490,7 +490,8 @@ describe('Work Backends', function () {
const noDataWorkItemRecord = {
...workItemRecord,
...{
status: WorkItemStatus.NO_DATA,
status: WorkItemStatus.WARNING,
subStatus: WorkItemSubStatus.NO_DATA,
results: [getStacLocation({ id: workItemRecord.id, jobID: workItemRecord.jobID }, 'catalog.json')],
outputItemSizes: [],
duration: 0,
Expand All @@ -501,9 +502,10 @@ describe('Work Backends', function () {
});
hookWorkItemUpdate((r) => r.send(noDataWorkItemRecord));

it('sets the work item status to no-data', async function () {
it('sets the work item status to warning with no-data', async function () {
const updatedWorkItem = await getWorkItemById(db, this.workItem.id);
expect(updatedWorkItem.status).to.equal(WorkItemStatus.NO_DATA);
expect(updatedWorkItem.status).to.equal(WorkItemStatus.WARNING);
expect(updatedWorkItem.subStatus).to.equal(WorkItemSubStatus.NO_DATA);
});

describe('and the worker computed duration is less than the harmony computed duration', async function () {
Expand Down Expand Up @@ -602,7 +604,7 @@ describe('Work Backends', function () {
}

// tests to make sure work-items cannot be updated once they are in a terminal state
for (const terminalState of [WorkItemStatus.CANCELED, WorkItemStatus.FAILED, WorkItemStatus.SUCCESSFUL, WorkItemStatus.NO_DATA]) {
for (const terminalState of [WorkItemStatus.CANCELED, WorkItemStatus.FAILED, WorkItemStatus.SUCCESSFUL, WorkItemStatus.WARNING]) {
describe(`When the work-item is already in state "${terminalState}"`, async function () {
const newWorkItemRecord = {
...workItemRecord, ...{ status: terminalState },
Expand Down

0 comments on commit 8551b21

Please sign in to comment.