Move upsert table to its own worker (#7812)
* Move upsert table to its own worker

* Update workflow name

* 🙈

* Prefix deployment name with front-
flvndvd authored Oct 2, 2024
1 parent e06e105 commit 77b8d46
Showing 18 changed files with 404 additions and 84 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/deploy-front.yml
@@ -50,10 +50,13 @@ jobs:
chmod +x ./k8s/deploy-image.sh
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/front-image:${{ steps.short_sha.outputs.short_sha }} front-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/front-image:${{ steps.short_sha.outputs.short_sha }} front-worker-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/front-image:${{ steps.short_sha.outputs.short_sha }} upsert-table-worker-deployment
- name: Wait for rollout to complete
run: |
echo "Waiting for rollout to complete (web)"
kubectl rollout status deployment/front-deployment --timeout=10m
echo "Waiting for rollout to complete (worker)"
kubectl rollout status deployment/front-worker-deployment --timeout=10m
echo "Waiting for rollout to complete (upsert table worker)"
kubectl rollout status deployment/upsert-table-worker-deployment --timeout=10m
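The Kubernetes manifest for upsert-table-worker-deployment is not part of this diff. A minimal sketch of what the new deployment could look like, assuming the worker is selected with the --workers flag added to start_worker.ts below (everything except the deployment and image names shown in the workflow is hypothetical; the image tag is normally injected by deploy-image.sh):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: upsert-table-worker-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: upsert-table-worker
  template:
    metadata:
      labels:
        app: upsert-table-worker
    spec:
      containers:
        - name: upsert-table-worker
          image: gcr.io/<project>/front-image:<short_sha>  # set by deploy-image.sh
          # Hypothetical entrypoint: run only the dedicated upsert table worker.
          command: ["npx", "tsx", "start_worker.ts", "--workers", "upsert_table_queue"]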
6 changes: 2 additions & 4 deletions front/lib/upsert_queue.ts
@@ -20,10 +20,8 @@ import { getDocumentsPostUpsertHooksToRun } from "@app/lib/documents_post_proces
import logger from "@app/logger/logger";
import { statsDClient } from "@app/logger/withlogging";
import { launchRunPostUpsertHooksWorkflow } from "@app/temporal/documents_post_process_hooks/client";
import {
launchUpsertDocumentWorkflow,
launchUpsertTableWorkflow,
} from "@app/temporal/upsert_queue/client";
import { launchUpsertDocumentWorkflow } from "@app/temporal/upsert_queue/client";
import { launchUpsertTableWorkflow } from "@app/temporal/upsert_tables/client";

import type { DataSourceResource } from "./resources/data_source_resource";

6 changes: 4 additions & 2 deletions front/package-lock.json

3 changes: 2 additions & 1 deletion front/package.json
@@ -134,7 +134,8 @@
"tailwindcss": "^3.2.4",
"tailwindcss-animate": "^1.0.7",
"three": "^0.163.0",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"yargs": "^17.7.2"
},
"devDependencies": {
"@google-cloud/storage": "^7.11.2",
81 changes: 50 additions & 31 deletions front/start_worker.ts
@@ -1,4 +1,6 @@
import { setupGlobalErrorHandler } from "@dust-tt/types";
import yargs from "yargs";
import { hideBin } from "yargs/helpers";

import logger from "@app/logger/logger";
import { runPokeWorker } from "@app/poke/temporal/worker";
@@ -13,37 +15,54 @@ import { runUpdateWorkspaceUsageWorker } from "@app/temporal/usage_queue/worker"

setupGlobalErrorHandler(logger);

runPostUpsertHooksWorker().catch((err) =>
logger.error({ error: err }, "Error running post upsert hooks worker.")
);
runPokeWorker().catch((err) =>
logger.error({ error: err }, "Error running poke worker.")
);
type WorkerName =
| "hard_delete"
| "labs"
| "mentions_count"
| "poke"
| "post_upsert_hooks"
| "production_checks"
| "scrub_workspace_queue"
| "update_workspace_usage"
| "upsert_queue"
| "upsert_table_queue";

runProductionChecksWorker().catch((err) =>
logger.error({ error: err }, "Error running production checks worker.")
);
const workerFunctions: Record<WorkerName, () => Promise<void>> = {
hard_delete: runHardDeleteWorker,
labs: runLabsWorker,
mentions_count: runMentionsCountWorker,
poke: runPokeWorker,
post_upsert_hooks: runPostUpsertHooksWorker,
production_checks: runProductionChecksWorker,
scrub_workspace_queue: runScrubWorkspaceQueueWorker,
update_workspace_usage: runUpdateWorkspaceUsageWorker,
upsert_queue: runUpsertQueueWorker,
upsert_table_queue: runUpsertTableQueueWorker,
};
const ALL_WORKERS = Object.keys(workerFunctions);

runUpsertQueueWorker().catch((err) =>
logger.error({ error: err }, "Error running upsert queue worker.")
);
async function runWorkers(workers: WorkerName[]) {
for (const worker of workers) {
workerFunctions[worker]().catch((err) =>
logger.error({ error: err }, `Error running ${worker} worker.`)
);
}
}

runUpdateWorkspaceUsageWorker().catch((err) =>
logger.error({ error: err }, "Error running usage queue worker.")
);

runScrubWorkspaceQueueWorker().catch((err) =>
logger.error({ error: err }, "Error running scrub workspace queue worker.")
);

runLabsWorker().catch((err) =>
logger.error({ error: err }, "Error running labs worker.")
);

runHardDeleteWorker().catch((err) =>
logger.error({ error: err }, "Error running hard delete worker.")
);

runMentionsCountWorker().catch((err) =>
logger.error({ error: err }, "Error running mentions count worker.")
);
yargs(hideBin(process.argv))
.option("workers", {
alias: "w",
type: "array",
choices: ALL_WORKERS,
default: ALL_WORKERS,
demandOption: true,
description: "Choose one or multiple workers to run.",
})
.help()
.alias("help", "h")
.parseAsync()
.then(async (args) => runWorkers(args.workers as WorkerName[]))
.catch((err) => {
logger.error({ error: err }, "Error running workers");
process.exit(1);
});
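start_worker.ts previously launched every worker unconditionally; it now exposes a small CLI so each deployment can run a chosen subset, with the full set as the default. Note that runWorkers deliberately does not await the worker promises: a failing worker is logged without blocking or killing its siblings. A usage sketch (the tsx invocation is an assumption; any TypeScript runner for the entrypoint works):

# Default: run all workers, preserving the previous behavior.
npx tsx start_worker.ts

# Run only the new upsert table worker, as upsert-table-worker-deployment would.
npx tsx start_worker.ts --workers upsert_table_queue

# Run a subset via the repeated short flag.
npx tsx start_worker.ts -w upsert_queue -w upsert_table_queue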
12 changes: 12 additions & 0 deletions front/temporal/config.ts
@@ -0,0 +1,12 @@
import { EnvironmentConfig } from "@dust-tt/types";

const config = {
getUpsertQueueBucket: (): string => {
return EnvironmentConfig.getEnvVariable("DUST_UPSERT_QUEUE_BUCKET");
},
getServiceAccount: (): string => {
return EnvironmentConfig.getEnvVariable("SERVICE_ACCOUNT");
},
};

export default config;
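These getters are consumed when building the GCS client in the new activity file below, so both environment variables must be set wherever the upsert table worker runs. A usage sketch (the example values are hypothetical):

// Requires, e.g.: DUST_UPSERT_QUEUE_BUCKET=dust-upsert-queue SERVICE_ACCOUNT=/etc/gcp/service-account.json
import { Storage } from "@google-cloud/storage";

import config from "@app/temporal/config";

const storage = new Storage({ keyFilename: config.getServiceAccount() });
const bucket = storage.bucket(config.getUpsertQueueBucket());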
1 change: 1 addition & 0 deletions front/temporal/upsert_queue/activities.ts
@@ -154,6 +154,7 @@ export async function upsertDocumentActivity(
});
}

// TODO(2024-10-02 flav) Remove once all upsert tables have been processed from this queue.
export async function upsertTableActivity(
upsertQueueId: string,
enqueueTimestamp: number
47 changes: 1 addition & 46 deletions front/temporal/upsert_queue/client.ts
@@ -5,7 +5,7 @@ import { getTemporalClient } from "@app/lib/temporal";
import logger from "@app/logger/logger";

import { QUEUE_NAME } from "./config";
import { upsertDocumentWorkflow, upsertTableWorkflow } from "./workflows";
import { upsertDocumentWorkflow } from "./workflows";

export async function launchUpsertDocumentWorkflow({
workspaceId,
@@ -51,48 +51,3 @@
return new Err(e as Error);
}
}

export async function launchUpsertTableWorkflow({
workspaceId,
dataSourceId,
upsertQueueId,
enqueueTimestamp,
}: {
workspaceId: string;
dataSourceId: string;
upsertQueueId: string;
enqueueTimestamp: number;
}): Promise<Result<string, Error>> {
const client = await getTemporalClient();

const workflowId = `upsert-queue-table-${workspaceId}-${dataSourceId}-${upsertQueueId}`;

try {
await client.workflow.start(upsertTableWorkflow, {
args: [upsertQueueId, enqueueTimestamp],
taskQueue: QUEUE_NAME,
workflowId: workflowId,
memo: {
workspaceId,
dataSourceId,
upsertQueueId,
},
});
logger.info(
{
workflowId,
},
"Started workflow."
);
return new Ok(workflowId);
} catch (e) {
logger.error(
{
workflowId,
error: e,
},
"Failed starting workflow."
);
return new Err(e as Error);
}
}
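The launcher itself moved to front/temporal/upsert_tables/client.ts, one of the 18 changed files not rendered in this excerpt. A sketch assuming it mirrors the removed code, with only the task queue (imported from the new module's own config) and the workflow import swapped; the workflow ID prefix and the Ok/Err import path are assumptions:

import type { Result } from "@dust-tt/types";
import { Err, Ok } from "@dust-tt/types";

import { getTemporalClient } from "@app/lib/temporal";
import logger from "@app/logger/logger";

import { QUEUE_NAME } from "./config"; // Dedicated upsert-tables task queue (assumed).
import { upsertTableWorkflow } from "./workflows";

export async function launchUpsertTableWorkflow({
  workspaceId,
  dataSourceId,
  upsertQueueId,
  enqueueTimestamp,
}: {
  workspaceId: string;
  dataSourceId: string;
  upsertQueueId: string;
  enqueueTimestamp: number;
}): Promise<Result<string, Error>> {
  const client = await getTemporalClient();

  // Hypothetical ID prefix for the new queue's workflows.
  const workflowId = `upsert-table-${workspaceId}-${dataSourceId}-${upsertQueueId}`;

  try {
    await client.workflow.start(upsertTableWorkflow, {
      args: [upsertQueueId, enqueueTimestamp],
      taskQueue: QUEUE_NAME,
      workflowId,
      memo: { workspaceId, dataSourceId, upsertQueueId },
    });
    logger.info({ workflowId }, "Started workflow.");
    return new Ok(workflowId);
  } catch (e) {
    logger.error({ workflowId, error: e }, "Failed starting workflow.");
    return new Err(e as Error);
  }
}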
1 change: 1 addition & 0 deletions front/temporal/upsert_queue/workflows.ts
@@ -17,6 +17,7 @@ export async function upsertDocumentWorkflow(
await upsertDocumentActivity(upsertQueueId, enqueueTimestamp);
}

// TODO(2024-10-02 flav) Remove once all upsert tables have been processed from this queue.
export async function upsertTableWorkflow(
upsertQueueId: string,
enqueueTimestamp: number
138 changes: 138 additions & 0 deletions front/temporal/upsert_tables/activities.ts
@@ -0,0 +1,138 @@
import { Storage } from "@google-cloud/storage";
import { isLeft } from "fp-ts/lib/Either";
import * as reporter from "io-ts-reporters";

import { upsertTableFromCsv } from "@app/lib/api/tables";
import { Authenticator } from "@app/lib/auth";
import { DataSourceResource } from "@app/lib/resources/data_source_resource";
import type { WorkflowError } from "@app/lib/temporal_monitoring";
import { EnqueueUpsertTable } from "@app/lib/upsert_queue";
import mainLogger from "@app/logger/logger";
import { statsDClient } from "@app/logger/withlogging";
import config from "@app/temporal/config";

export async function upsertTableActivity(
upsertQueueId: string,
enqueueTimestamp: number
) {
const storage = new Storage({ keyFilename: config.getServiceAccount() });
const bucket = storage.bucket(config.getUpsertQueueBucket());
const content = await bucket.file(`${upsertQueueId}.json`).download();

const upsertDocument = JSON.parse(content.toString());

const tableItemValidation = EnqueueUpsertTable.decode(upsertDocument);

if (isLeft(tableItemValidation)) {
const pathErrorTable = reporter.formatValidationErrors(
tableItemValidation.left
);
throw new Error(`Invalid upsertQueue table: ${pathErrorTable}`);
}

const upsertQueueItem = tableItemValidation.right;
const logger = mainLogger.child({
upsertQueueId,
workspaceId: upsertQueueItem.workspaceId,
dataSourceId: upsertQueueItem.dataSourceId,
tableId: upsertQueueItem.tableId,
});

const auth = await Authenticator.internalAdminForWorkspace(
upsertQueueItem.workspaceId
);

const owner = auth.workspace();
if (!owner) {
logger.error(
{
delaySinceEnqueueMs: Date.now() - enqueueTimestamp,
},
"[UpsertQueue] Giving up: Workspace not found"
);
return;
}

const dataSource = await DataSourceResource.fetchById(
auth,
upsertQueueItem.dataSourceId
);
if (!dataSource) {
// If the data source was not found, we simply give up and remove the item from the queue as it
// means that the data source was deleted.
logger.info(
{
delaySinceEnqueueMs: Date.now() - enqueueTimestamp,
},
"[UpsertQueue] Giving up: DataSource not found"
);
return;
}

const statsDTags = [
`data_source_name:${dataSource.name}`,
`workspace_id:${upsertQueueItem.workspaceId}`,
];

const upsertTimestamp = Date.now();

const tableRes = await upsertTableFromCsv({
auth,
dataSource,
tableName: upsertQueueItem.tableName,
tableDescription: upsertQueueItem.tableDescription,
tableId: upsertQueueItem.tableId,
tableTimestamp: upsertQueueItem.tableTimestamp ?? null,
tableTags: upsertQueueItem.tableTags || [],
tableParents: upsertQueueItem.tableParents || [],
csv: upsertQueueItem.csv,
truncate: upsertQueueItem.truncate,
useAppForHeaderDetection: upsertQueueItem.useAppForHeaderDetection ?? false,
});

if (tableRes.isErr()) {
logger.error(
{
error: tableRes.error,
latencyMs: Date.now() - upsertTimestamp,
delaySinceEnqueueMs: Date.now() - enqueueTimestamp,
csvSize: upsertQueueItem.csv?.length || 0,
},
"[UpsertQueue] Failed table upsert"
);
statsDClient.increment("upsert_queue_table_error.count", 1, statsDTags);
statsDClient.distribution(
"upsert_queue_upsert_table_error.duration.distribution",
Date.now() - upsertTimestamp,
[]
);

const error: WorkflowError = {
__is_dust_error: true,
message: `Upsert error: ${JSON.stringify(tableRes.error)}`,
type: "upsert_queue_upsert_table_error",
};

throw error;
}

logger.info(
{
latencyMs: Date.now() - upsertTimestamp,
delaySinceEnqueueMs: Date.now() - enqueueTimestamp,
csvSize: upsertQueueItem.csv?.length || 0,
},
"[UpsertQueue] Successful table upsert"
);
statsDClient.increment("upsert_queue_table_success.count", 1, statsDTags);
statsDClient.distribution(
"upsert_queue_upsert_table_success.duration.distribution",
Date.now() - upsertTimestamp,
[]
);
statsDClient.distribution(
"upsert_queue_table.duration.distribution",
Date.now() - enqueueTimestamp,
[]
);
}
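The matching workflow and worker for the new queue (front/temporal/upsert_tables/workflows.ts and worker.ts) are also among the changed files but are not shown here. A minimal sketch of the workflow, following the pattern of the existing upsert_queue workflow above (the timeout value is an assumption):

import { proxyActivities } from "@temporalio/workflow";

import type * as activities from "@app/temporal/upsert_tables/activities";

const { upsertTableActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "10 minutes", // Assumed; tune to the largest expected CSV.
});

export async function upsertTableWorkflow(
  upsertQueueId: string,
  enqueueTimestamp: number
) {
  await upsertTableActivity(upsertQueueId, enqueueTimestamp);
}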