Skip to content

Commit

Permalink
feat(back): Area actions and reactions
Browse files Browse the repository at this point in the history
  • Loading branch information
EdenComp committed Nov 5, 2023
1 parent 3f64c4a commit cb91dc4
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 18 deletions.
2 changes: 2 additions & 0 deletions backend/back/src/connections/connections.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import { OauthController } from "./oauth.controller";
import { OauthService } from "./oauth.service";
import { UsersModule } from "../users/users.module";
import { AuthModule } from "../auth/auth.module";
import { GrpcModule } from "../grpc/grpc.module";

@Module({
imports: [
TypeOrmModule.forFeature([UserConnection, ServiceScope]),
forwardRef(() => ServicesModule),
HttpModule,
UsersModule,
forwardRef(() => GrpcModule),
forwardRef(() => AuthModule),
],
controllers: [ConnectionsController, OauthController],
Expand Down
12 changes: 12 additions & 0 deletions backend/back/src/connections/oauth.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
Controller,
ForbiddenException,
forwardRef,
Get,
Inject,
InternalServerErrorException,
Logger,
Param,
Expand All @@ -17,6 +19,7 @@ import { ServiceIdParamDto } from "../param-validators.dto";
import { ConnectionsService } from "./connections.service";
import { AuthService } from "../auth/auth.service";
import { UsersService } from "../users/users.service";
import { GrpcService } from "../grpc/grpc.service";

@ApiTags("OAuth Callbacks")
@Controller("connections/oauth")
Expand All @@ -29,6 +32,8 @@ export class OauthController {
private readonly connectionService: ConnectionsService,
private readonly authService: AuthService,
private readonly usersService: UsersService,
@Inject(forwardRef(() => GrpcService))
private readonly grpcService: GrpcService,
) {}

@Get("/:serviceId/callback")
Expand Down Expand Up @@ -66,6 +71,13 @@ export class OauthController {
const connection = await this.connectionService.createUserConnection(userId, serviceId, scopes, data);
if (!connection) throw new InternalServerErrorException("Failed to create connection");
this.logger.log(`Connection created for user ${userId}, redirecting to frontend...`);
await this.grpcService.onAction({
name: "area-on-account-connect",
identifier: `area-on-account-connect-${userId}`,
params: {
service: serviceId,
},
});
}
return response.redirect(
`${this.configService.getOrThrow<string>("FRONT_OAUTH_REDIRECTION_URL")}?${queryParams.toString()}`,
Expand Down
2 changes: 2 additions & 0 deletions backend/back/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import { CreateDiscordOnGuildJoinAction1699140593736 } from "./workflows/seed/16
import { CreateTodoistTaskReactions1699140054686 } from "./workflows/seed/1699140054686-CreateTodoistTaskReactions";
import { CreateAreaService1699143731594 } from "./services/seed/1699143731594-CreateAreaService";
import { CreateAreaOnActionAction1699145276143 } from "./workflows/seed/1699145276143-CreateAreaOnActionAction";
import { CreateAreaAreas1699152935342 } from "./workflows/seed/1699152935342-CreateAreaAreas";

dotenv.config();

Expand Down Expand Up @@ -173,6 +174,7 @@ export const DATA_SOURCE_OPTIONS: DataSourceOptions = {
CreateTodoistTaskReactions1699140054686,
CreateAreaService1699143731594,
CreateAreaOnActionAction1699145276143,
CreateAreaAreas1699152935342,
],
synchronize: process.env.NODE_ENV === "development",
};
Expand Down
4 changes: 2 additions & 2 deletions backend/back/src/grpc/grpc.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Controller } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { ApiExcludeController } from "@nestjs/swagger";
import { JobData } from "./grpc.dto";
import { JobData, JobError } from "./grpc.dto";
import { GrpcService } from "./grpc.service";

@ApiExcludeController()
Expand All @@ -20,7 +20,7 @@ export class GrpcController {
}

@GrpcMethod("AreaBackService", "OnError")
async onError(data: JobData): Promise<void> {
async onError(data: JobError): Promise<void> {
await this.grpcService.onError(data);
}
}
4 changes: 2 additions & 2 deletions backend/back/src/grpc/grpc.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { forwardRef, Inject, Injectable, Logger, OnModuleInit } from "@nestjs/common";
import { ClientGrpc } from "@nestjs/microservices";
import { AuthenticatedJobData, GrpcResponse, JobData, JobId, JobList } from "./grpc.dto";
import { AuthenticatedJobData, GrpcResponse, JobData, JobError, JobId, JobList } from "./grpc.dto";
import { firstValueFrom, Observable } from "rxjs";
import { JobsParams, JobsType } from "../types/jobs";
import { JobsIdentifiers } from "../types/jobIds";
Expand Down Expand Up @@ -95,7 +95,7 @@ export class GrpcService implements OnModuleInit {
await this.jobsService.launchNextJob(data);
}

async onError(data: JobData) {
async onError(data: JobError) {
this.logger.error(`Received error job data ${JSON.stringify(data, undefined, 2)}`);
await this.activityService.createActivityLogsForJobIdentifier("error", data.identifier);
}
Expand Down
52 changes: 52 additions & 0 deletions backend/back/src/jobs/back-jobs.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { forwardRef, Inject, Injectable } from "@nestjs/common";
import { JobData } from "../grpc/grpc.dto";
import { JobsType } from "../types/jobs";
import { JobsService } from "./jobs.service";
import { WorkflowToggleParams } from "../types/jobParams";
import { WorkflowsService } from "../workflows/workflows.service";
import { GrpcService } from "../grpc/grpc.service";
import { JobsIdentifiers } from "../types/jobIds";

@Injectable()
export class BackJobsService {
constructor(
@Inject(forwardRef(() => GrpcService)) private readonly grpcService: GrpcService,
@Inject(forwardRef(() => JobsService)) private readonly jobsService: JobsService,
private readonly workflowsService: WorkflowsService,
) {}

async toggleWorkflow(params: WorkflowToggleParams, newState: boolean) {
const jobName = `area-${newState ? "enable" : "disable"}-workflow`;
const workflow = await this.workflowsService.getWorkflowIdByName(params.workflowName, params.ownerId);

if (workflow.active === newState) return;
try {
await this.workflowsService.toggleWorkflow(workflow.id, newState, params.ownerId);
await this.grpcService.onAction({
name: jobName,
identifier: `area-${newState ? "enable" : "disable"}-workflow-${params.ownerId}`,
params: {},
});
} catch (e) {
await this.grpcService.onError({
identifier: JobsIdentifiers[jobName](params),
error: e.message,
isAuthError: false,
});
}
}

async executeBackJob(job: JobData) {
const jobType: JobsType = job.name as JobsType;
const params = await this.jobsService.convertParams(jobType, job.params);

switch (jobType) {
case "area-disable-workflow":
await this.toggleWorkflow(params as WorkflowToggleParams, false);
break;
case "area-enable-workflow":
await this.toggleWorkflow(params as WorkflowToggleParams, true);
break;
}
}
}
12 changes: 12 additions & 0 deletions backend/back/src/jobs/jobs.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { JobsType } from "../types/jobs";

export const AREAS_WITHOUT_GRPC: JobsType[] = [
"area-disable-workflow",
"area-enable-workflow",
"area-on-account-connect",
"area-on-action",
"area-on-workflow-create",
"area-on-workflow-toggle",
"facebook-on-status-create",
"linear-on-issue-create",
];
3 changes: 2 additions & 1 deletion backend/back/src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { WorkflowsModule } from "../workflows/workflows.module";
import { TypeOrmModule } from "@nestjs/typeorm";
import WorkflowArea from "../workflows/entities/workflow-area.entity";
import { ConnectionsModule } from "../connections/connections.module";
import { BackJobsService } from "./back-jobs.service";

@Module({
imports: [
Expand All @@ -14,7 +15,7 @@ import { ConnectionsModule } from "../connections/connections.module";
forwardRef(() => GrpcModule),
],
controllers: [],
providers: [JobsService],
providers: [BackJobsService, JobsService],
exports: [JobsService],
})
export class JobsModule {}
23 changes: 17 additions & 6 deletions backend/back/src/jobs/jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import { Repository } from "typeorm";
import { AuthenticatedJobData, JobData } from "../grpc/grpc.dto";
import { RuntimeException } from "@nestjs/core/errors/exceptions";
import { ConnectionsService } from "../connections/connections.service";
import { uniq, uniqBy } from "lodash";
import { partition, uniq, uniqBy } from "lodash";
import { AREAS_WITHOUT_GRPC } from "./jobs.dto";
import { BackJobsService } from "./back-jobs.service";

@Injectable()
export class JobsService {
Expand All @@ -21,6 +23,7 @@ export class JobsService {
@Inject(forwardRef(() => GrpcService)) private readonly grpcService: GrpcService,
@Inject(forwardRef(() => WorkflowsService)) private readonly workflowsService: WorkflowsService,
@InjectRepository(WorkflowArea) private readonly workflowAreaRepository: Repository<WorkflowArea>,
private readonly backJobsService: BackJobsService,
) {}

getJobName(areaServiceId: string, areaId: string): JobsType {
Expand Down Expand Up @@ -61,9 +64,13 @@ export class JobsService {
);
}

async getReactionsForJob(jobId: string): Promise<AuthenticatedJobData[]> {
async getReactionsForJob(jobId: string, active?: boolean): Promise<AuthenticatedJobData[]> {
if (active !== undefined) this.logger.log(`The workflows need to be ${active ? "active" : "inactive"}`);
const jobs = await this.workflowAreaRepository.find({
where: { jobId },
where: [
{ jobId, actionOfWorkflow: { active } },
{ jobId, workflow: { active } },
],
relations: { nextWorkflowReactions: { area: true }, workflow: true, actionOfWorkflow: true },
});
const nextJobs = await Promise.all(
Expand Down Expand Up @@ -92,7 +99,7 @@ export class JobsService {
async getWorkflowOwnersForJob(jobId: string): Promise<string[]> {
const jobs = await this.workflowAreaRepository.find({
where: { jobId },
relations: { nextWorkflowReactions: { area: true }, workflow: true, actionOfWorkflow: true },
relations: { actionOfWorkflow: true },
});
const owners = jobs.map((job) => job.actionOfWorkflow.ownerId);
return uniq(owners);
Expand Down Expand Up @@ -123,9 +130,10 @@ export class JobsService {

async launchJobs(jobs: AuthenticatedJobData[]): Promise<void> {
const uniqueJobs = uniqBy(jobs, (job) => job.identifier);
const [backJobs, grpcJobs] = partition(uniqueJobs, (job) => AREAS_WITHOUT_GRPC.includes(job.name as JobsType));

this.logger.log(`Launching ${uniqueJobs.length} jobs`);
for (const job of uniqueJobs) {
for (const job of grpcJobs) {
const jobType: JobsType = job.name as JobsType;
const params = await this.convertParams(jobType, job.params);
const response = await this.grpcService.launchJob(jobType, params, job.auth);
Expand All @@ -135,6 +143,9 @@ export class JobsService {
throw new RuntimeException(`Error while launching job: ${response.error.message}`);
}
}
for (const job of backJobs) {
await this.backJobsService.executeBackJob(job);
}
}

replaceParamsInJob(oldJob: unknown, params: unknown): unknown {
Expand All @@ -152,7 +163,7 @@ export class JobsService {
}

async launchNextJob(data: JobData): Promise<void> {
const jobs = await this.getReactionsForJob(data.identifier);
const jobs = await this.getReactionsForJob(data.identifier, true);
this.logger.log(`Launching next ${jobs.length} jobs for job ${data.identifier}`);
for (const job of jobs) {
job.params = this.replaceParamsInJob(data.params, job.params);
Expand Down
5 changes: 5 additions & 0 deletions backend/back/src/types/jobIds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ function uniqueWebhookId<TJobType extends JobsType>(jobId: TJobType, webhookId:

export const JobsIdentifiers: JobsIdentifiers = {
"airtable-delete-record": ({ workflowStepId }) => uniqueJobId("airtable-delete-record", workflowStepId),
"area-disable-workflow": ({ workflowStepId }) => uniqueJobId("area-disable-workflow", workflowStepId),
"area-enable-workflow": ({ workflowStepId }) => uniqueJobId("area-enable-workflow", workflowStepId),
"area-on-account-connect": ({ ownerId }) => `area-on-account-connect-${ownerId}`,
"area-on-action": ({ ownerId }) => `area-on-action-${ownerId}`,
"area-on-workflow-create": ({ ownerId }) => `area-on-workflow-create-${ownerId}`,
"area-on-workflow-toggle": ({ ownerId }) => `area-on-workflow-toggle-${ownerId}`,
"discord-on-guild-join": ({ workflowStepId }) => uniqueJobId("discord-on-guild-join", workflowStepId),
"facebook-on-status-create": ({ pageId }) => uniqueWebhookId("facebook-on-status-create", pageId),
"github-close-issue": ({ workflowStepId }) => uniqueJobId("github-close-issue", workflowStepId),
Expand Down
10 changes: 10 additions & 0 deletions backend/back/src/types/jobParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ export class OwnerJobParams {
ownerId: string;
}

export class OwnerUniqueJobParams extends UniqueJobParams {
@IsUUID(4)
ownerId: string;
}

export class WorkflowToggleParams extends OwnerUniqueJobParams {
@IsString()
workflowName: string;
}

export class TimerSecondIntervalParams extends UniqueJobParams {
@IsNumber()
seconds: number;
Expand Down
6 changes: 6 additions & 0 deletions backend/back/src/types/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ import {
TodoistUpdateTaskParams,
TwitterCreateTweetParams,
UniqueJobParams,
WorkflowToggleParams,
YoutubeChannelParams,
} from "./jobParams";

export const JobParamsClasses = {
"airtable-delete-record": AirtableDeleteRecordParams,
"area-disable-workflow": WorkflowToggleParams,
"area-enable-workflow": WorkflowToggleParams,
"area-on-account-connect": OwnerJobParams,
"area-on-action": OwnerJobParams,
"area-on-workflow-create": OwnerJobParams,
"area-on-workflow-toggle": OwnerJobParams,
"discord-on-guild-join": UniqueJobParams,
"facebook-on-status-create": FacebookPageParams,
"github-close-issue": GithubCloseIssueParams,
Expand Down
41 changes: 41 additions & 0 deletions backend/back/src/workflows/seed/1699152935342-CreateAreaAreas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import { ParametersFormFlowFieldDto } from "../../services/dto/area.dto";

export class CreateAreaAreas1699152935342 implements MigrationInterface {
private readonly areaWorkspaceParametersFormFlow: ParametersFormFlowFieldDto[] = [
{
name: "workflowName",
type: "short-text",
required: true,
},
];

private readonly areaAccountConnectParametersReturnFlow: string[] = ["service"];
private readonly areaWorkspaceCreateParametersReturnFlow: string[] = ["name"];
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`INSERT INTO "area" ("id", "service_id", "is_action", "description", "parameters_form_flow", "parameters_return_flow")
VALUES ('disable-workflow', 'area', false, 'Disable a workflow', $1, '{}'),
('enable-workflow', 'area', false, 'Enable a workflow', $1, '{}'),
('on-account-connect', 'area', true, 'Triggers when a new OAuth account is connected', '[]', '{${this.areaAccountConnectParametersReturnFlow
.map((v) => `"${v}"`)
.join(",")}}'),
('on-workflow-create', 'area', true, 'Triggers when a new workflow is created', '[]', '{${this.areaWorkspaceCreateParametersReturnFlow
.map((v) => `"${v}"`)
.join(",")}}'),
('on-workflow-toggle', 'area', true, 'Triggers when a workflow is enabled', '[]', '{${this.areaWorkspaceCreateParametersReturnFlow
.map((v) => `"${v}"`)
.join(",")}}')`,
[JSON.stringify(this.areaWorkspaceParametersFormFlow)],
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`DELETE FROM "area"
WHERE service_id = 'area'
AND id IN ('disable-workflow', 'enable-workflow', 'on-account-connect', 'on-workflow-create', 'on-workflow-toggle')
`,
);
}
}
2 changes: 2 additions & 0 deletions backend/back/src/workflows/workflows.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import { User } from "../users/entities/user.entity";
import { JobsModule } from "../jobs/jobs.module";
import { ConnectionsModule } from "../connections/connections.module";
import { ServicesModule } from "../services/services.module";
import { GrpcModule } from "../grpc/grpc.module";

@Module({
imports: [
TypeOrmModule.forFeature([Workflow, WorkflowArea, Area, User]),
forwardRef(() => GrpcModule),
forwardRef(() => JobsModule),
ConnectionsModule,
ServicesModule,
Expand Down
Loading

0 comments on commit cb91dc4

Please sign in to comment.