This package provides more advanced communication between microservices, using the full power of RabbitMQ.
- Separate server/client components
- Allowing multiple connections to one or more RabbitMQ servers
- Just implement the QueueHandler interface and mark the class with one of the following decorators
  - @Listener() - Basic consumer, the simplest thing that does something
  - @Worker() - Work Queues, distributing tasks among workers
  - @PubSub() - Publish/Subscribe, sending messages to many consumers at once
  - @Routing() - Routing, receiving messages selectively
  - @Topics() - Topics, receiving messages based on a pattern
  - @Rpc() - RPC, Request/reply pattern
- Optional validation of message content using class-validator
- Lightweight wrapper of amqplib for the Nest ecosystem
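To illustrate what "receiving messages based on a pattern" means for the Topics type, here is a minimal sketch of how AMQP topic patterns are matched against routing keys. The helper topicMatches is hypothetical and is not part of nest-rmq or amqplib; in practice the RabbitMQ broker performs this matching. It only assumes the standard topic exchange rules: words are separated by dots, `*` matches exactly one word and `#` matches zero or more words.

```typescript
// Hypothetical helper for illustration only; the broker does this matching in practice.
// `*` matches exactly one dot-separated word, `#` matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: it is a match only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // `#` either absorbs nothing, or absorbs one word and stays active.
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}
```

With this sketch, the pattern user.*.updated accepts the routing key user.42.updated but not user.updated, while user.# accepts any key that starts with the word user.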
nest-rmq must be integrated into the Nest ecosystem, so your application must depend on @nestjs/common and @nestjs/core. You can replace all npm commands with the package manager of your choice. For example, if you would like to use yarn, replace npm with yarn.
# from official npm registry
$ npm i --save @mkfyi/nestjs-rmq
# using yarn
$ yarn add @mkfyi/nestjs-rmq
# from GitHub package registry
$ npm i --save --registry=https://npm.pkg.github.com @mkfyi/nestjs-rmq
# from GitHub package registry using yarn
$ yarn add --registry=https://npm.pkg.github.com @mkfyi/nestjs-rmq
Since nest-rmq is built on top of amqplib, you also need to install the types for it.
$ npm install -D @types/amqplib
Import the RabbitMQModule from @mkfyi/nestjs-rmq and call the forRoot() method inside the imports of your application module. You can also set a custom name for the connection; otherwise default will be used.
import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@mkfyi/nestjs-rmq';

@Module({
  imports: [
    // ...
    RabbitMQModule.forRoot({
      // Fill in the details of your RabbitMQ server
      connection: {
        hostname: '',
        username: '',
        password: '',
      },
    }),
  ],
})
export class AppModule {}
If you prefer to use environment variables, consider adding @nestjs/config and using the forRootAsync() method instead.
@Module({
  imports: [
    // ...
    RabbitMQModule.forRootAsync({
      connection: {
        imports: [ConfigModule],
        useFactory: (config: ConfigService) => ({
          hostname: config.get('AMQP_HOSTNAME'),
          username: config.get('AMQP_USERNAME'),
          password: config.get('AMQP_PASSWORD'),
        }),
        inject: [ConfigService],
      },
    }),
  ],
})
export class AppModule {}
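Reading a variable through a configuration service can yield undefined when it is not set, which would otherwise only surface once the connection fails. The following is a minimal sketch of a helper that reads the three values and fails early with a clear message. The names ConnectionOptions, ConfigLike and readConnectionOptions are hypothetical and not part of nest-rmq or @nestjs/config; ConfigLike is only a small stand-in for whatever object provides the values.

```typescript
// Hypothetical shape mirroring the fields used in the connection examples.
interface ConnectionOptions {
  hostname: string;
  username: string;
  password: string;
}

// Minimal stand-in for a config service: a lookup that may return undefined.
interface ConfigLike {
  get(key: string): string | undefined;
}

// Reads <prefix>_HOSTNAME, <prefix>_USERNAME and <prefix>_PASSWORD,
// throwing if any of them is missing or empty.
function readConnectionOptions(config: ConfigLike, prefix = 'AMQP'): ConnectionOptions {
  const required = (suffix: string): string => {
    const key = `${prefix}_${suffix}`;
    const value = config.get(key);
    if (value === undefined || value === '') {
      throw new Error(`Missing environment variable ${key}`);
    }
    return value;
  };

  return {
    hostname: required('HOSTNAME'),
    username: required('USERNAME'),
    password: required('PASSWORD'),
  };
}
```

Inside useFactory, such a helper could be called with a small adapter object, for example { get: (key) => config.get<string>(key) }, so that a missing variable stops the application at startup rather than at the first connection attempt.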
You can also create multiple connections: put objects like the one above into an array and add the name property to each. This name is used by the QueueHandler and QueueAdapter to select the connection.
@Module({
  imports: [
    // ...
    RabbitMQModule.forRootAsync({
      connection: [
        {
          name: 'default',
          imports: [ConfigModule],
          useFactory: (config: ConfigService) => ({
            hostname: config.get('AMQP_HOSTNAME'),
            username: config.get('AMQP_USERNAME'),
            password: config.get('AMQP_PASSWORD'),
          }),
          inject: [ConfigService],
        },
        {
          name: 'stage',
          imports: [ConfigModule],
          useFactory: (config: ConfigService) => ({
            hostname: config.get('AMQP_STAGE_HOSTNAME'),
            username: config.get('AMQP_STAGE_USERNAME'),
            password: config.get('AMQP_STAGE_PASSWORD'),
          }),
          inject: [ConfigService],
        },
      ],
    }),
  ],
})
export class AppModule {}
You have to configure the adapters property in order to send messages to the respective queue.
@Module({
  imports: [
    // ...
    RabbitMQModule.forRootAsync({
      connection: {
        name: 'default',
        imports: [ConfigModule],
        useFactory: (config: ConfigService) => ({
          hostname: config.get('AMQP_HOSTNAME'),
          username: config.get('AMQP_USERNAME'),
          password: config.get('AMQP_PASSWORD'),
        }),
        inject: [ConfigService],
      },
      adapters: [
        {
          name: 'BACKEND_SERVICE',
          queue: 'example.worker',
          type: QueueAdapterType.Worker,
          connection: 'default',
        },
      ],
    }),
  ],
})
export class AppModule {}
The example shown above creates an adapter named BACKEND_SERVICE for the default connection. The value of the name property is the token under which the adapter can be injected as a QueueAdapter using @Inject(). For an adapter of the RPC type, you may want to use RpcQueueAdapter as the type instead.
@Injectable()
export class MyService {
  public constructor(
    @Inject('BACKEND_SERVICE')
    private readonly worker: QueueAdapter,
  ) {}

  public notifyUsernameUpdate(id: string, name: string) {
    return this.worker.send({ id, name });
  }
}
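Because each adapter refers to its connection by name, a typo in that name is an easy mistake to make. The following is a minimal sketch of a sanity check you could run over your own configuration objects before passing them to the module. The types NamedConnection and AdapterConfig and the function findUnknownConnections are hypothetical and not part of nest-rmq, and the sketch assumes that an adapter without a connection property falls back to default in the same way an unnamed connection does.

```typescript
// Hypothetical, simplified shapes: only the fields relevant to the check.
interface NamedConnection {
  name?: string;
}

interface AdapterConfig {
  name: string;
  queue: string;
  connection?: string;
}

// Returns one "adapter -> connection" entry per adapter whose connection name
// does not match any configured connection.
function findUnknownConnections(
  connections: NamedConnection[],
  adapters: AdapterConfig[],
): string[] {
  // Assumption: a missing name means the connection is called 'default'.
  const known = new Set(connections.map((c) => c.name ?? 'default'));

  return adapters
    .filter((a) => !known.has(a.connection ?? 'default'))
    .map((a) => `${a.name} -> ${a.connection ?? 'default'}`);
}
```

An empty result means every adapter points at a configured connection; any entries in the result name the adapters that need to be corrected.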
Every custom queue handler has to implement the QueueHandler interface. As with the adapters, there is a separate interface for RPC-based handlers called RpcQueueHandler.
@Worker({ queue: 'example.worker' })
export class ExampleWorkerQueueHandler implements QueueHandler {
  public async execute(msg: Message): Promise<void> {
    console.log(msg.object());
  }
}
Don't forget to add your queue handlers to the application module providers.
@Module({
  imports: [
    // ...
    RabbitMQModule.forRootAsync({
      // ...
    }),
  ],
  providers: [
    // ...
    ExampleWorkerQueueHandler,
  ],
})
export class AppModule {}
nest-rmq is MIT licensed.