Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connectors] Launch workflows from ZendeskConnectorManager #8346

Merged
merged 17 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 106 additions & 13 deletions connectors/src/connectors/zendesk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import type {
ContentNodesViewType,
Result,
} from "@dust-tt/types";
import { assertNever } from "@dust-tt/types";
import { Err } from "@dust-tt/types";
import { Ok } from "@dust-tt/types";
import { assertNever, Err, Ok } from "@dust-tt/types";

import type { ConnectorManagerError } from "@connectors/connectors/interface";
import { BaseConnectorManager } from "@connectors/connectors/interface";
Expand Down Expand Up @@ -37,13 +35,18 @@ import {
revokeSyncZendeskTickets,
} from "@connectors/connectors/zendesk/lib/ticket_permissions";
import { getZendeskAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token";
import {
launchZendeskSyncWorkflow,
stopZendeskSyncWorkflow,
} from "@connectors/connectors/zendesk/temporal/client";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import { ZendeskConfigurationResource } from "@connectors/resources/zendesk_resources";
import {
ZendeskArticleResource,
ZendeskBrandResource,
ZendeskCategoryResource,
ZendeskConfigurationResource,
ZendeskTicketResource,
} from "@connectors/resources/zendesk_resources";
import type { DataSourceConfig } from "@connectors/types/data_source_config";
Expand All @@ -69,6 +72,22 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
{ subdomain: "d3v-dust", conversationsSlidingWindow: 90 }
);

const workflowStartResult = await launchZendeskSyncWorkflow({
connectorId: connector.id,
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit but we usually prefer to pass proper type, here you can pass Connector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do!


if (workflowStartResult.isErr()) {
await connector.delete();
logger.error(
{
workspaceId: dataSourceConfig.workspaceId,
error: workflowStartResult.error,
},
"[Zendesk] Error launching the sync workflow."
);
throw workflowStartResult.error;
}

return new Ok(connector.id.toString());
}

Expand All @@ -86,15 +105,52 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async stop(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const result = await stopZendeskSyncWorkflow(this.connectorId);
return result.isErr() ? result : new Ok(undefined);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not directly returning result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types do not match here, but this is an error in stopZendeskSyncWorkflow, will fix it and return result here, thanks for spotting

}

async resume(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
aubin-tchoi marked this conversation as resolved.
Show resolved Hide resolved
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}

const dataSourceConfig = dataSourceConfigFromConnector(connector);
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the try/catch if we have a Result type? Also in many other places there is no try/catch that wraps this call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, changed that in launchZendeskSyncWorkflow a while ago and got a little bit confused here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goal here is indeed to always use the Result type by catching it if necessary in the deepest level possible and return it in launchZendeskSyncWorkflow and then in resume.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

await launchZendeskSyncWorkflow({ connectorId });
} catch (e) {
logger.error(
{
workspaceId: dataSourceConfig.workspaceId,
dataSourceId: dataSourceConfig.dataSourceId,
error: e,
},
"[Zendesk] Error resuming the sync workflow."
);
}
return new Ok(undefined);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems weird to not return the error in this case 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, i'll change that

}

async sync(): Promise<Result<string, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}

const brandIds = await ZendeskBrandResource.fetchAllBrandIds({
connectorId,
});
const result = await launchZendeskSyncWorkflow({
connectorId,
brandIds,
forceResync: true,
});

return result.isErr() ? result : new Ok(connector.id.toString());
}

async retrievePermissions({
Expand Down Expand Up @@ -146,9 +202,8 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

const connectionId = connector.connectionId;
const zendeskConfiguration = await ZendeskConfigurationResource.fetchById(
this.connectorId
);
const zendeskConfiguration =
await ZendeskConfigurationResource.fetchByConnectorId(connectorId);
if (!zendeskConfiguration) {
logger.error(
{ connectorId },
Expand Down Expand Up @@ -276,7 +331,23 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}
}

/// Launch a sync workflow here
if (
toBeSignaledBrandIds.size > 0 ||
toBeSignaledTicketsIds.size > 0 ||
toBeSignaledHelpCenterIds.size > 0 ||
toBeSignaledCategoryIds.size > 0
) {
const sendSignalToWorkflowResult = await launchZendeskSyncWorkflow({
connectorId,
brandIds: [...toBeSignaledBrandIds],
ticketsBrandIds: [...toBeSignaledTicketsIds],
helpCenterBrandIds: [...toBeSignaledHelpCenterIds],
categoryIds: [...toBeSignaledCategoryIds],
});
if (sendSignalToWorkflowResult.isErr()) {
return new Err(sendSignalToWorkflowResult.error);
}
}

return new Ok(undefined);
}
Expand Down Expand Up @@ -469,11 +540,33 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}

async pause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsPaused();
const result = await this.stop();

return result.isErr() ? result : new Ok(undefined);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here? Can we simply return result?

}

async unpause(): Promise<Result<undefined, Error>> {
throw new Error("Method not implemented.");
const connectorId = this.connectorId;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
await connector.markAsUnpaused();

const brandIds = await ZendeskBrandResource.fetchAllBrandIds({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we only resume with the brandIds?

Copy link
Contributor Author

@aubin-tchoi aubin-tchoi Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the brands are the top level collections, syncing them syncs everything

connectorId,
});
const result = await launchZendeskSyncWorkflow({ connectorId, brandIds });

return result.isErr() ? result : new Ok(undefined);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here types don't match since we pass the workflow ID in the Ok of launchZendeskSyncWorkflow, turns out we don't need this workflow ID so i'll just change launchZendeskSyncWorkflow and return result everywhere appliable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flvndvd done

}

async garbageCollect(): Promise<Result<string, Error>> {
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/connectors/zendesk/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function _getZendeskConnectorOrRaise(connectorId: ModelId) {

async function _getZendeskConfigurationOrRaise(connectorId: ModelId) {
const configuration =
await ZendeskConfigurationResource.fetchById(connectorId);
await ZendeskConfigurationResource.fetchByConnectorId(connectorId);
if (!configuration) {
throw new Error("[Zendesk] Configuration not found.");
}
Expand Down
12 changes: 12 additions & 0 deletions connectors/src/resources/zendesk_resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ export class ZendeskBrandResource extends BaseResource<ZendeskBrand> {
return brands.map((brand) => new this(this.model, brand.get()));
}

static async fetchAllBrandIds({
connectorId,
}: {
connectorId: number;
}): Promise<number[]> {
const brands = await ZendeskBrand.findAll({
where: { connectorId },
attributes: ["brandId"],
});
return brands.map((brand) => brand.brandId);
}

static async fetchAllWithHelpCenter({
connectorId,
}: {
Expand Down
Loading