Skip to content

Commit

Permalink
✨ Adding a rate limiter to throttle reconnectsd
Browse files Browse the repository at this point in the history
  • Loading branch information
peteclark-ft committed Nov 28, 2018
1 parent 50ba9d7 commit 928fd87
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/Shopify/sarama"
"github.com/wvanbergen/kazoo-go"
"golang.org/x/time/rate"
)

var (
Expand Down Expand Up @@ -39,6 +40,10 @@ func NewConfig() *Config {
return config
}

func newDefaultLimiter() *rate.Limiter {
return rate.NewLimiter(rate.Every(time.Second), 4)
}

func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
Expand Down Expand Up @@ -252,23 +257,27 @@ func (cg *ConsumerGroup) FlushOffsets() error {
}

func (cg *ConsumerGroup) topicListConsumer(topics []string) {
limiter := newDefaultLimiter()
for {
select {
case <-cg.stopper:
return
default:
}

ctx, cancel := context.WithCancel(context.Background())
limiter.Wait(ctx)

consumers, consumerChanges, err := cg.group.WatchInstances()
if err != nil {
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
cancel()
return
}

cg.consumers = consumers
cg.Logf("Currently registered consumers: %d\n", len(cg.consumers))

ctx, cancel := context.WithCancel(context.Background())
for _, topic := range topics {
cg.wg.Add(1)
go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)
Expand Down

0 comments on commit 928fd87

Please sign in to comment.