From 537061eb4230fe362345ed534b24b2cf8eecf155 Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 16:36:51 +0100 Subject: [PATCH] feat: implement pause/unpause (add a pause workflow) --- connectors/src/connectors/zendesk/index.ts | 30 +++++++++++-- .../connectors/zendesk/temporal/workflows.ts | 43 ++++++++++++++++++- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 4cb5d0c7f1a7..af51965e31f6 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -36,6 +36,7 @@ import { } from "@connectors/connectors/zendesk/lib/ticket_permissions"; import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client"; +import { zendeskStopSyncWorkflow } from "@connectors/connectors/zendesk/temporal/workflows"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; @@ -102,7 +103,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async stop(): Promise> { - throw new Error("Method not implemented."); + const result = await zendeskStopSyncWorkflow(this.connectorId); + return result.isErr() ? result : new Ok(undefined); } async resume(): Promise> { @@ -537,11 +539,33 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async pause(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + await connector.markAsPaused(); + const result = await this.stop(); + + return result.isErr() ? result : new Ok(undefined); } async unpause(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + await connector.markAsUnpaused(); + + const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ + connectorId, + }); + const result = await launchZendeskSyncWorkflow({ connectorId, brandIds }); + + return result.isErr() ? result : new Ok(undefined); } async garbageCollect(): Promise> { diff --git a/connectors/src/connectors/zendesk/temporal/workflows.ts b/connectors/src/connectors/zendesk/temporal/workflows.ts index af619ecaaed0..7998a0d71462 100644 --- a/connectors/src/connectors/zendesk/temporal/workflows.ts +++ b/connectors/src/connectors/zendesk/temporal/workflows.ts @@ -1,5 +1,7 @@ -import type { ModelId } from "@dust-tt/types"; -import { assertNever } from "@dust-tt/types"; +import type { ModelId, Result } from "@dust-tt/types"; +import { assertNever, Err, Ok } from "@dust-tt/types"; +import type { WorkflowHandle } from "@temporalio/client"; +import { WorkflowNotFoundError } from "@temporalio/client"; import { executeChild, proxyActivities, @@ -10,8 +12,12 @@ import { import type * as activities from "@connectors/connectors/zendesk/temporal/activities"; import { syncZendeskTicketsActivity } from "@connectors/connectors/zendesk/temporal/activities"; +import { getZendeskSyncWorkflowId } from "@connectors/connectors/zendesk/temporal/client"; import { INTERVAL_BETWEEN_SYNCS_MS } from "@connectors/connectors/zendesk/temporal/config"; import type { ZendeskUpdateSignal } from "@connectors/connectors/zendesk/temporal/signals"; +import { getTemporalClient } from "@connectors/lib/temporal"; +import logger from "@connectors/logger/logger"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; import { zendeskUpdatesSignal } from "./signals"; @@ -379,3 +385,36 @@ async function runZendeskBrandTicketsSyncActivities({ forceResync, }); } + +export async function zendeskStopSyncWorkflow( + connectorId: ModelId +): Promise> { + const client = await getTemporalClient(); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error( + `[Zendesk] Connector not found, connectorId: ${connectorId}` + ); + } + + const workflowId = getZendeskSyncWorkflowId(connectorId); + + try { + const handle: WorkflowHandle = + client.workflow.getHandle(workflowId); + try { + await handle.terminate(); + } catch (e) { + if (!(e instanceof WorkflowNotFoundError)) { + throw e; + } + } + return new Ok(undefined); + } catch (error) { + logger.error( + { workflowId, error }, + "[Zendesk] Failed to stop the sync workflow." + ); + return new Err(error as Error); + } +}