From 928fd8736b844d2915f9b62b784ebcc824564146 Mon Sep 17 00:00:00 2001 From: Pete Clark Date: Wed, 28 Nov 2018 11:34:24 +0000 Subject: [PATCH] :sparkles: Adding a rate limiter to throttle reconnectsd --- consumergroup/consumer_group.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go index f4a0f9b..4829818 100644 --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -9,6 +9,7 @@ import ( "github.com/Shopify/sarama" "github.com/wvanbergen/kazoo-go" + "golang.org/x/time/rate" ) var ( @@ -39,6 +40,10 @@ func NewConfig() *Config { return config } +func newDefaultLimiter() *rate.Limiter { + return rate.NewLimiter(rate.Every(time.Second), 4) +} + func (cgc *Config) Validate() error { if cgc.Zookeeper.Timeout <= 0 { return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0") @@ -252,6 +257,7 @@ func (cg *ConsumerGroup) FlushOffsets() error { } func (cg *ConsumerGroup) topicListConsumer(topics []string) { + limiter := newDefaultLimiter() for { select { case <-cg.stopper: @@ -259,16 +265,19 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { default: } + ctx, cancel := context.WithCancel(context.Background()) + limiter.Wait(ctx) + consumers, consumerChanges, err := cg.group.WatchInstances() if err != nil { cg.Logf("FAILED to get list of registered consumer instances: %s\n", err) + cancel() return } cg.consumers = consumers cg.Logf("Currently registered consumers: %d\n", len(cg.consumers)) - ctx, cancel := context.WithCancel(context.Background()) for _, topic := range topics { cg.wg.Add(1) go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)