Skip to content

Commit

Permalink
feat: implement pause/unpause (add a pause workflow)
Browse files Browse the repository at this point in the history
  • Loading branch information
aubin-tchoi committed Oct 30, 2024
1 parent daee8b2 commit 537061e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
30 changes: 27 additions & 3 deletions connectors/src/connectors/zendesk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from "@connectors/connectors/zendesk/lib/ticket_permissions";
import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token";
import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client";
import { zendeskStopSyncWorkflow } from "@connectors/connectors/zendesk/temporal/workflows";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
Expand Down Expand Up @@ -102,7 +103,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async stop(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const result = await zendeskStopSyncWorkflow(this.connectorId);
return result.isErr() ? result : new Ok(undefined);
}

async resume(): Promise<Result<undefined, Error>> {
Expand Down Expand Up @@ -537,11 +539,33 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async pause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsPaused();
const result = await this.stop();

return result.isErr() ? result : new Ok(undefined);
}

async unpause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsUnpaused();

const brandIds = await ZendeskBrandResource.fetchAllBrandIds({
connectorId,
});
const result = await launchZendeskSyncWorkflow({ connectorId, brandIds });

return result.isErr() ? result : new Ok(undefined);
}

async garbageCollect(): Promise<Result<string, Error>> {
Expand Down
43 changes: 41 additions & 2 deletions connectors/src/connectors/zendesk/temporal/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { ModelId } from "@dust-tt/types";
import { assertNever } from "@dust-tt/types";
import type { ModelId, Result } from "@dust-tt/types";
import { assertNever, Err, Ok } from "@dust-tt/types";
import type { WorkflowHandle } from "@temporalio/client";
import { WorkflowNotFoundError } from "@temporalio/client";
import {
executeChild,
proxyActivities,
Expand All @@ -10,8 +12,12 @@ import {

import type * as activities from "@connectors/connectors/zendesk/temporal/activities";
import { syncZendeskTicketsActivity } from "@connectors/connectors/zendesk/temporal/activities";
import { getZendeskSyncWorkflowId } from "@connectors/connectors/zendesk/temporal/client";
import { INTERVAL_BETWEEN_SYNCS_MS } from "@connectors/connectors/zendesk/temporal/config";
import type { ZendeskUpdateSignal } from "@connectors/connectors/zendesk/temporal/signals";
import { getTemporalClient } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";

import { zendeskUpdatesSignal } from "./signals";

Expand Down Expand Up @@ -379,3 +385,36 @@ async function runZendeskBrandTicketsSyncActivities({
forceResync,
});
}

export async function zendeskStopSyncWorkflow(
connectorId: ModelId
): Promise<Result<void, Error>> {
const client = await getTemporalClient();
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(
`[Zendesk] Connector not found, connectorId: ${connectorId}`
);
}

const workflowId = getZendeskSyncWorkflowId(connectorId);

try {
const handle: WorkflowHandle<typeof zendeskSyncWorkflow> =
client.workflow.getHandle(workflowId);
try {
await handle.terminate();
} catch (e) {
if (!(e instanceof WorkflowNotFoundError)) {
throw e;
}
}
return new Ok(undefined);
} catch (error) {
logger.error(
{ workflowId, error },
"[Zendesk] Failed to stop the sync workflow."
);
return new Err(error as Error);
}
}

0 comments on commit 537061e

Please sign in to comment.