Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harmony 1996: Add 'empty-result' status to WorkItem class and work-items table #683

Merged
merged 9 commits into from
Jan 27, 2025
18 changes: 17 additions & 1 deletion db/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ CREATE TABLE `job_errors` (
`jobID` char(36) not null,
`url` varchar(4096) not null,
`message` varchar(4096) not null,
`level` varchar(255) check (
`level` in (
'error',
'warning'
)
) not null default 'error',
`category` varchar(255) check (
`category` in (
Copy link
Collaborator

@vinnyinverso vinnyinverso Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to have to change this check every time a new category is added in service lib? If we're adding new categories only via pull request in the service lib, it has been validated already by the time it gets to Harmony. This is also missing the existing error categories like "Server", "Forbidden".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I'll drop this constraint here as well as in the migration.

'generic',
'no-data'
)
) not null default 'generic',
`createdAt` datetime not null,
`updatedAt` datetime not null,
FOREIGN KEY(jobID) REFERENCES jobs(jobID)
Expand Down Expand Up @@ -80,7 +92,8 @@ CREATE TABLE `work_items` (
`workflowStepIndex` integer not null,
`scrollID` varchar(4096),
`serviceID` varchar(255) not null,
`status` text check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled')) not null,
`status` varchar(255) check (`status` in ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')) not null,
`subStatus` varchar(255) check (`subStatus` in ('no-data')),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column name should besub_status to match the Postgres definition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value is missing null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

`stacCatalogLocation` varchar(255),
`totalItemsSize` double precision not null default 0,
`outputItemSizesJson` text,
Expand Down Expand Up @@ -183,9 +196,12 @@ CREATE INDEX jobs_username_idx ON jobs(jobID, username);
CREATE INDEX job_links_jobID_idx ON job_links(jobID);
CREATE INDEX job_links_jobID_id_idx ON job_links(jobID, id);
CREATE INDEX job_errors_jobID_idx ON job_errors(jobID);
CREATE INDEX job_errors_level_idx ON job_errors(level);
CREATE INDEX job_errors_category_idx ON job_errors(category);
CREATE INDEX work_items_jobID_idx ON work_items(jobID);
CREATE INDEX work_items_serviceID_idx ON work_items(serviceID);
CREATE INDEX work_items_status_idx ON work_items(status);
CREATE INDEX work_items_subStatus_idx ON work_items(subStatus);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column name should besub_status to match the Postgres definition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

CREATE INDEX workflow_steps_jobID_idx ON workflow_steps(jobID);
CREATE INDEX workflow_steps_jobID_StepIndex_idx ON workflow_steps(jobID, stepIndex);
CREATE INDEX workflow_steps_serviceID_idx ON workflow_steps(serviceID);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex, Promise) {
return knex.schema.raw(`
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_status_check",
vinnyinverso marked this conversation as resolved.
Show resolved Hide resolved
ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled', 'warning')),
ADD COLUMN "sub_status" VARCHAR(255),
ADD CONSTRAINT "work_items_sub_status_check"
CHECK (sub_status IN (null, 'no-data'));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have an enumeration on this field which I think we were calling "category" - we should allow any string and don't need this constraint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

CREATE INDEX work_items_sub_status ON work_items (sub_status)
`);
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.raw(`
ALTER TABLE "work_items"
DROP CONSTRAINT "work_items_sub_status_check",
DROP COLUMN "sub_status"),
DROP CONSTRAINT "work_items_status_check",
ADD CONSTRAINT "work_items_status_check"
CHECK (status IN ('ready', 'queued', 'running', 'successful', 'failed', 'canceled'))
`);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex, Promise) {
return knex.schema.raw(`
ALTER TABLE "job_errors"
ADD COLUMN "level" VARCHAR(255) DEFAULT 'error' NOT NULL,
ADD CONSTRAINT "job_errors_level_check"
CHECK (level IN ('error', 'warning')),
ADD COLUMN "category" VARCHAR(255) DEFAULT 'generic' NOT NULL,
ADD CONSTRAINT "job_errors_category_check"
CHECK (category IN ('generic', 'no-data'));

CREATE INDEX job_errors_level ON job_errors (level);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment with index naming convention.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

CREATE INDEX job_errors_category ON job_errors (category)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it matters, but for consistency should we have each statement end in a semi-colon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

`);
};

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.raw(`
DROP INDEX job_errors_category;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got - syntax error at or near "DROP_INDEX" error when running the down migration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh - fixed here and in the other migration

DROP_INDEX job_errors_level;
ALTER TABLE "job_errors"
DROP CONSTRAINT "job_errors_category_check",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a job_error_category_check constraint in the up migration - and I don't think we want one so assume this should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

DROP COLUMN "category",
DROP CONSTRAINT "job_errors_level_check",
DROP COLUMN "level"
`);
};
5 changes: 4 additions & 1 deletion packages/util/tsconfig.build.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
"allowJs": true,
"noImplicitAny": false,
"sourceMap": true,
"outDir": "built"
"outDir": "built",
"typeRoots": [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity what does this fix - anything we should do differently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was having consistent failures from tsc complaining about not finding types for lots of packages. This tells it where to look. Not sure why no one else has seen these errors.

"node_modules/@types"
]
},
"include": ["."],
"exclude": ["./node_modules", "coverage"]
Expand Down
5 changes: 4 additions & 1 deletion packages/util/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
"allowJs": true,
"noImplicitAny": false,
"sourceMap": true,
"outDir": "built"
"outDir": "built",
"typeRoots": [
"node_modules/@types"
],
},
"include": ["."],
"exclude": ["./node_modules", "coverage"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ export async function preprocessWorkItem(
let outputItemSizes;
let catalogItems;
try {
if (status === WorkItemStatus.SUCCESSFUL && !nextWorkflowStep) {
// TODO fix this in HARMONY-1995
if ([WorkItemStatus.SUCCESSFUL, WorkItemStatus.WARNING].includes(status) && !nextWorkflowStep) {
// if we are CREATING STAC CATALOGS for the last step in the chain we should read the catalog items
// since they are needed for generating the output links we will save
catalogItems = await readCatalogsItems(results);
Expand Down Expand Up @@ -608,13 +609,13 @@ export async function processWorkItem(
): Promise<void> {
const { jobID } = job;
const { status, errorMessage, catalogItems, outputItemSizes } = preprocessResult;
const { workItemID, hits, results, scrollID } = update;
const { workItemID, hits, results, scrollID, subStatus } = update;
const startTime = new Date().getTime();
let durationMs;
let jobSaveStartTime;
let didCreateWorkItem = false;
if (status === WorkItemStatus.SUCCESSFUL) {
logger.info(`Updating work item ${workItemID} to ${status}`);
if (status === WorkItemStatus.SUCCESSFUL || status === WorkItemStatus.WARNING) {
logger.info(`Updating work item ${workItemID} to ${status} | ${subStatus}`);
}

try {
Expand Down Expand Up @@ -700,6 +701,7 @@ export async function processWorkItem(
tx,
workItemID,
status,
subStatus,
duration,
totalItemsSize,
outputItemSizes);
Expand All @@ -711,6 +713,7 @@ export async function processWorkItem(
logger.info(`Updated work item. Duration (ms) was: ${duration}`);

workItem.status = status;
workItem.subStatus = subStatus;

let allWorkItemsForStepComplete = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const { id } = req.params;
const {
status,
subStatus,
hits,
results,
scrollID,
Expand All @@ -116,6 +117,7 @@ export async function updateWorkItem(req: HarmonyRequest, res: Response): Promis
const update = {
workItemID: parseInt(id),
status,
subStatus,
hits,
results,
scrollID,
Expand Down
10 changes: 10 additions & 0 deletions services/harmony/app/models/work-item-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ export enum WorkItemStatus {
SUCCESSFUL = 'successful',
FAILED = 'failed',
CANCELED = 'canceled',
WARNING = 'warning',
}

// additional information about the status - currently only relevant for WARNING status
export enum WorkItemSubStatus {
NO_DATA = 'no-data', // the service responded with no data
}

export const COMPLETED_WORK_ITEM_STATUSES = [
WorkItemStatus.SUCCESSFUL,
WorkItemStatus.FAILED,
WorkItemStatus.CANCELED,
WorkItemStatus.WARNING,
];

export interface WorkItemRecord {
Expand All @@ -36,6 +43,9 @@ export interface WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;

// error message if status === FAILED
errorMessage?: string;

Expand Down
5 changes: 4 additions & 1 deletion services/harmony/app/models/work-item-update.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _ from 'lodash';
import { WorkItemStatus } from './work-item-interface';
import { WorkItemStatus, WorkItemSubStatus } from './work-item-interface';

/**
*
Expand All @@ -14,6 +14,9 @@ export default interface WorkItemUpdate {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;

// The ID of the scroll session (only used for the query cmr service)
scrollID?: string;

Expand Down
19 changes: 13 additions & 6 deletions services/harmony/app/models/work-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import env from '../util/env';
import { Job, JobStatus } from './job';
import Record from './record';
import WorkflowStep from './workflow-steps';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery } from './work-item-interface';
import { WorkItemRecord, WorkItemStatus, getStacLocation, WorkItemQuery, WorkItemSubStatus } from './work-item-interface';
import { eventEmitter } from '../events';
import { getWorkSchedulerQueue } from '../../app/util/queue/queue-factory';

Expand Down Expand Up @@ -51,6 +51,9 @@ export default class WorkItem extends Record implements WorkItemRecord {
// The status of the operation - see WorkItemStatus
status?: WorkItemStatus;

// The sub-status of the operation - see WorkItemSubStatus
subStatus?: WorkItemSubStatus;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we have coding standard to use underscores in new column names. i.e. sub_status.
I commented above that the sub_status name is not consistent in db definition vs migration. If you want to go with subStatus, it is fine with me as long as we make it consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason I thought it could be snake case in the dB, but camel case in the models. I see now that we are not doing that (maybe it doesn't work). I'll make it snake case here.


// error message if status === FAILED
errorMessage?: string;

Expand Down Expand Up @@ -256,7 +259,7 @@ export async function getNextWorkItems(
.whereIn('id', workItemData.map((w) => w.id));
}
} catch (e) {
logger.error(`Error getting next work item for service [${serviceID}] and job [${jobID}]`);
logger.error(`Error getting next work items for service [${serviceID}] and job [${jobID}]`);
logger.error(e);
throw e;
}
Expand Down Expand Up @@ -297,6 +300,7 @@ export async function getWorkItemStatus(
* @param tx - the transaction to use for querying
* @param id - the id of the WorkItem
* @param status - the status to set for the WorkItem
* @param subStatus - the sub-status to set for the WorkItem
* @param duration - how long the work item took to process
* @param totalItemsSize - the combined sizes of all the input granules for this work item
* @param outputItemSizes - the separate size of each granule in the output for this work item
Expand All @@ -305,6 +309,7 @@ export async function updateWorkItemStatus(
tx: Transaction,
id: number,
status: WorkItemStatus,
subStatus: WorkItemSubStatus,
duration: number,
totalItemsSize: number,
outputItemSizes: number[],
Expand All @@ -313,11 +318,11 @@ export async function updateWorkItemStatus(
const outputItemSizesJson = JSON.stringify(outputItemSizes);
try {
await tx(WorkItem.table)
.update({ status, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.update({ status, subStatus, duration, totalItemsSize, outputItemSizesJson: outputItemSizesJson, updatedAt: new Date() })
.where({ id });
logger.debug(`Status for work item ${id} set to ${status}`);
logger.debug(`Status for work item ${id} set to ${status} | ${subStatus}`);
} catch (e) {
logger.error(`Failed to update work item ${id} status to ${status}`);
logger.error(`Failed to update work item ${id} status to ${status} | ${subStatus}`);
logger.error(e);
throw e;
}
Expand All @@ -328,14 +333,16 @@ export async function updateWorkItemStatus(
* @param tx - the transaction to use for querying
* @param ids - the ids of the WorkItems
* @param status - the status to set for the WorkItems
* @param subStatus - the sub-status to set for the WorkItems
*/
export async function updateWorkItemStatuses(
tx: Transaction,
ids: number[],
status: WorkItemStatus,
subStatus?: WorkItemSubStatus,
): Promise<void> {
const now = new Date();
let update = { status, updatedAt: now };
let update = { status, subStatus, updatedAt: now };
// if we are setting the status to running, also set the startedAt time
if (status === WorkItemStatus.RUNNING) {
update = { ...update, ...{ startedAt: now } };
Expand Down
Loading
Loading