Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/bull_mq_integration #2735

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,388 changes: 1,037 additions & 351 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions services/workflows-service/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ WEB_UI_SDK_URL=http://localhost:5202
#HASHING_KEY_SECRET="$2b$10$FovZTB91/QQ4Yu28nvL8e."
HASHING_KEY_SECRET_BASE64=JDJiJDEwJDNFeWtwWEs4QkdiczlRaWFwLkM4Vk8=
NOTION_API_KEY=secret
REDIS_HOST=localhost
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would want something like USE_REDIS - to change between using redis to im-mem implmenation

REDIS_PORT=7381
REDIS_PASSWORD=password
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security concern: Weak Redis password.

The REDIS_PASSWORD=password is a significant security risk. Even though this is an example file, it's crucial to:

  1. Use a strong, unique password for each environment.
  2. Never commit actual passwords to version control.
  3. Consider using a secret management system for production environments.

Replace the current line with:

-REDIS_PASSWORD=password
+REDIS_PASSWORD=<strong-unique-password>

Also, add a comment above this line:

# IMPORTANT: Replace with a strong, unique password. Never commit the actual password.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
REDIS_PASSWORD=password
# IMPORTANT: Replace with a strong, unique password. Never commit the actual password.
REDIS_PASSWORD=<strong-unique-password>

22 changes: 22 additions & 0 deletions services/workflows-service/docker-compose.redis.yml
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you create a new docker-compose file and didn't use the existing one?

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3.8'
services:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it located on a new docker image? Our service has to use redis in order to run ...
you can place it with a dependency and when Redis is ready start workflow service

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant we use the same docker-compose-db.yml ?

we can use same like this

version: '3'
services:
  db:
    image: sibedge/postgres-plv8:15.3-3.1.7
    ports:
      - ${DB_PORT}:5432
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres15:/var/lib/postgresql/data
  redis:
    image: redis:alpine
    ports:
      - '${REDIS_PORT}:6379'
    volumes:
      - redis-data:/data
    command: >
      --requirepass ${REDIS_PASSWORD}
      --appendonly yes
    environment:
      - REDIS_PASSWORD=${REDIS_PASSWORD}
      - REDIS_PORT=${REDIS_PORT}
volumes:
  postgres15: ~
  redis-data:
    driver: local

Bring only redis using :

This Line
can be replaced with the following

docker:redis: docker compose -f docker-compose.db.yml up -d redis --wait
docker:redis:down: docker compose -f docker-compose.db.yml down -v redis,

redis:
image: redis:alpine
Copy link
Collaborator

@MatanYadaev MatanYadaev Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it for local only? Because the Alpine image is less performant than the Debian image

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it's local only

ports:
- '${REDIS_PORT}:6379'
volumes:
- redis-data:/data
command: >
--requirepass ${REDIS_PASSWORD}
--appendonly yes
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_PORT=${REDIS_PORT}
networks:
- app-network
volumes:
redis-data:
driver: local
networks:
app-network:
driver: bridge
9 changes: 8 additions & 1 deletion services/workflows-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "workflow-service",
"scripts": {
"spellcheck": "cspell \"*\"",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run db:reset && npm run seed",
"setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed",
Copy link
Collaborator

@pratapalakshmi pratapalakshmi Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better

"setup": "cp .env.example .env && npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed",

"format": "prettier --write . '!**/*.{md,hbs}'",
"format:check": "prettier --check . '!**/*.{md,hbs}'",
"lint": "eslint . --fix",
Expand All @@ -30,6 +30,8 @@
"db:reset:dev:with-data": "npm run db:reset:dev && npm run db:data-migration:migrate && npm run db:data-sync",
"db:init": "npm run db:migrate-dev -- --name 'initial version' && npm run db:migrate-up seed",
"prisma:generate": "prisma generate",
"docker:redis": "docker compose -f docker-compose.redis.yml up -d --wait",
Copy link
Collaborator

@MatanYadaev MatanYadaev Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you run this command this way, the REDIS_PORT variable won't be loaded from the dotenv. Is it part of the requirements?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from what i checked it worked

"docker:redis:down": "docker compose -f docker-compose.redis.yml down --volumes",
"docker:db": "docker compose -f docker-compose.db.yml up -d --wait",
"docker:db:down": "docker compose -f docker-compose.db.yml down --volumes",
"docker:build": "docker build .",
Expand All @@ -50,8 +52,12 @@
"@ballerine/common": "0.9.33",
"@ballerine/workflow-core": "0.6.45",
"@ballerine/workflow-node-sdk": "0.6.45",
"@bull-board/api": "^6.0.0",
"@bull-board/express": "^6.0.0",
"@bull-board/nestjs": "^6.0.0",
"@faker-js/faker": "^7.6.0",
"@nestjs/axios": "^2.0.0",
"@nestjs/bullmq": "^10.2.1",
"@nestjs/common": "^9.3.12",
"@nestjs/config": "2.3.1",
"@nestjs/core": "^9.3.12",
Expand All @@ -78,6 +84,7 @@
"ballerine-nestjs-typebox": "3.0.2-next.11",
"base64-stream": "^1.0.0",
"bcrypt": "5.1.0",
"bullmq": "^5.13.2",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should also include redis client

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add bull-arena which is the UI component for bull

"class-transformer": "0.5.1",
"class-validator": "0.14.0",
"concat-stream": "^2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion services/workflows-service/prisma/data-migrations
11 changes: 7 additions & 4 deletions services/workflows-service/src/alert/alert.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { ProjectModule } from '@/project/project.module';
import { UserRepository } from '@/user/user.repository';
import { AlertDefinitionModule } from '@/alert-definition/alert-definition.module';
import { SentryModule } from '@/sentry/sentry.module';
import { BullMqModule } from '@/bull-mq/bull-mq.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
Expand All @@ -32,6 +34,8 @@ import { SentryModule } from '@/sentry/sentry.module';
PrismaModule,
SentryModule,
ProjectModule,
BullMqModule,
OutgoingWebhooksModule,
HttpModule.register({
timeout: 5000,
maxRedirects: 10,
Expand All @@ -48,15 +52,14 @@ import { SentryModule } from '@/sentry/sentry.module';
],
controllers: [AlertControllerInternal, AlertControllerExternal],
providers: [
{
provide: WebhookHttpService,
useExisting: HttpService,
},
WebhookHttpService,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Review module inclusion in providers array.

The simplification of WebhookHttpService provider is good. However, including BullMqModule and OutgoingWebhooksModule in both the imports and providers arrays is unusual and potentially incorrect.

Modules are typically only included in the imports array, not in providers. The providers array is meant for services, not modules.

Consider removing BullMqModule and OutgoingWebhooksModule from the providers array:

 providers: [
   WebhookHttpService,
   AlertService,
   AlertRepository,
   AlertDefinitionRepository,
   WebhookManagerService,
   WebhookEventEmitterService,
-  BullMqModule,
-  OutgoingWebhooksModule,
   // TODO: Export to user module
   UserService,
   UserRepository,
   PasswordService,
 ],

Also applies to: 61-62

AlertService,
AlertRepository,
AlertDefinitionRepository,
WebhookManagerService,
WebhookEventEmitterService,
BullMqModule,
OutgoingWebhooksModule,
// TODO: Export to user module
UserService,
UserRepository,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import { alertWebhookFailure } from '@/events/alert-webhook-failure';
import { lastValueFrom } from 'rxjs';
import * as common from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ClsService } from 'nestjs-cls';
import { SentryInterceptor } from '@/sentry/sentry.interceptor';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { WebhookEventEmitterService } from './webhook-event-emitter.service';
import { IWebhookEntityEventData } from './types';
import { Webhook } from '@/events/get-webhooks';
import { HttpService } from '@nestjs/axios';
import { sign } from '@ballerine/common';
import { AnyRecord } from '@ballerine/common';
import { env } from '@/env';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { OutgoingWebhooksService } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.service';

@common.Injectable()
export abstract class WebhookHttpService extends HttpService {}
export class WebhookHttpService {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unused Empty Class WebhookHttpService

The class WebhookHttpService is currently empty without any methods or properties. If it's no longer needed, consider removing it to keep the codebase clean. If it's a placeholder for future development, adding a comment to indicate its intended use would be helpful.


@common.Injectable()
@common.UseInterceptors(SentryInterceptor)
export class WebhookManagerService {
constructor(
private readonly cls: ClsService,
protected readonly logger: AppLoggerService,
protected readonly configService: ConfigService,
protected readonly httpService: WebhookHttpService,
protected readonly outgoingQueueWebhookService: OutgoingWebhookQueueService,
protected readonly outgoingWebhookService: OutgoingWebhooksService,
protected readonly webhookEventEmitter: WebhookEventEmitterService,
) {
webhookEventEmitter.on('*', async (eventData: any) => {
Expand All @@ -45,21 +45,42 @@ export class WebhookManagerService {
webhook: Webhook;
webhookSharedSecret: string;
}) {
try {
const { id, url, environment, apiVersion } = webhook;
const { id, url, environment, apiVersion } = webhook;

if (env.QUEUE_SYSTEM_ENABLED) {
return await this.outgoingQueueWebhookService.addJob({
requestConfig: {
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
});
}

try {
this.logger.log('Sending webhook', { id, url });

const res = await lastValueFrom(
this.httpService.post(url, data, {
headers: {
'X-HMAC-Signature': sign({
payload: data,
key: webhookSharedSecret,
}),
},
}),
);
const response = await this.outgoingWebhookService.invokeWebhook({
requestConfig: {
url,
method: 'POST',
headers: {},
body: data as unknown as AnyRecord,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
});

if (response.status < 200 || response.status >= 300) {
throw new Error(`Webhook failed with status ${response.status} for ${url}`);
}
} catch (error: Error | any) {
this.logger.error('Webhook error data', {
data,
Expand Down
8 changes: 6 additions & 2 deletions services/workflows-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import { WorkflowModule } from '@/workflow/workflow.module';
import { TransactionModule } from '@/transaction/transaction.module';
import { AlertModule } from '@/alert/alert.module';
import { SwaggerController } from './swagger/swagger.controller';
import { WebhooksModule } from '@/webhooks/webhooks.module';
import { BusinessReportModule } from '@/business-report/business-report.module';
import { ScheduleModule } from '@nestjs/schedule';
import { CronModule } from '@/workflow/cron/cron.module';
Expand All @@ -48,6 +47,9 @@ import { hashKey } from './customer/api-key/utils';
import { RuleEngineModule } from './rule-engine/rule-engine.module';
import { NotionModule } from '@/notion/notion.module';
import { SecretsManagerModule } from '@/secrets-manager/secrets-manager.module';
import { BullMqModule } from '@/bull-mq/bull-mq.module';
import { IncomingWebhooksModule } from '@/webhooks/incoming/incoming-webhooks.module';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

export const validate = async (config: Record<string, unknown>) => {
const zodEnvSchema = z
Expand Down Expand Up @@ -85,7 +87,8 @@ export const validate = async (config: Record<string, unknown>) => {
EventEmitterModule.forRoot(),
UserModule,
WorkflowModule,
WebhooksModule,
IncomingWebhooksModule,
OutgoingWebhooksModule,
UiDefinitionModule,
StorageModule,
DataMigrationModule,
Expand Down Expand Up @@ -126,6 +129,7 @@ export const validate = async (config: Record<string, unknown>) => {
RuleEngineModule,
NotionModule,
SecretsManagerModule,
BullMqModule,
],
providers: [
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { ConnectionOptions, Job, Queue, Worker } from 'bullmq';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { env } from '@/env';

@Injectable()
export abstract class BaseQueueWorkerService<T = any> implements OnModuleDestroy {
protected queue?: Queue;
protected worker?: Worker;
protected connectionOptions: ConnectionOptions;

protected constructor(private queueName: string) {
this.connectionOptions = {
...REDIS_CONFIG,
};

if (env.QUEUE_SYSTEM_ENABLED !== true) {
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure proper type comparison for environment variable QUEUE_SYSTEM_ENABLED.

Environment variables are typically strings. Comparing env.QUEUE_SYSTEM_ENABLED directly to true (a boolean) may not yield the expected result. Consider parsing the environment variable to a boolean before the comparison.

Apply this diff to fix the comparison:

     if (env.QUEUE_SYSTEM_ENABLED !== true) {
-      return;
+      return;
     }

Or parse the environment variable:

     protected constructor(private queueName: string) {
+      const isQueueSystemEnabled = env.QUEUE_SYSTEM_ENABLED === 'true';
       this.connectionOptions = {
         ...REDIS_CONFIG,
       };

-      if (env.QUEUE_SYSTEM_ENABLED !== true) {
+      if (!isQueueSystemEnabled) {
         return;
       }

Committable suggestion was skipped due to low confidence.


this.queue = new Queue(queueName, { connection: this.connectionOptions });
}

abstract handleJob(job: Job<T>): Promise<void>;

async addJob(jobData: T, jobOptions = {}): Promise<void> {
await this.queue?.add(this.queueName, jobData, jobOptions);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling in addJob method.

The current implementation silently fails when the queue is undefined. Implement explicit error handling to make the behavior more predictable and easier to debug.

Apply this diff to improve the addJob method:

   async addJob(jobData: T, jobOptions = {}): Promise<void> {
-    await this.queue?.add(this.queueName, jobData, jobOptions);
+    if (!this.queue) {
+      this.logger.warn('Attempted to add job while queue system is disabled');
+      throw new Error('Queue system is disabled. Cannot add job.');
+    }
+    try {
+      await this.queue.add(this.queueName, jobData, jobOptions);
+    } catch (error) {
+      this.logger.error(`Failed to add job to queue: ${error.message}`);
+      throw error;
+    }
   }

This change ensures that callers are aware when jobs cannot be added due to a disabled queue system and provides better error logging.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async addJob(jobData: T, jobOptions = {}): Promise<void> {
await this.queue?.add(this.queueName, jobData, jobOptions);
}
async addJob(jobData: T, jobOptions = {}): Promise<void> {
if (!this.queue) {
this.logger.warn('Attempted to add job while queue system is disabled');
throw new Error('Queue system is disabled. Cannot add job.');
}
try {
await this.queue.add(this.queueName, jobData, jobOptions);
} catch (error) {
this.logger.error(`Failed to add job to queue: ${error.message}`);
throw error;
}
}


protected initializeWorker() {
this.worker = new Worker(
this.queueName,
async (job: Job<T>) => {
await this.handleJob(job);
},
{ connection: this.connectionOptions },
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add a check for the queue system being enabled in initializeWorker method.

If the queue system is disabled, initializing the worker is unnecessary and may cause unintended behavior. Add a condition to ensure the worker is only initialized when the queue system is enabled.

Apply this diff to add the check:

   protected initializeWorker() {
+    if (!this.queue) {
+      return;
+    }
     this.worker = new Worker(
       this.queueName,
       async (job: Job<T>) => {
         await this.handleJob(job);
       },
       { connection: this.connectionOptions },
     );
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected initializeWorker() {
this.worker = new Worker(
this.queueName,
async (job: Job<T>) => {
await this.handleJob(job);
},
{ connection: this.connectionOptions },
);
}
protected initializeWorker() {
if (!this.queue) {
return;
}
this.worker = new Worker(
this.queueName,
async (job: Job<T>) => {
await this.handleJob(job);
},
{ connection: this.connectionOptions },
);
}


async onModuleDestroy() {
await Promise.all([this.worker?.close(), this.queue?.close()]);
}
}
45 changes: 45 additions & 0 deletions services/workflows-service/src/bull-mq/bull-mq.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { AppLoggerModule } from '@/common/app-logger/app-logger.module';
import { QUEUES } from '@/bull-mq/consts';
import { OutgoingWebhookQueueService } from '@/bull-mq/outgoing-webhook/outgoing-webhook-queue.service';
import { REDIS_CONFIG } from '@/redis/const/redis-config';
import { OutgoingWebhooksModule } from '@/webhooks/outgoing-webhooks/outgoing-webhooks.module';

@Module({
imports: [
AppLoggerModule,
OutgoingWebhooksModule,
BullModule.forRootAsync({
useFactory: () => {
return {
connection: {
...REDIS_CONFIG,
},
};
},
}),
BullModule.registerQueue(
...Object.values(QUEUES).map(queue => ({
name: queue.name,
...queue.config,
})),
),
BullBoardModule.forRoot({
route: '/queues',
adapter: ExpressAdapter,
}),
...Object.values(QUEUES).map(queue =>
BullBoardModule.forFeature({
name: queue.name,
adapter: BullAdapter,
}),
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

'/queues' route is not secured.

The '/queues' route in BullBoardModule is currently exposed without authentication or authorization measures. It's recommended to secure this route to prevent unauthorized access to the queue monitoring interface.

  • Implement authentication middleware for the '/queues' route.
  • Review and enhance global security configurations to ensure they protect all sensitive routes.
🔗 Analysis chain

BullBoardModule configuration is correct, but consider security implications.

The BullBoardModule is properly configured for monitoring queues, using the correct adapters and dynamically adding all queues from the QUEUES constant.

However, the '/queues' route might need to be secured in a production environment. Let's verify if there are any security measures in place:

If no security measures are found, consider adding authentication middleware to protect the '/queues' route.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for security measures on the '/queues' route

# Test: Search for middleware or guards applied to the '/queues' route
rg --type typescript -A 5 "route: '/queues'"

# Test: Search for any global guards or middleware in the main application file
rg --type typescript -A 10 'const app = await NestFactory.create'

Length of output: 190


Script:

#!/bin/bash
# Description: Check for security measures on the '/queues' route

# Test: Search for middleware or guards applied to the '/queues' route
rg "route: '/queues'" -A 5 -g "*.ts" -g "*.tsx"

# Test: Search for any global guards or middleware in the main application file
rg "const app = await NestFactory.create" -A 10 -g "*.ts" -g "*.tsx"

Length of output: 2919

],
providers: [OutgoingWebhookQueueService],
exports: [BullModule, OutgoingWebhookQueueService],
})
export class BullMqModule {}
24 changes: 24 additions & 0 deletions services/workflows-service/src/bull-mq/consts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { BaseJobOptions } from 'bullmq/dist/esm/interfaces';

export const QUEUES = {
INCOMING_WEBHOOKS_QUEUE: {
name: 'incoming-webhook-queue',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
OUTGOING_WEBHOOKS_QUEUE: {
name: 'outgoing-webhook-queue',
config: {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
},
},
} satisfies Record<string, { name: string; config: BaseJobOptions }>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Injectable } from '@nestjs/common';
import { BaseQueueWorkerService } from '@/bull-mq/base/base-queue-worker.service';
import { IncomingWebhookData } from '@/bull-mq/incoming-webhook/types/types';
import { AppLoggerService } from '@/common/app-logger/app-logger.service';
import { QUEUES } from '@/bull-mq/consts';
import { Job } from 'bullmq';

@Injectable()
export class IncomingWebhookQueueService extends BaseQueueWorkerService<IncomingWebhookData> {
constructor(protected readonly logger: AppLoggerService) {
super(QUEUES.INCOMING_WEBHOOKS_QUEUE.name);
this.initializeWorker();
}

async handleJob(job: Job<IncomingWebhookData>) {
this.logger.log(`Processing webhook job ${job.id}`);

const { service: workingService, payload } = job.data;
// TODO - handle the invoking webhook job internally
}

protected initializeWorker() {
super.initializeWorker();

this.worker?.on('completed', (job: Job) => {
this.logger.log(`Outgoing Webhook job ${job.id} completed successfully`);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect log messages: Replace 'Outgoing' with 'Incoming'

The logging statements in the initializeWorker method incorrectly refer to "Outgoing Webhook" jobs, whereas this service handles incoming webhooks. This could lead to confusion when monitoring logs and debugging.

Apply this diff to correct the log messages:

 this.worker?.on('completed', (job: Job) => {
-  this.logger.log(`Outgoing Webhook job ${job.id} completed successfully`);
+  this.logger.log(`Incoming Webhook job ${job.id} completed successfully`);
 });

 this.worker?.on('failed', (job, error, prev) => {
-  this.logger.error(`Outgoing Webhook job ${job?.id} failed after retries: ${error.message}`);
+  this.logger.error(`Incoming Webhook job ${job?.id} failed after retries: ${error.message}`);
 });

 this.queue?.on('cleaned', (jobs, type) => {
   this.logger.log(
-    `${jobs.length} ${type} Outgoing Webhook jobs have been cleaned from the webhook queue`,
+    `${jobs.length} ${type} Incoming Webhook jobs have been cleaned from the webhook queue`,
   );
 });

Also applies to: 29-31, 34-36


this.worker?.on('failed', (job, error, prev) => {
this.logger.error(`Outgoing Webhook job ${job?.id} failed after retries: ${error.message}`);
});

this.queue?.on('cleaned', (jobs, type) => {
this.logger.log(
`${jobs.length} ${type} Outgoing Webhook jobs have been cleaned from the webhook queue`,
);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface IncomingWebhookData {
source: string;
payload: Record<string, unknown>;
service: (payload: Record<string, unknown>) => Promise<void>;
}
Loading
Loading