Skip to content

Commit

Permalink
enh: move notion garbage collector to its own worker (#7373)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Sep 13, 2024
1 parent 02e1e40 commit 07a85f3
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/deploy-connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-worker-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-worker-notion-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-worker-notion-gc-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-worker-webcrawler-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/connectors-image:${{ steps.short_sha.outputs.short_sha }} connectors-worker-google-drive-deployment
Expand All @@ -63,6 +64,8 @@ jobs:
kubectl rollout status deployment/connectors-worker-deployment --timeout=10m
echo "Waiting for rollout to complete (notion worker)"
kubectl rollout status deployment/connectors-worker-notion-deployment --timeout=10m
echo "Waiting for rollout to complete (notion GC worker)"
kubectl rollout status deployment/connectors-worker-notion-gc-deployment --timeout=10m
echo "Waiting for rollout to complete (webcrawler worker)"
kubectl rollout status deployment/connectors-worker-webcrawler-deployment --timeout=10m
echo "Waiting for rollout to complete (google_drive worker)"
Expand Down
7 changes: 5 additions & 2 deletions connectors/src/connectors/notion/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import type {
} from "@temporalio/client";
import { WorkflowNotFoundError } from "@temporalio/client";

import { QUEUE_NAME } from "@connectors/connectors/notion/temporal/config";
import {
GARBAGE_COLLECT_QUEUE_NAME,
QUEUE_NAME,
} from "@connectors/connectors/notion/temporal/config";
import { notionSyncWorkflow } from "@connectors/connectors/notion/temporal/workflows";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { NotionConnectorState } from "@connectors/lib/models/notion";
Expand Down Expand Up @@ -109,7 +112,7 @@ export async function launchNotionGarbageCollectorWorkflow(
garbageCollectionMode: "always",
},
],
taskQueue: QUEUE_NAME,
taskQueue: GARBAGE_COLLECT_QUEUE_NAME,
workflowId: getNotionWorkflowId(connectorId, "always"),
searchAttributes: {
connectorId: [connectorId],
Expand Down
1 change: 1 addition & 0 deletions connectors/src/connectors/notion/temporal/config.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const WORKFLOW_VERSION = 39;
export const QUEUE_NAME = `notion-queue-v${WORKFLOW_VERSION}`;
export const GARBAGE_COLLECT_QUEUE_NAME = `notion-gc-queue-v${WORKFLOW_VERSION}`;
29 changes: 28 additions & 1 deletion connectors/src/connectors/notion/temporal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import { Worker } from "@temporalio/worker";

import * as activities from "@connectors/connectors/notion/temporal/activities";
import { NotionCastKnownErrorsInterceptor } from "@connectors/connectors/notion/temporal/cast_known_errors";
import { QUEUE_NAME } from "@connectors/connectors/notion/temporal/config";
import {
GARBAGE_COLLECT_QUEUE_NAME,
QUEUE_NAME,
} from "@connectors/connectors/notion/temporal/config";
import { getTemporalWorkerConnection } from "@connectors/lib/temporal";
import { ActivityInboundLogInterceptor } from "@connectors/lib/temporal_monitoring";
import logger from "@connectors/logger/logger";
Expand Down Expand Up @@ -31,3 +34,27 @@ export async function runNotionWorker() {

await worker.run();
}

export async function runNotionGarbageCollectWorker() {
const { connection, namespace } = await getTemporalWorkerConnection();
const worker = await Worker.create({
workflowsPath: require.resolve("./workflows"),
activities,
taskQueue: GARBAGE_COLLECT_QUEUE_NAME,
connection,
reuseV8Context: true,
namespace,
maxConcurrentActivityTaskExecutions: 8,
maxCachedWorkflows: 200,
interceptors: {
activityInbound: [
(ctx: Context) => {
return new ActivityInboundLogInterceptor(ctx, logger);
},
() => new NotionCastKnownErrorsInterceptor(),
],
},
});

await worker.run();
}
11 changes: 9 additions & 2 deletions connectors/src/start_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,27 @@ import { runWebCrawlerWorker } from "@connectors/connectors/webcrawler/temporal/
import { runGithubWorker } from "./connectors/github/temporal/worker";
import { runGoogleWorkers } from "./connectors/google_drive/temporal/worker";
import { runIntercomWorker } from "./connectors/intercom/temporal/worker";
import { runNotionWorker } from "./connectors/notion/temporal/worker";
import {
runNotionGarbageCollectWorker,
runNotionWorker,
} from "./connectors/notion/temporal/worker";
import { runSlackWorker } from "./connectors/slack/temporal/worker";
import { errorFromAny } from "./lib/error";
import logger from "./logger/logger";

setupGlobalErrorHandler(logger);

const workerFunctions: Record<ConnectorProvider, () => Promise<void>> = {
const workerFunctions: Record<
ConnectorProvider | "notion_garbage_collector",
() => Promise<void>
> = {
confluence: runConfluenceWorker,
github: runGithubWorker,
google_drive: runGoogleWorkers,
intercom: runIntercomWorker,
microsoft: runMicrosoftWorker,
notion: runNotionWorker,
notion_garbage_collector: runNotionGarbageCollectWorker,
slack: runSlackWorker,
webcrawler: runWebCrawlerWorker,
};
Expand Down
1 change: 1 addition & 0 deletions k8s/apply_infra.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ apply_deployment front-qa-deployment
apply_deployment connectors-deployment
apply_deployment connectors-worker-deployment
apply_deployment connectors-worker-notion-deployment
apply_deployment connectors-worker-notion-gc-deployment
apply_deployment connectors-worker-webcrawler-deployment
apply_deployment connectors-worker-google-drive-deployment
apply_deployment metabase-deployment
Expand Down
2 changes: 1 addition & 1 deletion k8s/deployments/connectors-worker-notion-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Deployment
metadata:
name: connectors-worker-notion-deployment
spec:
replicas: 6
replicas: 3
selector:
matchLabels:
app: connectors-worker
Expand Down
60 changes: 60 additions & 0 deletions k8s/deployments/connectors-worker-notion-gc-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: connectors-worker-notion-gc-deployment
spec:
replicas: 3
selector:
matchLabels:
app: connectors-worker
worker: notion-gc
template:
metadata:
labels:
app: connectors-worker
name: connectors-worker-pod
worker: notion-gc
admission.datadoghq.com/enabled: "true"
annotations:
ad.datadoghq.com/web.logs: '[{"source": "connectors-worker","service": "connectors-worker","tags": ["env:prod"]}]'
spec:
containers:
- name: web
image: gcr.io/or1g1n-186209/connectors-image:latest
command: ["npm", "run", "start:worker"]
args: ["--", "--workers", "notion_garbage_collector"]
imagePullPolicy: Always
envFrom:
- configMapRef:
name: connectors-worker-specific-config
- secretRef:
name: connectors-secrets
env:
- name: DD_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP

volumeMounts:
- name: cert-volume
mountPath: /etc/certs
- name: private-key-volume
mountPath: /etc/private-keys

resources:
requests:
cpu: 2000m
memory: 4Gi

limits:
cpu: 2000m
memory: 4Gi

volumes:
- name: cert-volume
secret:
secretName: temporal-cert

- name: private-key-volume
secret:
secretName: github-app-private-key

0 comments on commit 07a85f3

Please sign in to comment.