backend: use kadm client to list partition offsets #277

Merged · 1 commit · Nov 15, 2024
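In short, the change drops the hand-rolled kmsg.ListOffsetsRequest plumbing and lets franz-go's kadm admin client list partition offsets. A minimal sketch of the new call path (broker address and the printed output are illustrative placeholders, not code from this PR):

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// The admin client wraps an existing *kgo.Client without taking ownership of it.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	adm := kadm.NewClient(cl)

	// High water marks for all topics; pass topic names to restrict the listing.
	ends, err := adm.ListEndOffsets(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	ends.Each(func(o kadm.ListedOffset) {
		if o.Err != nil {
			return // per-partition errors travel on the entry itself
		}
		fmt.Printf("%s[%d] high water mark: %d\n", o.Topic, o.Partition, o.Offset)
	})
}
```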
1 change: 1 addition & 0 deletions go.mod
@@ -43,6 +43,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/twmb/franz-go/pkg/kadm v1.14.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -320,6 +320,8 @@ github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
73 changes: 29 additions & 44 deletions minion/list_offsets.go
@@ -2,21 +2,21 @@ package minion

import (
"context"
"errors"
"fmt"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kadm"
"go.uber.org/zap"
)

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
reqId := ctx.Value("requestId").(string)
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId

if cachedRes, exists := s.getCachedItem(key); exists {
return cachedRes.(*kmsg.ListOffsetsResponse), nil
return cachedRes.(kadm.ListedOffsets), nil
}

res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
@@ -33,70 +33,55 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg
return nil, err
}

return res.(*kmsg.ListOffsetsResponse), nil
return res.(kadm.ListedOffsets), nil
}

// ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
metadata, err := s.GetMetadataCached(ctx)
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
listedOffsets, err := s.admClient.ListEndOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list consumer groups: %w", err)
}

topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
for i, topic := range metadata.Topics {
req := kmsg.NewListOffsetsRequestTopic()
req.Topic = *topic.Topic

partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
for j, partition := range topic.Partitions {
partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition()
partitionReqs[j].Partition = partition.Partition
partitionReqs[j].Timestamp = timestamp
var se *kadm.ShardErrors
if !errors.As(err, &se) {
return nil, fmt.Errorf("failed to list offsets: %w", err)
}
req.Partitions = partitionReqs

topicReqs[i] = req
}

req := kmsg.NewListOffsetsRequest()
req.Topics = topicReqs

res, err := req.RequestWith(ctx, s.client)
if err != nil {
return res, err
if se.AllFailed {
return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
}
s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
for _, shardErr := range se.Errs {
s.logger.Warn("shard error for listing end offsets",
zap.Int32("broker_id", shardErr.Broker.NodeID),
zap.Error(shardErr.Err))
}
}

// Log inner errors before returning them. We do that inside of this function to avoid duplicate logging, as the responses
// are cached for each scrape anyway.
//
// Create two maps to aggregate error logs into a few messages. Logging one message per partition error
// is too much. Typical errors are LEADER_NOT_AVAILABLE etc.
errorCountByErrCode := make(map[int16]int)
errorCountByErrCode := make(map[error]int)
errorCountByTopic := make(map[string]int)

// Iterate on all partitions
for _, topic := range res.Topics {
for _, partition := range topic.Partitions {
err := kerr.TypedErrorForCode(partition.ErrorCode)
if err != nil {
errorCountByErrCode[partition.ErrorCode]++
errorCountByTopic[topic.Topic]++
}
listedOffsets.Each(func(offset kadm.ListedOffset) {
if offset.Err != nil {
errorCountByTopic[offset.Topic]++
errorCountByErrCode[offset.Err]++
}
}
})

// Print log line for each error type
for errCode, count := range errorCountByErrCode {
typedErr := kerr.TypedErrorForCode(errCode)
for err, count := range errorCountByErrCode {
s.logger.Warn("failed to list some partitions watermarks",
zap.Error(typedErr),
zap.Error(err),
zap.Int("error_count", count))
}
if len(errorCountByTopic) > 0 {
s.logger.Warn("some topics had one or more partitions whose watermarks could not be fetched from Kafka",
zap.Int("topics_with_errors", len(errorCountByTopic)))
}

return res, nil
return listedOffsets, nil
}
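For reference, the shard-error handling above tolerates partial failures: kadm can return a *kadm.ShardErrors alongside whatever offsets it could fetch, and ListedOffsets.Lookup pairs entries across two listings. A hedged sketch of that pattern (the function and log wording are illustrative, not code from this PR):

```go
package example

import (
	"context"
	"errors"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
)

// watermarks pairs each partition's low and high water mark, tolerating
// partial shard failures in the same spirit as ListOffsets above.
func watermarks(ctx context.Context, adm *kadm.Client) error {
	lows, lowErr := adm.ListStartOffsets(ctx) // low water marks (old timestamp -2)
	highs, highErr := adm.ListEndOffsets(ctx) // high water marks (old timestamp -1)

	for _, err := range []error{lowErr, highErr} {
		if err == nil {
			continue
		}
		var se *kadm.ShardErrors
		if errors.As(err, &se) && !se.AllFailed {
			log.Printf("partial failure: %d shard(s) errored", len(se.Errs))
			continue // partial data is still usable
		}
		return err // not a shard error, or every shard failed
	}

	lows.Each(func(low kadm.ListedOffset) {
		high, ok := highs.Lookup(low.Topic, low.Partition)
		if !ok || low.Err != nil || high.Err != nil {
			return // skip partitions without a complete, error-free pair
		}
		fmt.Printf("%s[%d] low=%d high=%d\n", low.Topic, low.Partition, low.Offset, high.Offset)
	})
	return nil
}
```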
10 changes: 7 additions & 3 deletions minion/service.go
@@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
@@ -33,8 +34,9 @@ type Service struct {
AllowedTopicsExpr []*regexp.Regexp
IgnoredTopicsExpr []*regexp.Regexp

client *kgo.Client
storage *Storage
client *kgo.Client
admClient *kadm.Client
storage *Storage
}

func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) {
@@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
AllowedTopicsExpr: allowedTopicsExpr,
IgnoredTopicsExpr: ignoredTopicsExpr,

client: client,
client: client,
admClient: kadm.NewClient(client),

storage: storage,
}

74 changes: 31 additions & 43 deletions prometheus/collect_consumer_group_lags.go
@@ -6,8 +6,8 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/minion"
@@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
return isOk
}

func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark {
func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
type partitionID = int32
type topicName = string
waterMarks := make(map[topicName]map[partitionID]waterMark)

for _, topic := range lowMarks.Topics {
_, exists := waterMarks[topic.Topic]
for topic, lowMarksByPartitionID := range lowMarks {
_, exists := waterMarks[topic]
if !exists {
waterMarks[topic.Topic] = make(map[partitionID]waterMark)
waterMarks[topic] = make(map[partitionID]waterMark)
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {

for _, lowOffset := range lowMarksByPartitionID {
if lowOffset.Err != nil {
e.logger.Debug("failed to get partition low water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(lowOffset.Err))
continue
}
waterMarks[topic.Topic][partition.Partition] = waterMark{
TopicName: topic.Topic,
PartitionID: partition.Partition,
LowWaterMark: partition.Offset,
HighWaterMark: -1,
}
}
}

for _, topic := range highMarks.Topics {
mark, exists := waterMarks[topic.Topic]
if !exists {
e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic))
delete(waterMarks, topic.Topic)
continue
}
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
e.logger.Debug("failed to get partition high water mark, inner kafka error",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Error(err))
continue
}
partitionMark, exists := mark[partition.Partition]
higOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition)
if !exists {
e.logger.Error("got high water marks for a topic's partition but no low watermarks",
zap.String("topic_name", topic.Topic),
zap.Int32("partition_id", partition.Partition),
zap.Int64("offset", partition.Offset))
delete(waterMarks, topic.Topic)
e.logger.Error("got low water marks for a topic's partition but no high watermarks",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Int64("offset", lowOffset.Offset))
delete(waterMarks, lowOffset.Topic)
break // Topic watermarks are invalid -> delete & skip this topic
}
partitionMark.HighWaterMark = partition.Offset
waterMarks[topic.Topic][partition.Partition] = partitionMark
if higOffset.Err != nil {
e.logger.Debug("failed to get partition high water mark, inner kafka error",
zap.String("topic_name", lowOffset.Topic),
zap.Int32("partition_id", lowOffset.Partition),
zap.Error(higOffset.Err))
continue
}

waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{
TopicName: lowOffset.Topic,
PartitionID: lowOffset.Partition,
LowWaterMark: lowOffset.Offset,
HighWaterMark: higOffset.Offset,
}
}
}

40 changes: 19 additions & 21 deletions prometheus/collect_topic_partition_offsets.go
@@ -5,7 +5,6 @@ import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/minion"
@@ -32,31 +31,31 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
}

// Process Low Watermarks
for _, topic := range lowWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {

for topicName, partitions := range lowWaterMarks {
if !e.minionSvc.IsTopicAllowed(topicName) {
continue
}

waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
for _, offset := range partitions {
if offset.Err != nil {
hasErrors = true
isOk = false
continue
}
waterMarkSum += partition.Offset
waterMarkSum += offset.Offset
// Let's end here if partition metrics shall not be exposed
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
continue
}
ch <- prometheus.MustNewConstMetric(
e.partitionLowWaterMark,
prometheus.GaugeValue,
float64(partition.Offset),
topic.Topic,
strconv.Itoa(int(partition.Partition)),
float64(offset.Offset),
topicName,
strconv.Itoa(int(offset.Partition)),
)
}
// We only want to report the sum of all partition marks if we receive watermarks from all partitions
Expand All @@ -65,35 +64,34 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
e.topicLowWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
topicName,
)
}
}

for _, topic := range highWaterMarks.Topics {
if !e.minionSvc.IsTopicAllowed(topic.Topic) {
for topicName, partitions := range highWaterMarks {
if !e.minionSvc.IsTopicAllowed(topicName) {
continue
}
waterMarkSum := int64(0)
hasErrors := false
for _, partition := range topic.Partitions {
err := kerr.ErrorForCode(partition.ErrorCode)
if err != nil {
for _, offset := range partitions {
if offset.Err != nil {
hasErrors = true
isOk = false
continue
}
waterMarkSum += partition.Offset
waterMarkSum += offset.Offset
// Let's end here if partition metrics shall not be exposed
if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
continue
}
ch <- prometheus.MustNewConstMetric(
e.partitionHighWaterMark,
prometheus.GaugeValue,
float64(partition.Offset),
topic.Topic,
strconv.Itoa(int(partition.Partition)),
float64(offset.Offset),
topicName,
strconv.Itoa(int(offset.Partition)),
)
}
// We only want to report the sum of all partition marks if we receive watermarks from all partitions
@@ -102,7 +100,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
e.topicHighWaterMarkSum,
prometheus.GaugeValue,
float64(waterMarkSum),
topic.Topic,
topicName,
)
}
}