-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RabbitMQ transport doesn't allow custom message headers. #202
Comments
hi @vsabirov, I had a read through the rabbit implementation again. Generally one of the goals of this library is to abstract away transport-specific features so that they're largely interchangeable. With that said, the ability to specify custom headers is understandable and I wanted to confirm some behaviour with you. If you were to do Currently per Would this work for you if all attributes were spread onto the headers? ie:
|
@adenhertog Perhaps the ability to specify transport-agnostic attributes in a special header should be kept as is. Sure, it will work fine, however i'm afraid that the user can overwrite some special RMQ headers by accident and will be forced to rename their attributes to avoid collisions. Perhaps something like const rawHeaders = messageOptions.attributes.rawHeaders and headers: {
attributes: messageOptions.attributes ? JSON.stringify(messageOptions.attributes) : undefined,
stickyAttributes: messageOptions.stickyAttributes ? JSON.stringify(messageOptions.stickyAttributes) : undefined
...rawHeaders
} And to use that you'll just have to specify a |
@vsabirov I'm happy with what you've proposed - the reasoning makes a lot of sense. I can't get around to this immediately, but I'm open to accepting PRs if you need this soon |
@adenhertog I propose to hide access to internal attributes from regular use with symbols. The user should understand that this is an internal API. I see a solution to this problem as follows @node-ts/bus-messagesexport kInternalAttributes = Symbol('@node-ts/bus-messages/internal-attributes')
export type InternalAttributes<T> = T
export interface MessageAttributes<
AttributesType extends MessageAttributeMap = MessageAttributeMap,
StickyAttributesType extends MessageAttributeMap = MessageAttributeMap,
InternalAttributesType = InternalAttributes<unknown>
>
correlationId?: Uuid;
attributes: AttributesType;
stickyAttributes: StickyAttributesType;
[kInternalAttributes]: InternalAttributesType
} @node-ts/bus-rabbitmqexport const kRabbitMQHeaders = Symbol('@node-ts/bus-rabbitmq/rabbitmq-headers')
export interface RabbitMQInternalAttributes = {
headers: MessageAttributeMap
}
private async publishMessage(
message: Message,
messageOptions: MessageAttributes<..., ..., RabbitMQInternalAttributes> = { attributes: {}, stickyAttributes: {}, internalAttributes: {} }
): Promise<void> {
await this.assertExchange(message.$name)
const payload = this.coreDependencies.messageSerializer.serialize(message)
const internalAttributes = messageOptions[kInternalAttributes]
this.channel.publish(message.$name, '', Buffer.from(payload), {
correlationId: messageOptions.correlationId,
messageId: uuid.v4(),
headers: {
attributes: messageOptions.attributes
? JSON.stringify(messageOptions.attributes)
: undefined,
stickyAttributes: messageOptions.stickyAttributes
? JSON.stringify(messageOptions.stickyAttributes)
: undefined
...internalAttributes.headers
}
})
} Usageimport { kInternalAttributes } from '@node-ts/bus-messages'
bus.publish(new Message(), {
[kInternalAttributes]: { // typed
headers: {
'x-delay': 5000
}
}
}) This is a draft solution, I'm just suggesting a concept |
Hello, this is a very great library! However, there is one small issue regarding the RabbitMQ transport.
Take rabbitmq-delayed-message-exchange plugin for the RabbitMQ server as an example.
Given an up and running RabbitMQ server with the rabbitmq-delayed-message-exchange plugin installed, the task is to simply use it and send a delayed message. For example, notify an external web server with a request after 1 hour, and if it succeeds, send an event down the bus, or else send another notification command down the bus that would be delayed for another 1 hour in case the external server is down or whatever.
This is currently impossible with node-ts/bus, because:
publishMessage builds its own headers from MessageAttributes and doesn't allow user-specified custom headers, so we can't use the
x-delay
header required for the delayed message.There is no possible way to create an exchange with a custom type, in our case
x-delayed-type
is needed. This is because all methods and fields to achieve this are privated out, so there is no intended way.It would be much more helpful to be able to specify custom message headers and assert custom exchanges.
The text was updated successfully, but these errors were encountered: