diff --git a/core/internal/httpserver/prometheus.go b/core/internal/httpserver/prometheus.go index 97e71146..1ba1059e 100644 --- a/core/internal/httpserver/prometheus.go +++ b/core/internal/httpserver/prometheus.go @@ -73,6 +73,7 @@ func DeleteConsumerMetrics(cluster, consumer string) { consumerStatusGauge.Delete(labels) consumerPartitionLagGauge.DeletePartialMatch(labels) consumerPartitionCurrentOffset.DeletePartialMatch(labels) + partitionStatusGauge.DeletePartialMatch(labels) } // DeleteTopicMetrics deletes all metrics that are labeled with a topic @@ -92,6 +93,19 @@ func DeleteTopicMetrics(cluster, topic string) { consumerStatusGauge.DeletePartialMatch(labels) } +// DeleteConsumerTopicMetrics deletes all metrics that are labeled with the provided consumer group AND topic +func DeleteConsumerTopicMetrics(cluster, consumer, topic string) { + labels := map[string]string{ + "cluster": cluster, + "consumer_group": consumer, + "topic": topic, + } + + partitionStatusGauge.DeletePartialMatch(labels) + consumerPartitionCurrentOffset.DeletePartialMatch(labels) + consumerPartitionLagGauge.DeletePartialMatch(labels) +} + func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc { promHandler := promhttp.Handler() diff --git a/core/internal/storage/inmemory.go b/core/internal/storage/inmemory.go index a4088234..09ddc91f 100644 --- a/core/internal/storage/inmemory.go +++ b/core/internal/storage/inmemory.go @@ -667,16 +667,26 @@ func (module *InMemoryStorage) deleteGroup(request *protocol.StorageRequest, req } clusterMap.consumerLock.Lock() + deleteAllGroupMetrics := true if group, ok := clusterMap.consumer[request.Group]; ok && request.Topic != "" { delete(group.topics, request.Topic) if len(group.topics) == 0 { delete(clusterMap.consumer, request.Group) + } else { + // The consumer group consumes other topics, thus we need to keep its metrics + deleteAllGroupMetrics = false } } else { delete(clusterMap.consumer, request.Group) } clusterMap.consumerLock.Unlock() - httpserver.DeleteConsumerMetrics(request.Cluster, request.Group) + + if deleteAllGroupMetrics { + httpserver.DeleteConsumerMetrics(request.Cluster, request.Group) + } else { + // only a specific topic was deleted and the consumer group still exists, thus we delete only a subset of the metrics + httpserver.DeleteConsumerTopicMetrics(request.Cluster, request.Group, request.Topic) + } requestLogger.Debug("ok") }