From bd639b18180da3438edcc22af2e19bcaea732048 Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 16:17:05 +0100 Subject: [PATCH 01/16] feat: launch sync workflows on creation, permission setting and resume --- connectors/src/connectors/zendesk/index.ts | 48 +++++++++++++++++++--- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 5f3d88685034..206acfbf9518 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -5,10 +5,9 @@ import type { ContentNodesViewType, Result, } from "@dust-tt/types"; -import { assertNever } from "@dust-tt/types"; -import { Err } from "@dust-tt/types"; -import { Ok } from "@dust-tt/types"; +import { assertNever, Err, Ok } from "@dust-tt/types"; +import { launchIntercomSyncWorkflow } from "@connectors/connectors/intercom/temporal/client"; import type { ConnectorManagerError } from "@connectors/connectors/interface"; import { BaseConnectorManager } from "@connectors/connectors/interface"; import { @@ -37,13 +36,16 @@ import { revokeSyncZendeskTickets, } from "@connectors/connectors/zendesk/lib/ticket_permissions"; import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; +import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client"; +import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; +import { IntercomTeam } from "@connectors/lib/models/intercom"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; -import { ZendeskConfigurationResource } from "@connectors/resources/zendesk_resources"; import { ZendeskArticleResource, ZendeskBrandResource, ZendeskCategoryResource, + ZendeskConfigurationResource, ZendeskTicketResource, } from "@connectors/resources/zendesk_resources"; import type { DataSourceConfig } from "@connectors/types/data_source_config"; @@ -69,6 +71,22 @@ export class ZendeskConnectorManager extends BaseConnectorManager { { subdomain: "d3v-dust", conversationsSlidingWindow: 90 } ); + const workflowStartResult = await launchZendeskSyncWorkflow({ + connectorId: connector.id, + }); + + if (workflowStartResult.isErr()) { + await connector.delete(); + logger.error( + { + workspaceId: dataSourceConfig.workspaceId, + error: workflowStartResult.error, + }, + "[Zendesk] Error creating connector, could not launch sync workflow." + ); + throw workflowStartResult.error; + } + return new Ok(connector.id.toString()); } @@ -90,7 +108,27 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async resume(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + + const dataSourceConfig = dataSourceConfigFromConnector(connector); + try { + await launchZendeskSyncWorkflow({ connectorId }); + } catch (e) { + logger.error( + { + workspaceId: dataSourceConfig.workspaceId, + dataSourceId: dataSourceConfig.dataSourceId, + error: e, + }, + "[Zendesk] Error resuming the sync workflow." + ); + } + return new Ok(undefined); } async sync(): Promise> { From b3294f4cc0708ee459269c4db566d61e6a68b9d6 Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 16:18:55 +0100 Subject: [PATCH 02/16] feat: implement the sync --- connectors/src/connectors/zendesk/index.ts | 41 +++++++++++++++++-- connectors/src/resources/zendesk_resources.ts | 9 ++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 206acfbf9518..15d0bc04dbc0 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -7,7 +7,6 @@ import type { } from "@dust-tt/types"; import { assertNever, Err, Ok } from "@dust-tt/types"; -import { launchIntercomSyncWorkflow } from "@connectors/connectors/intercom/temporal/client"; import type { ConnectorManagerError } from "@connectors/connectors/interface"; import { BaseConnectorManager } from "@connectors/connectors/interface"; import { @@ -38,7 +37,6 @@ import { import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; -import { IntercomTeam } from "@connectors/lib/models/intercom"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; import { @@ -132,7 +130,26 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async sync(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + + const brands = await ZendeskBrandResource.fetchByConnectorId({ + connectorId, + }); + + const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({ + connectorId, + brandIds: brands.map((brand) => brand.brandId), + forceResync: true, + }); + if (sendSignalToWorkflowResult.isErr()) { + return new Err(sendSignalToWorkflowResult.error); + } + return new Ok(connector.id.toString()); } async retrievePermissions({ @@ -314,7 +331,23 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } } - /// Launch a sync workflow here + if ( + toBeSignaledBrandIds.size > 0 || + toBeSignaledTicketsIds.size > 0 || + toBeSignaledHelpCenterIds.size > 0 || + toBeSignaledCategoryIds.size > 0 + ) { + const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({ + connectorId, + brandIds: [...toBeSignaledBrandIds], + ticketsBrandIds: [...toBeSignaledTicketsIds], + helpCenterBrandIds: [...toBeSignaledHelpCenterIds], + categoryIds: [...toBeSignaledCategoryIds], + }); + if (sendSignalToWorkflowResult.isErr()) { + return new Err(sendSignalToWorkflowResult.error); + } + } return new Ok(undefined); } diff --git a/connectors/src/resources/zendesk_resources.ts b/connectors/src/resources/zendesk_resources.ts index 923fab0c04b7..721d4a895f9c 100644 --- a/connectors/src/resources/zendesk_resources.ts +++ b/connectors/src/resources/zendesk_resources.ts @@ -215,6 +215,15 @@ export class ZendeskBrandResource extends BaseResource { return brands.map((brand) => new this(this.model, brand.get())); } + static async fetchByConnectorId({ + connectorId, + }: { + connectorId: number; + }): Promise { + const brands = await ZendeskBrand.findAll({ where: { connectorId } }); + return brands.map((brand) => new this(this.model, brand.get())); + } + static async fetchAllWithHelpCenter({ connectorId, }: { From a0d7c88770030ce0abf461eeec75985fd301b49e Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 16:29:06 +0100 Subject: [PATCH 03/16] perf: optimize the fetching of brandIds by only selecting the one column --- connectors/src/connectors/zendesk/index.ts | 13 +++++-------- connectors/src/resources/zendesk_resources.ts | 11 +++++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 15d0bc04dbc0..4cb5d0c7f1a7 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -137,19 +137,16 @@ export class ZendeskConnectorManager extends BaseConnectorManager { return new Err(new Error("Connector not found")); } - const brands = await ZendeskBrandResource.fetchByConnectorId({ + const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId, }); - - const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({ + const result = await launchZendeskSyncWorkflow({ connectorId, - brandIds: brands.map((brand) => brand.brandId), + brandIds, forceResync: true, }); - if (sendSignalToWorkflowResult.isErr()) { - return new Err(sendSignalToWorkflowResult.error); - } - return new Ok(connector.id.toString()); + + return result.isErr() ? result : new Ok(connector.id.toString()); } async retrievePermissions({ diff --git a/connectors/src/resources/zendesk_resources.ts b/connectors/src/resources/zendesk_resources.ts index 721d4a895f9c..423285157983 100644 --- a/connectors/src/resources/zendesk_resources.ts +++ b/connectors/src/resources/zendesk_resources.ts @@ -215,13 +215,16 @@ export class ZendeskBrandResource extends BaseResource { return brands.map((brand) => new this(this.model, brand.get())); } - static async fetchByConnectorId({ + static async fetchAllBrandIds({ connectorId, }: { connectorId: number; - }): Promise { - const brands = await ZendeskBrand.findAll({ where: { connectorId } }); - return brands.map((brand) => new this(this.model, brand.get())); + }): Promise { + const brands = await ZendeskBrand.findAll({ + where: { connectorId }, + attributes: ["brandId"], + }); + return brands.map((brand) => brand.brandId); } static async fetchAllWithHelpCenter({ From 893373b954fed8439f66406a37bcba07115909ff Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 16:36:51 +0100 Subject: [PATCH 04/16] feat: implement pause/unpause (add a pause workflow) --- connectors/src/connectors/zendesk/index.ts | 30 +++++++++++-- .../connectors/zendesk/temporal/workflows.ts | 42 ++++++++++++++++++- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 4cb5d0c7f1a7..af51965e31f6 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -36,6 +36,7 @@ import { } from "@connectors/connectors/zendesk/lib/ticket_permissions"; import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client"; +import { zendeskStopSyncWorkflow } from "@connectors/connectors/zendesk/temporal/workflows"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; @@ -102,7 +103,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async stop(): Promise> { - throw new Error("Method not implemented."); + const result = await zendeskStopSyncWorkflow(this.connectorId); + return result.isErr() ? result : new Ok(undefined); } async resume(): Promise> { @@ -537,11 +539,33 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async pause(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + await connector.markAsPaused(); + const result = await this.stop(); + + return result.isErr() ? result : new Ok(undefined); } async unpause(): Promise> { - throw new Error("Method not implemented."); + const connectorId = this.connectorId; + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + logger.error({ connectorId }, "[Zendesk] Connector not found."); + return new Err(new Error("Connector not found")); + } + await connector.markAsUnpaused(); + + const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ + connectorId, + }); + const result = await launchZendeskSyncWorkflow({ connectorId, brandIds }); + + return result.isErr() ? result : new Ok(undefined); } async garbageCollect(): Promise> { diff --git a/connectors/src/connectors/zendesk/temporal/workflows.ts b/connectors/src/connectors/zendesk/temporal/workflows.ts index 55a24b7ac90c..22b7b2e055ed 100644 --- a/connectors/src/connectors/zendesk/temporal/workflows.ts +++ b/connectors/src/connectors/zendesk/temporal/workflows.ts @@ -1,4 +1,7 @@ -import type { ModelId } from "@dust-tt/types"; +import type { ModelId, Result } from "@dust-tt/types"; +import { Err, Ok } from "@dust-tt/types"; +import type { WorkflowHandle } from "@temporalio/client"; +import { WorkflowNotFoundError } from "@temporalio/client"; import { assertNever } from "@temporalio/common/lib/type-helpers"; import { executeChild, @@ -8,8 +11,12 @@ import { } from "@temporalio/workflow"; import type * as activities from "@connectors/connectors/zendesk/temporal/activities"; +import { getZendeskSyncWorkflowId } from "@connectors/connectors/zendesk/temporal/client"; import type { ZendeskUpdateSignal } from "@connectors/connectors/zendesk/temporal/signals"; import { zendeskUpdatesSignal } from "@connectors/connectors/zendesk/temporal/signals"; +import { getTemporalClient } from "@connectors/lib/temporal"; +import logger from "@connectors/logger/logger"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; const { getZendeskCategoriesActivity, @@ -377,3 +384,36 @@ async function runZendeskBrandTicketsSyncActivities({ forceResync, }); } + +export async function zendeskStopSyncWorkflow( + connectorId: ModelId +): Promise> { + const client = await getTemporalClient(); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error( + `[Zendesk] Connector not found, connectorId: ${connectorId}` + ); + } + + const workflowId = getZendeskSyncWorkflowId(connectorId); + + try { + const handle: WorkflowHandle = + client.workflow.getHandle(workflowId); + try { + await handle.terminate(); + } catch (e) { + if (!(e instanceof WorkflowNotFoundError)) { + throw e; + } + } + return new Ok(undefined); + } catch (error) { + logger.error( + { workflowId, error }, + "[Zendesk] Failed to stop the sync workflow." + ); + return new Err(error as Error); + } +} From 99f86bd969b4aa41f2f9ac7098943660e2f4226e Mon Sep 17 00:00:00 2001 From: Aubin Date: Wed, 30 Oct 2024 19:31:41 +0100 Subject: [PATCH 05/16] refactor: remove the duplicate function stopZendeskSyncWorkflow --- connectors/src/connectors/zendesk/index.ts | 8 ++-- .../connectors/zendesk/temporal/workflows.ts | 42 +------------------ 2 files changed, 6 insertions(+), 44 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index af51965e31f6..a6a3e009662d 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -35,8 +35,10 @@ import { revokeSyncZendeskTickets, } from "@connectors/connectors/zendesk/lib/ticket_permissions"; import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; -import { launchZendeskSyncWorkflow } from "@connectors/connectors/zendesk/temporal/client"; -import { zendeskStopSyncWorkflow } from "@connectors/connectors/zendesk/temporal/workflows"; +import { + launchZendeskSyncWorkflow, + stopZendeskSyncWorkflow, +} from "@connectors/connectors/zendesk/temporal/client"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import logger from "@connectors/logger/logger"; import { ConnectorResource } from "@connectors/resources/connector_resource"; @@ -103,7 +105,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async stop(): Promise> { - const result = await zendeskStopSyncWorkflow(this.connectorId); + const result = await stopZendeskSyncWorkflow(this.connectorId); return result.isErr() ? result : new Ok(undefined); } diff --git a/connectors/src/connectors/zendesk/temporal/workflows.ts b/connectors/src/connectors/zendesk/temporal/workflows.ts index 22b7b2e055ed..55a24b7ac90c 100644 --- a/connectors/src/connectors/zendesk/temporal/workflows.ts +++ b/connectors/src/connectors/zendesk/temporal/workflows.ts @@ -1,7 +1,4 @@ -import type { ModelId, Result } from "@dust-tt/types"; -import { Err, Ok } from "@dust-tt/types"; -import type { WorkflowHandle } from "@temporalio/client"; -import { WorkflowNotFoundError } from "@temporalio/client"; +import type { ModelId } from "@dust-tt/types"; import { assertNever } from "@temporalio/common/lib/type-helpers"; import { executeChild, @@ -11,12 +8,8 @@ import { } from "@temporalio/workflow"; import type * as activities from "@connectors/connectors/zendesk/temporal/activities"; -import { getZendeskSyncWorkflowId } from "@connectors/connectors/zendesk/temporal/client"; import type { ZendeskUpdateSignal } from "@connectors/connectors/zendesk/temporal/signals"; import { zendeskUpdatesSignal } from "@connectors/connectors/zendesk/temporal/signals"; -import { getTemporalClient } from "@connectors/lib/temporal"; -import logger from "@connectors/logger/logger"; -import { ConnectorResource } from "@connectors/resources/connector_resource"; const { getZendeskCategoriesActivity, @@ -384,36 +377,3 @@ async function runZendeskBrandTicketsSyncActivities({ forceResync, }); } - -export async function zendeskStopSyncWorkflow( - connectorId: ModelId -): Promise> { - const client = await getTemporalClient(); - const connector = await ConnectorResource.fetchById(connectorId); - if (!connector) { - throw new Error( - `[Zendesk] Connector not found, connectorId: ${connectorId}` - ); - } - - const workflowId = getZendeskSyncWorkflowId(connectorId); - - try { - const handle: WorkflowHandle = - client.workflow.getHandle(workflowId); - try { - await handle.terminate(); - } catch (e) { - if (!(e instanceof WorkflowNotFoundError)) { - throw e; - } - } - return new Ok(undefined); - } catch (error) { - logger.error( - { workflowId, error }, - "[Zendesk] Failed to stop the sync workflow." - ); - return new Err(error as Error); - } -} From 16c74ac1d83cd9c671bc28cac958fb74db4ee9eb Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 10:38:59 +0100 Subject: [PATCH 06/16] fix: fix ZendeskConfigurationResource fetching --- connectors/src/connectors/zendesk/index.ts | 5 ++--- connectors/src/connectors/zendesk/temporal/activities.ts | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index a6a3e009662d..ca65a7500d16 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -202,9 +202,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } const connectionId = connector.connectionId; - const zendeskConfiguration = await ZendeskConfigurationResource.fetchById( - this.connectorId - ); + const zendeskConfiguration = + await ZendeskConfigurationResource.fetchByConnectorId(connectorId); if (!zendeskConfiguration) { logger.error( { connectorId }, diff --git a/connectors/src/connectors/zendesk/temporal/activities.ts b/connectors/src/connectors/zendesk/temporal/activities.ts index bf9b2f1c9df3..ed7793b4f896 100644 --- a/connectors/src/connectors/zendesk/temporal/activities.ts +++ b/connectors/src/connectors/zendesk/temporal/activities.ts @@ -35,7 +35,7 @@ async function _getZendeskConnectorOrRaise(connectorId: ModelId) { async function _getZendeskConfigurationOrRaise(connectorId: ModelId) { const configuration = - await ZendeskConfigurationResource.fetchById(connectorId); + await ZendeskConfigurationResource.fetchByConnectorId(connectorId); if (!configuration) { throw new Error("[Zendesk] Configuration not found."); } From 3cadb70bd3a5bcd24dee7d4f0ae2fd7f466fa973 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 11:08:24 +0100 Subject: [PATCH 07/16] fix incorrect error message --- connectors/src/connectors/zendesk/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index ca65a7500d16..ac3949c1a556 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -83,7 +83,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { workspaceId: dataSourceConfig.workspaceId, error: workflowStartResult.error, }, - "[Zendesk] Error creating connector, could not launch sync workflow." + "[Zendesk] Error launching the sync workflow." ); throw workflowStartResult.error; } From 406bf632ff2d04a7b7a7f391ae410a39965018f6 Mon Sep 17 00:00:00 2001 From: Aubin <60398825+aubin-tchoi@users.noreply.github.com> Date: Thu, 31 Oct 2024 16:21:31 +0100 Subject: [PATCH 08/16] refactor Co-authored-by: Flavien David --- connectors/src/connectors/zendesk/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index ac3949c1a556..83fe1b13ee07 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -110,7 +110,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async resume(): Promise> { - const connectorId = this.connectorId; + const {connectorId} = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); From 61107d619c6e266b9135d876789b66fcd128ba4c Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 16:32:10 +0100 Subject: [PATCH 09/16] pass complete type instead of only the connector ID in launchZendeskSyncWorkflow --- connectors/src/connectors/zendesk/index.ts | 10 +++++----- .../src/connectors/zendesk/temporal/client.ts | 16 +++++----------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index ac3949c1a556..97b30d4b275b 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -73,7 +73,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { ); const workflowStartResult = await launchZendeskSyncWorkflow({ - connectorId: connector.id, + connector, }); if (workflowStartResult.isErr()) { @@ -119,7 +119,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const dataSourceConfig = dataSourceConfigFromConnector(connector); try { - await launchZendeskSyncWorkflow({ connectorId }); + await launchZendeskSyncWorkflow({ connector }); } catch (e) { logger.error( { @@ -145,7 +145,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { connectorId, }); const result = await launchZendeskSyncWorkflow({ - connectorId, + connector, brandIds, forceResync: true, }); @@ -338,7 +338,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { toBeSignaledCategoryIds.size > 0 ) { const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({ - connectorId, + connector, brandIds: [...toBeSignaledBrandIds], ticketsBrandIds: [...toBeSignaledTicketsIds], helpCenterBrandIds: [...toBeSignaledHelpCenterIds], @@ -564,7 +564,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId, }); - const result = await launchZendeskSyncWorkflow({ connectorId, brandIds }); + const result = await launchZendeskSyncWorkflow({ connector, brandIds }); return result.isErr() ? result : new Ok(undefined); } diff --git a/connectors/src/connectors/zendesk/temporal/client.ts b/connectors/src/connectors/zendesk/temporal/client.ts index 38c9be4be234..d45273a317ec 100644 --- a/connectors/src/connectors/zendesk/temporal/client.ts +++ b/connectors/src/connectors/zendesk/temporal/client.ts @@ -16,7 +16,7 @@ export function getZendeskSyncWorkflowId(connectorId: ModelId): string { } export async function launchZendeskSyncWorkflow({ - connectorId, + connector, startFromTs = null, brandIds = [], ticketsBrandIds = [], @@ -24,7 +24,7 @@ export async function launchZendeskSyncWorkflow({ categoryIds = [], forceResync = false, }: { - connectorId: ModelId; + connector: ConnectorResource; startFromTs?: number | null; brandIds?: number[]; ticketsBrandIds?: number[]; @@ -37,12 +37,6 @@ export async function launchZendeskSyncWorkflow({ } const client = await getTemporalClient(); - const connector = await ConnectorResource.fetchById(connectorId); - if (!connector) { - throw new Error( - `[Zendesk] Connector not found, connectorId: ${connectorId}` - ); - } const signals: ZendeskUpdateSignal[] = [ ...brandIds.map( @@ -75,16 +69,16 @@ export async function launchZendeskSyncWorkflow({ ), ]; - const workflowId = getZendeskSyncWorkflowId(connectorId); + const workflowId = getZendeskSyncWorkflowId(connector.id); try { await client.workflow.signalWithStart(zendeskSyncWorkflow, { args: [{ connectorId: connector.id }], taskQueue: QUEUE_NAME, workflowId, - searchAttributes: { connectorId: [connectorId] }, + searchAttributes: { connectorId: [connector.id] }, signal: zendeskUpdatesSignal, signalArgs: [signals], - memo: { connectorId }, + memo: { connectorId: connector.id }, cronSchedule: "*/5 * * * *", // Every 5 minutes. }); } catch (err) { From 94d042f524a27e4f486024933f0883f6411cfe53 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 16:34:05 +0100 Subject: [PATCH 10/16] fix return type of stopZendeskSyncWorkflow and cleanup stop --- connectors/src/connectors/zendesk/index.ts | 5 ++--- connectors/src/connectors/zendesk/temporal/client.ts | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 096c592f84e0..cd09698c0fe4 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -105,12 +105,11 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async stop(): Promise> { - const result = await stopZendeskSyncWorkflow(this.connectorId); - return result.isErr() ? result : new Ok(undefined); + return stopZendeskSyncWorkflow(this.connectorId); } async resume(): Promise> { - const {connectorId} = this; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); diff --git a/connectors/src/connectors/zendesk/temporal/client.ts b/connectors/src/connectors/zendesk/temporal/client.ts index d45273a317ec..a5dfe344bf8e 100644 --- a/connectors/src/connectors/zendesk/temporal/client.ts +++ b/connectors/src/connectors/zendesk/temporal/client.ts @@ -90,7 +90,7 @@ export async function launchZendeskSyncWorkflow({ export async function stopZendeskSyncWorkflow( connectorId: ModelId -): Promise> { +): Promise> { const client = await getTemporalClient(); const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { From 69fa4e017631489f07c14d392b09a7e42bca6fcc Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 16:35:04 +0100 Subject: [PATCH 11/16] cleanup const connectorId = this.connectorId; --- connectors/src/connectors/zendesk/index.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index cd09698c0fe4..36a623de1bec 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -133,7 +133,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async sync(): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); @@ -160,7 +160,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { filterPermission: ConnectorPermission | null; viewType: ContentNodesViewType; }): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); @@ -192,7 +192,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { }: { permissions: Record; }): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(this.connectorId); if (!connector) { @@ -397,7 +397,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } }); - const connectorId = this.connectorId; + const { connectorId } = this; const allBrandIds = [ ...new Set([...brandIds, ...brandTicketsIds, ...brandHelpCenterIds]), @@ -441,7 +441,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { internalId: string; memoizationKey?: string; }): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const { type, objectId } = getIdFromInternalId(connectorId, internalId); switch (type) { @@ -539,7 +539,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async pause(): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); @@ -552,7 +552,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } async unpause(): Promise> { - const connectorId = this.connectorId; + const { connectorId } = this; const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { logger.error({ connectorId }, "[Zendesk] Connector not found."); From ac9f73dbf8436be26520040c906cb9378ff90639 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 16:36:57 +0100 Subject: [PATCH 12/16] correctly return errors in resume --- connectors/src/connectors/zendesk/index.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 36a623de1bec..6f09a317f5eb 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -118,7 +118,10 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const dataSourceConfig = dataSourceConfigFromConnector(connector); try { - await launchZendeskSyncWorkflow({ connector }); + const result = await launchZendeskSyncWorkflow({ connector }); + if (result.isErr()) { + return result; + } } catch (e) { logger.error( { @@ -128,6 +131,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { }, "[Zendesk] Error resuming the sync workflow." ); + return new Err(e as Error); } return new Ok(undefined); } From aee585c3b7a3c9b08daf9769b63daf47afcb6cd9 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 17:10:03 +0100 Subject: [PATCH 13/16] get rid of an unused try catch --- connectors/src/connectors/zendesk/index.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 6f09a317f5eb..55b2479da809 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -117,21 +117,17 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } const dataSourceConfig = dataSourceConfigFromConnector(connector); - try { - const result = await launchZendeskSyncWorkflow({ connector }); - if (result.isErr()) { - return result; - } - } catch (e) { + const result = await launchZendeskSyncWorkflow({ connector }); + if (result.isErr()) { logger.error( { workspaceId: dataSourceConfig.workspaceId, dataSourceId: dataSourceConfig.dataSourceId, - error: e, + error: result.error, }, "[Zendesk] Error resuming the sync workflow." ); - return new Err(e as Error); + return result; } return new Ok(undefined); } From a897686649216b3dcf70fd4ede1535f191123563 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 17:10:25 +0100 Subject: [PATCH 14/16] cleanup --- connectors/src/connectors/zendesk/index.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 55b2479da809..1bf718c115ff 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -546,9 +546,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { return new Err(new Error("Connector not found")); } await connector.markAsPaused(); - const result = await this.stop(); - - return result.isErr() ? result : new Ok(undefined); + return this.stop(); } async unpause(): Promise> { From ca0e4398a879d975c92993cea012933cef6a0f05 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 17:17:21 +0100 Subject: [PATCH 15/16] cleanup --- connectors/src/connectors/zendesk/index.ts | 9 ++------- connectors/src/connectors/zendesk/temporal/client.ts | 4 ++-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 1bf718c115ff..0637df4da5df 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -336,16 +336,13 @@ export class ZendeskConnectorManager extends BaseConnectorManager { toBeSignaledHelpCenterIds.size > 0 || toBeSignaledCategoryIds.size > 0 ) { - const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({ + return launchZendeskSyncWorkflow({ connector, brandIds: [...toBeSignaledBrandIds], ticketsBrandIds: [...toBeSignaledTicketsIds], helpCenterBrandIds: [...toBeSignaledHelpCenterIds], categoryIds: [...toBeSignaledCategoryIds], }); - if (sendSignalToWorkflowResult.isErr()) { - return new Err(sendSignalToWorkflowResult.error); - } } return new Ok(undefined); @@ -561,9 +558,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId, }); - const result = await launchZendeskSyncWorkflow({ connector, brandIds }); - - return result.isErr() ? result : new Ok(undefined); + return launchZendeskSyncWorkflow({ connector, brandIds }); } async garbageCollect(): Promise> { diff --git a/connectors/src/connectors/zendesk/temporal/client.ts b/connectors/src/connectors/zendesk/temporal/client.ts index a5dfe344bf8e..dbfbe1356ef0 100644 --- a/connectors/src/connectors/zendesk/temporal/client.ts +++ b/connectors/src/connectors/zendesk/temporal/client.ts @@ -31,7 +31,7 @@ export async function launchZendeskSyncWorkflow({ helpCenterBrandIds?: number[]; categoryIds?: number[]; forceResync?: boolean; -}): Promise> { +}): Promise> { if (startFromTs) { throw new Error("[Zendesk] startFromTs not implemented yet."); } @@ -85,7 +85,7 @@ export async function launchZendeskSyncWorkflow({ return new Err(err as Error); } - return new Ok(workflowId); + return new Ok(undefined); } export async function stopZendeskSyncWorkflow( From 12c7af68e7bf76dad4d714a5453f59a20eec88d3 Mon Sep 17 00:00:00 2001 From: Aubin Date: Thu, 31 Oct 2024 19:05:38 +0100 Subject: [PATCH 16/16] refactor: update the signature of launchZendeskSyncWorkflow --- connectors/src/connectors/zendesk/index.ts | 14 +++----- .../src/connectors/zendesk/temporal/client.ts | 35 ++++++++++--------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/connectors/src/connectors/zendesk/index.ts b/connectors/src/connectors/zendesk/index.ts index 0637df4da5df..86c8aabf12a0 100644 --- a/connectors/src/connectors/zendesk/index.ts +++ b/connectors/src/connectors/zendesk/index.ts @@ -72,9 +72,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { { subdomain: "d3v-dust", conversationsSlidingWindow: 90 } ); - const workflowStartResult = await launchZendeskSyncWorkflow({ - connector, - }); + const workflowStartResult = await launchZendeskSyncWorkflow(connector); if (workflowStartResult.isErr()) { await connector.delete(); @@ -117,7 +115,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { } const dataSourceConfig = dataSourceConfigFromConnector(connector); - const result = await launchZendeskSyncWorkflow({ connector }); + const result = await launchZendeskSyncWorkflow(connector); if (result.isErr()) { logger.error( { @@ -143,8 +141,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId, }); - const result = await launchZendeskSyncWorkflow({ - connector, + const result = await launchZendeskSyncWorkflow(connector, { brandIds, forceResync: true, }); @@ -336,8 +333,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { toBeSignaledHelpCenterIds.size > 0 || toBeSignaledCategoryIds.size > 0 ) { - return launchZendeskSyncWorkflow({ - connector, + return launchZendeskSyncWorkflow(connector, { brandIds: [...toBeSignaledBrandIds], ticketsBrandIds: [...toBeSignaledTicketsIds], helpCenterBrandIds: [...toBeSignaledHelpCenterIds], @@ -558,7 +554,7 @@ export class ZendeskConnectorManager extends BaseConnectorManager { const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId, }); - return launchZendeskSyncWorkflow({ connector, brandIds }); + return launchZendeskSyncWorkflow(connector, { brandIds }); } async garbageCollect(): Promise> { diff --git a/connectors/src/connectors/zendesk/temporal/client.ts b/connectors/src/connectors/zendesk/temporal/client.ts index dbfbe1356ef0..98bc02df54a6 100644 --- a/connectors/src/connectors/zendesk/temporal/client.ts +++ b/connectors/src/connectors/zendesk/temporal/client.ts @@ -15,23 +15,24 @@ export function getZendeskSyncWorkflowId(connectorId: ModelId): string { return `zendesk-sync-${connectorId}`; } -export async function launchZendeskSyncWorkflow({ - connector, - startFromTs = null, - brandIds = [], - ticketsBrandIds = [], - helpCenterBrandIds = [], - categoryIds = [], - forceResync = false, -}: { - connector: ConnectorResource; - startFromTs?: number | null; - brandIds?: number[]; - ticketsBrandIds?: number[]; - helpCenterBrandIds?: number[]; - categoryIds?: number[]; - forceResync?: boolean; -}): Promise> { +export async function launchZendeskSyncWorkflow( + connector: ConnectorResource, + { + startFromTs = null, + brandIds = [], + ticketsBrandIds = [], + helpCenterBrandIds = [], + categoryIds = [], + forceResync = false, + }: { + startFromTs?: number | null; + brandIds?: number[]; + ticketsBrandIds?: number[]; + helpCenterBrandIds?: number[]; + categoryIds?: number[]; + forceResync?: boolean; + } = {} +): Promise> { if (startFromTs) { throw new Error("[Zendesk] startFromTs not implemented yet."); }