Allow us to add queue sub to existing consumer #244
-
Is your feature request related to a problem? Please describe.

The problem is that dynamically adding more subscribers (more queues) to an existing RabbitMQ ch.Consume call is not possible. We have to call ch.Consume again every time we have more topics/queues to subscribe to. In the second example below, by contrast, the Redis pattern seems more performant and sustainable for real/large systems.

Here is a minimal example:

    package main

    import (
        "fmt"
        "log"

        amqp "github.com/rabbitmq/amqp091-go"
    )

    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        if err != nil {
            log.Fatalf("failed to connect: %v", err)
        }
        defer conn.Close()

        ch, err := conn.Channel()
        if err != nil {
            log.Fatalf("failed to open a channel: %v", err)
        }
        defer ch.Close()

        // Function to dynamically declare and consume from a new queue
        consumeFromQueue := func(queueName string) {
            q, err := ch.QueueDeclare(
                // ....
            )
            if err != nil {
                log.Printf("failed to declare %s: %v", queueName, err)
                return
            }
            msgs, err := ch.Consume(
                q.Name, // queue
                // ...
            )
            if err != nil {
                log.Printf("failed to consume from %s: %v", queueName, err)
                return
            }
            go func() {
                for d := range msgs { // storing extra memory here with this "permanent" goroutine
                    log.Printf("Received a message: %s", d.Body)
                }
            }()
            fmt.Println("Subscribed to:", queueName)
        }

        // Subscribe to the initial queue
        consumeFromQueue("queue1")

        // Dynamically subscribe (in future) to more queues as needed
        go func() {
            consumeFromQueue("queue2")
            consumeFromQueue("queue3")
        }()

        // Block forever (for this example)
        forever := make(chan bool)
        <-forever
    }

But the above seems suboptimal performance-wise. I am looking for something more like this Redis code:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/go-redis/redis/v8"
    )

    func main() {
        rdb := redis.NewClient(&redis.Options{
            Addr: "localhost:6379", // Redis server address
        })
        ctx := context.Background()
        pubsub := rdb.Subscribe(ctx)
        defer pubsub.Close()

        // Function to dynamically add channels to the subscription
        addChannel := func(channel string) {
            err := pubsub.Subscribe(ctx, channel)
            if err != nil {
                fmt.Println("Subscribe error:", err)
                return
            }
            fmt.Println("Subscribed to:", channel)
        }

        // Add initial channel
        addChannel("channel1")

        // Simulate dynamically adding more channels
        go func() {
            time.Sleep(5 * time.Second)
            addChannel("channel2")
            time.Sleep(5 * time.Second)
            addChannel("channel3")
        }()

        // Read all messages for all channels/topics
        for msg := range pubsub.Channel() {
            fmt.Println(msg.Channel, msg.Payload)
        }
    }

With Redis, we can add new channels/topics to an existing subscriber (saving memory and CPU, in theory). We should be able to add a queue to an existing subscriber (I suppose, to share a channel).

Describe the solution you'd like

The Redis code above is what I want; that's a good system.

Describe alternatives you've considered

Not sure, open to alternatives.

Additional context

No response
Replies: 4 comments 5 replies
-
Is it, though? Have you benchmarked it to demonstrate that the current behavior does not meet your needs?
-
I also just remembered that the AMQP protocol allows a single channel to consume from multiple queues. That seems like it fits your requirement. Give that a try.
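
As a rough sketch of how that could look on the application side: every ch.Consume call on the shared AMQP channel still creates its own consumer and returns its own Go channel, but those can be merged into one stream so there is a single read loop, similar to the Redis example. The `merger` type and its methods are hypothetical names for illustration and are not part of amqp091-go. The element type is generic so the sketch compiles with the standard library alone; with amqp091-go, T would presumably be amqp.Delivery and each source would be a channel returned by ch.Consume.

```go
package main

import "sync"

// merger fans several source channels into one output channel.
// Hypothetical helper for illustration; not part of amqp091-go.
type merger[T any] struct {
	out chan T
	wg  sync.WaitGroup
}

func newMerger[T any]() *merger[T] {
	return &merger[T]{out: make(chan T)}
}

// Add starts forwarding src into the shared output. It may be called at any
// time before Close, for example whenever a new queue has to be consumed.
func (m *merger[T]) Add(src <-chan T) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for v := range src {
			m.out <- v
		}
	}()
}

// Out returns the single stream to range over.
func (m *merger[T]) Out() <-chan T { return m.out }

// Close waits until every source has been closed and drained,
// then closes the output. Add must not be called after Close.
func (m *merger[T]) Close() {
	m.wg.Wait()
	close(m.out)
}
```

Note that this still runs one forwarding goroutine per queue; it only changes where messages are read, not how many consumers exist on the broker side.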
-
The protocol and client library design assumes a few things:
Specifically, topic exchanges can support a wide range of complex patterns, and in many cases help reduce the topology churn.

Instead of making generalized claims that something is "perplexing", "antiquated", and other completely unbacked bs like that, perhaps you should present your problem, explain your decisions, and ask where you may be missing something important. Or just go use Redis or whatever else you like, and abuse their maintainers when something isn't up to your liking.
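
To illustrate the point about topic exchanges reducing topology churn: a queue bound once with a pattern keeps receiving messages for routing keys that did not exist when the binding was created, so a new "topic" does not necessarily need a new queue or consumer. The function below is a standalone sketch of the AMQP 0-9-1 topic matching rules (`*` matches exactly one dot-separated word, `#` matches zero or more words). `topicMatches` is a hypothetical name, the routing keys are made up, and this is not the broker's actual implementation.

```go
package main

import "strings"

// topicMatches reports whether routingKey matches an AMQP 0-9-1 topic
// binding pattern. Illustrative only; the broker does this matching itself.
func topicMatches(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pat, key []string) bool {
	if len(pat) == 0 {
		return len(key) == 0
	}
	switch pat[0] {
	case "#":
		// "#" may absorb zero words, or absorb one word and try again.
		if matchWords(pat[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pat, key[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pat[1:], key[1:])
	default:
		// A literal word must match exactly.
		return len(key) > 0 && pat[0] == key[0] && matchWords(pat[1:], key[1:])
	}
}
```

With a binding such as `orders.#`, publishing with a new routing key like `orders.eu.created` would reach the already existing queue and consumer.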
-
What the protocol does not assume is that a consumer (a subscription) is reused across queues. Reusing an object or function in an application is not the same as reusing a subscription: AMQP 0-9-1 does not allow you to do that. In fact, neither do MQTT, STOMP, and, to my knowledge, AMQP 1.0.

This piece of code in a loop

    consumeFromQueue := func(queueName string) {
        q, err := ch.QueueDeclare(
            // ....
        )
        msgs, err := ch.Consume(
            q.Name, // queue
            // ...
        )

would be a loop of subscriptions in MQTT, STOMP (if the destinations are known to exist, otherwise they will have to be declared by the broker), AMQP 1.0, and the RabbitMQ stream protocol. Semantically, that is not much different from AMQP 0-9-1, although one protocol method (a round trip in this case, to declare a queue) would be avoided.

So your options are (pick some or do all of them):
Each consumer consumes from only one queue at a time in AMQP 0-9-1. It is also roughly the same story in other protocols, but the step of declaring a queue (destination) may be implicit.