Switch offset listing from hand-built kmsg ListOffsets requests to the kadm admin client.

* minion: Service gains an admClient (*kadm.Client) created from the existing kgo client;
  ListOffsets and ListOffsetsCached now return kadm.ListedOffsets.
* minion: ListOffsets dispatches on its timestamp argument (-2 -> start offsets,
  -1 -> end offsets, anything else -> first offset after that millisecond timestamp),
  so a low water mark request is no longer answered with end offsets.
* prometheus: waterMarksByTopic pairs every low mark with highMarks.Lookup and, when the
  high mark carries an error, logs that error (not the nil low mark error).
* go.mod: kadm is imported directly by packages of this module, so it carries no
  "// indirect" marker.

Note: the blob ids on the "index" lines were computed before the review edits listed above.

diff --git a/go.mod b/go.mod
index b3143b9..a9b59e9 100644
--- a/go.mod
+++ b/go.mod
@@ -43,6 +43,7 @@ require (
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.60.1 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
+	github.com/twmb/franz-go/pkg/kadm v1.14.0
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.28.0 // indirect
 	golang.org/x/net v0.30.0 // indirect
diff --git a/go.sum b/go.sum
index b75502d..29326d0 100644
--- a/go.sum
+++ b/go.sum
@@ -320,6 +320,8 @@ github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
 github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
 github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
 github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
+github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs=
+github.com/twmb/franz-go/pkg/kadm v1.14.0/go.mod h1:XjOPz6ZaXXjrW2jVCfLuucP8H1w2TvD6y3PT2M+aAM4=
 github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
 github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
diff --git a/minion/list_offsets.go b/minion/list_offsets.go
index 2ab38e1..ae8617f 100644
--- a/minion/list_offsets.go
+++ b/minion/list_offsets.go
@@ -2,21 +2,21 @@ package minion
 
 import (
 	"context"
+	"errors"
 	"fmt"
-	"github.com/twmb/franz-go/pkg/kerr"
-	"go.uber.org/zap"
 	"strconv"
 	"time"
 
-	"github.com/twmb/franz-go/pkg/kmsg"
+	"github.com/twmb/franz-go/pkg/kadm"
+	"go.uber.org/zap"
 )
 
-func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
+func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
 	reqId := ctx.Value("requestId").(string)
 	key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
 
 	if cachedRes, exists := s.getCachedItem(key); exists {
-		return cachedRes.(*kmsg.ListOffsetsResponse), nil
+		return cachedRes.(kadm.ListedOffsets), nil
 	}
 
 	res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
@@ -33,38 +33,40 @@ func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg
 		return nil, err
 	}
 
-	return res.(*kmsg.ListOffsetsResponse), nil
+	return res.(kadm.ListedOffsets), nil
 }
 
 // ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
-func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) {
-	metadata, err := s.GetMetadataCached(ctx)
+func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
+	// kadm has a dedicated call per special timestamp. Dispatch on it so that a request for
+	// low water marks is not silently answered with end offsets.
+	var (
+		listedOffsets kadm.ListedOffsets
+		err           error
+	)
+	switch timestamp {
+	case -2:
+		listedOffsets, err = s.admClient.ListStartOffsets(ctx)
+	case -1:
+		listedOffsets, err = s.admClient.ListEndOffsets(ctx)
+	default:
+		listedOffsets, err = s.admClient.ListOffsetsAfterMilli(ctx, timestamp)
+	}
 	if err != nil {
-		return nil, fmt.Errorf("failed to list consumer groups: %w", err)
-	}
-
-	topicReqs := make([]kmsg.ListOffsetsRequestTopic, len(metadata.Topics))
-	for i, topic := range metadata.Topics {
-		req := kmsg.NewListOffsetsRequestTopic()
-		req.Topic = *topic.Topic
-
-		partitionReqs := make([]kmsg.ListOffsetsRequestTopicPartition, len(topic.Partitions))
-		for j, partition := range topic.Partitions {
-			partitionReqs[j] = kmsg.NewListOffsetsRequestTopicPartition()
-			partitionReqs[j].Partition = partition.Partition
-			partitionReqs[j].Timestamp = timestamp
+		var se *kadm.ShardErrors
+		if !errors.As(err, &se) {
+			return nil, fmt.Errorf("failed to list offsets: %w", err)
 		}
-		req.Partitions = partitionReqs
-
-		topicReqs[i] = req
-	}
 
-	req := kmsg.NewListOffsetsRequest()
-	req.Topics = topicReqs
-
-	res, err := req.RequestWith(ctx, s.client)
-	if err != nil {
-		return res, err
+		if se.AllFailed {
+			return nil, fmt.Errorf("failed to list offsets, all shard responses failed: %w", err)
+		}
+		s.logger.Info("failed to list offset from some shards", zap.Int("failed_shards", len(se.Errs)))
+		for _, shardErr := range se.Errs {
+			s.logger.Warn("shard error for listing offsets",
+				zap.Int32("broker_id", shardErr.Broker.NodeID),
+				zap.Error(shardErr.Err))
+		}
 	}
 
 	// Log inner errors before returning them. We do that inside of this function to avoid duplicate logging as the response
@@ -72,25 +74,21 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO
 	//
 	// Create two metrics to aggregate error logs in few messages. Logging one message per occured partition error
 	// is too much. Typical errors are LEADER_NOT_AVAILABLE etc.
-	errorCountByErrCode := make(map[int16]int)
+	errorCountByErrCode := make(map[error]int)
 	errorCountByTopic := make(map[string]int)
 
 	// Iterate on all partitions
-	for _, topic := range res.Topics {
-		for _, partition := range topic.Partitions {
-			err := kerr.TypedErrorForCode(partition.ErrorCode)
-			if err != nil {
-				errorCountByErrCode[partition.ErrorCode]++
-				errorCountByTopic[topic.Topic]++
-			}
+	listedOffsets.Each(func(offset kadm.ListedOffset) {
+		if offset.Err != nil {
+			errorCountByTopic[offset.Topic]++
+			errorCountByErrCode[offset.Err]++
 		}
-	}
+	})
 
 	// Print log line for each error type
-	for errCode, count := range errorCountByErrCode {
-		typedErr := kerr.TypedErrorForCode(errCode)
+	for err, count := range errorCountByErrCode {
 		s.logger.Warn("failed to list some partitions watermarks",
-			zap.Error(typedErr),
+			zap.Error(err),
 			zap.Int("error_count", count))
 	}
 	if len(errorCountByTopic) > 0 {
@@ -98,5 +96,5 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO
 			zap.Int("topics_with_errors", len(errorCountByTopic)))
 	}
 
-	return res, nil
+	return listedOffsets, nil
 }
diff --git a/minion/service.go b/minion/service.go
index b30b6c9..003bef2 100644
--- a/minion/service.go
+++ b/minion/service.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"github.com/twmb/franz-go/pkg/kversion"
@@ -33,8 +34,9 @@ type Service struct {
 	AllowedTopicsExpr []*regexp.Regexp
 	IgnoredTopicsExpr []*regexp.Regexp
 
-	client  *kgo.Client
-	storage *Storage
+	client    *kgo.Client
+	admClient *kadm.Client
+	storage   *Storage
 }
 
 func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) {
@@ -82,7 +84,9 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
 		AllowedTopicsExpr: allowedTopicsExpr,
 		IgnoredTopicsExpr: ignoredTopicsExpr,
 
-		client:  client,
+		client:    client,
+		admClient: kadm.NewClient(client),
+
 		storage: storage,
 	}
 
diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go
index 1629d29..b641766 100644
--- a/prometheus/collect_consumer_group_lags.go
+++ b/prometheus/collect_consumer_group_lags.go
@@ -6,8 +6,8 @@ import (
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
-	"github.com/twmb/franz-go/pkg/kmsg"
 	"go.uber.org/zap"
 
 	"github.com/cloudhut/kminion/v2/minion"
@@ -211,61 +211,49 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
 	return isOk
 }
 
-func (e *Exporter) waterMarksByTopic(lowMarks *kmsg.ListOffsetsResponse, highMarks *kmsg.ListOffsetsResponse) map[string]map[int32]waterMark {
+func (e *Exporter) waterMarksByTopic(lowMarks kadm.ListedOffsets, highMarks kadm.ListedOffsets) map[string]map[int32]waterMark {
 	type partitionID = int32
 	type topicName = string
 	waterMarks := make(map[topicName]map[partitionID]waterMark)
 
-	for _, topic := range lowMarks.Topics {
-		_, exists := waterMarks[topic.Topic]
+	for topic, lowMarksByPartitionID := range lowMarks {
+		_, exists := waterMarks[topic]
 		if !exists {
-			waterMarks[topic.Topic] = make(map[partitionID]waterMark)
+			waterMarks[topic] = make(map[partitionID]waterMark)
 		}
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+
+		for _, lowOffset := range lowMarksByPartitionID {
+			if lowOffset.Err != nil {
 				e.logger.Debug("failed to get partition low water mark, inner kafka error",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Error(err))
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Error(lowOffset.Err))
 				continue
 			}
-			waterMarks[topic.Topic][partition.Partition] = waterMark{
-				TopicName:     topic.Topic,
-				PartitionID:   partition.Partition,
-				LowWaterMark:  partition.Offset,
-				HighWaterMark: -1,
-			}
-		}
-	}
 
-	for _, topic := range highMarks.Topics {
-		mark, exists := waterMarks[topic.Topic]
-		if !exists {
-			e.logger.Error("got high water marks for a topic but no low watermarks", zap.String("topic_name", topic.Topic))
-			delete(waterMarks, topic.Topic)
-			continue
-		}
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
-				e.logger.Debug("failed to get partition high water mark, inner kafka error",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Error(err))
-				continue
-			}
-			partitionMark, exists := mark[partition.Partition]
+			highOffset, exists := highMarks.Lookup(lowOffset.Topic, lowOffset.Partition)
 			if !exists {
-				e.logger.Error("got high water marks for a topic's partition but no low watermarks",
-					zap.String("topic_name", topic.Topic),
-					zap.Int32("partition_id", partition.Partition),
-					zap.Int64("offset", partition.Offset))
-				delete(waterMarks, topic.Topic)
+				e.logger.Error("got low water marks for a topic's partition but no high watermarks",
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Int64("offset", lowOffset.Offset))
+				delete(waterMarks, lowOffset.Topic)
 				break // Topic watermarks are invalid -> delete & skip this topic
 			}
-			partitionMark.HighWaterMark = partition.Offset
-			waterMarks[topic.Topic][partition.Partition] = partitionMark
+			if highOffset.Err != nil {
+				e.logger.Debug("failed to get partition high water mark, inner kafka error",
+					zap.String("topic_name", lowOffset.Topic),
+					zap.Int32("partition_id", lowOffset.Partition),
+					zap.Error(highOffset.Err))
+				continue
+			}
+
+			waterMarks[lowOffset.Topic][lowOffset.Partition] = waterMark{
+				TopicName:     lowOffset.Topic,
+				PartitionID:   lowOffset.Partition,
+				LowWaterMark:  lowOffset.Offset,
+				HighWaterMark: highOffset.Offset,
+			}
 		}
 	}
 
diff --git a/prometheus/collect_topic_partition_offsets.go b/prometheus/collect_topic_partition_offsets.go
index ef03aba..4ca6695 100644
--- a/prometheus/collect_topic_partition_offsets.go
+++ b/prometheus/collect_topic_partition_offsets.go
@@ -5,7 +5,6 @@ import (
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/twmb/franz-go/pkg/kerr"
 	"go.uber.org/zap"
 
 	"github.com/cloudhut/kminion/v2/minion"
@@ -32,21 +31,21 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 	}
 
 	// Process Low Watermarks
-	for _, topic := range lowWaterMarks.Topics {
-		if !e.minionSvc.IsTopicAllowed(topic.Topic) {
+
+	for topicName, partitions := range lowWaterMarks {
+		if !e.minionSvc.IsTopicAllowed(topicName) {
 			continue
 		}
 
 		waterMarkSum := int64(0)
 		hasErrors := false
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+		for _, offset := range partitions {
+			if offset.Err != nil {
 				hasErrors = true
 				isOk = false
 				continue
 			}
-			waterMarkSum += partition.Offset
+			waterMarkSum += offset.Offset
 			// Let's end here if partition metrics shall not be exposed
 			if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
 				continue
@@ -54,9 +53,9 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 			ch <- prometheus.MustNewConstMetric(
 				e.partitionLowWaterMark,
 				prometheus.GaugeValue,
-				float64(partition.Offset),
-				topic.Topic,
-				strconv.Itoa(int(partition.Partition)),
+				float64(offset.Offset),
+				topicName,
+				strconv.Itoa(int(offset.Partition)),
 			)
 		}
 		// We only want to report the sum of all partition marks if we receive watermarks from all partition
@@ -65,25 +64,24 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 				e.topicLowWaterMarkSum,
 				prometheus.GaugeValue,
 				float64(waterMarkSum),
-				topic.Topic,
+				topicName,
 			)
 		}
 	}
 
-	for _, topic := range highWaterMarks.Topics {
-		if !e.minionSvc.IsTopicAllowed(topic.Topic) {
+	for topicName, partitions := range highWaterMarks {
+		if !e.minionSvc.IsTopicAllowed(topicName) {
 			continue
 		}
 		waterMarkSum := int64(0)
 		hasErrors := false
-		for _, partition := range topic.Partitions {
-			err := kerr.ErrorForCode(partition.ErrorCode)
-			if err != nil {
+		for _, offset := range partitions {
+			if offset.Err != nil {
 				hasErrors = true
 				isOk = false
 				continue
 			}
-			waterMarkSum += partition.Offset
+			waterMarkSum += offset.Offset
 			// Let's end here if partition metrics shall not be exposed
 			if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
 				continue
@@ -91,9 +89,9 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 			ch <- prometheus.MustNewConstMetric(
 				e.partitionHighWaterMark,
 				prometheus.GaugeValue,
-				float64(partition.Offset),
-				topic.Topic,
-				strconv.Itoa(int(partition.Partition)),
+				float64(offset.Offset),
+				topicName,
+				strconv.Itoa(int(offset.Partition)),
 			)
 		}
 		// We only want to report the sum of all partition marks if we receive watermarks from all partitions
@@ -102,7 +100,7 @@ func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
 				e.topicHighWaterMarkSum,
 				prometheus.GaugeValue,
 				float64(waterMarkSum),
-				topic.Topic,
+				topicName,
 			)
 		}
 	}