Skip to content

Commit

Permalink
Merge pull request #1 from Financial-Times/contexts
Browse files Browse the repository at this point in the history
Contexts
  • Loading branch information
peteclark-ft authored Dec 14, 2018
2 parents e2edea9 + 928fd87 commit fddecb2
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package consumergroup

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kazoo-go"
"golang.org/x/time/rate"
)

var (
Expand Down Expand Up @@ -38,6 +40,10 @@ func NewConfig() *Config {
return config
}

func newDefaultLimiter() *rate.Limiter {
return rate.NewLimiter(rate.Every(time.Second), 4)
}

func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("ZookeeperTimeout should have a duration > 0")
Expand Down Expand Up @@ -251,32 +257,37 @@ func (cg *ConsumerGroup) FlushOffsets() error {
}

func (cg *ConsumerGroup) topicListConsumer(topics []string) {
limiter := newDefaultLimiter()
for {
select {
case <-cg.stopper:
return
default:
}

ctx, cancel := context.WithCancel(context.Background())
limiter.Wait(ctx)

consumers, consumerChanges, err := cg.group.WatchInstances()
if err != nil {
cg.Logf("FAILED to get list of registered consumer instances: %s\n", err)
cancel()
return
}

cg.consumers = consumers
cg.Logf("Currently registered consumers: %d\n", len(cg.consumers))

stopper := make(chan struct{})

for _, topic := range topics {
cg.wg.Add(1)
go cg.topicConsumer(topic, cg.messages, cg.errors, stopper)
go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)
}

select {
case <-ctx.Done():
cg.wg.Wait()
case <-cg.stopper:
close(stopper)
cancel()
return

case <-consumerChanges:
Expand All @@ -293,17 +304,17 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
}

cg.Logf("Triggering rebalance due to consumer list change\n")
close(stopper)
cancel()
cg.wg.Wait()
}
}
}

func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) {
func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.CancelFunc, topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer cg.wg.Done()

select {
case <-stopper:
case <-ctx.Done():
return
default:
}
Expand All @@ -319,6 +330,7 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
Partition: -1,
Err: err,
}
cancel()
return
}

Expand All @@ -330,6 +342,7 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
Partition: -1,
Err: err,
}
cancel()
return
}

Expand All @@ -342,7 +355,7 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con
for _, pid := range myPartitions {

wg.Add(1)
go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper)
go cg.partitionConsumer(ctx, topic, pid.ID, messages, errors, &wg)
}

wg.Wait()
Expand Down Expand Up @@ -374,7 +387,7 @@ func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOff
}

// Consumes a partition
func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}) {
func (cg *ConsumerGroup) partitionConsumer(ctx context.Context, topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup) {
defer wg.Done()

// Since ProcessingTimeout is the amount of time we'll wait for the final batch
Expand All @@ -385,7 +398,7 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
partitionClaimLoop:
for tries := 0; tries < maxRetries; tries++ {
select {
case <-stopper:
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
Expand Down Expand Up @@ -452,7 +465,7 @@ partitionClaimLoop:
partitionConsumerLoop:
for {
select {
case <-stopper:
case <-ctx.Done():
break partitionConsumerLoop

case err := <-consumer.Errors():
Expand All @@ -473,7 +486,7 @@ partitionConsumerLoop:
case errors <- err:
continue partitionConsumerLoop

case <-stopper:
case <-ctx.Done():
break partitionConsumerLoop
}
}
Expand All @@ -494,7 +507,7 @@ partitionConsumerLoop:

for {
select {
case <-stopper:
case <-ctx.Done():
break partitionConsumerLoop

case messages <- message:
Expand Down

0 comments on commit fddecb2

Please sign in to comment.