As you can see, it's possible that __consumer_offsets had records before and doesn't have them right now. It means that Low Water Mark = High Water Mark != 0.
In offset_consumer.go, checkIfConsumerLagIsCaughtUp internally does the following:
    highMarksRes, err := offsetReq.RequestWith(ctx, s.kafkaSvc.Client)
    ...
    consumedOffsets := s.storage.getConsumedOffsets()
    ...
    highWaterMark := partition.Offset - 1
    consumedOffset := consumedOffsets[partition.Partition]
    partitionLag := highWaterMark - consumedOffset
    if partitionLag < 0 {
        partitionLag = 0
    }
    if partitionLag > 0 {
        partitionsLagging++
        totalLag += partitionLag
        s.logger.Debug("consumer_offsets topic lag has not been caught up yet",
            zap.Int32("partition_id", partition.Partition),
            zap.Int64("high_water_mark", highWaterMark),
            zap.Int64("consumed_offset", consumedOffset),
            zap.Int64("partition_lag", partitionLag))
        isReady = false
        continue
    }
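In this case no records can be consumed from the partition, so consumedOffset stays at its default value of 0 while highWaterMark is non-zero. partitionLag is therefore always greater than 0 and isReady is always false.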
It looks like there should be a check for the default value of consumedOffset (0) somewhere before the final condition.
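To make the suggestion concrete, here is a minimal, self-contained sketch of the arithmetic with and without such a guard. It is not the project's code: the `partitionMarks` type and the `lagUnguarded` and `lagGuarded` functions are hypothetical names, and it assumes the low water mark is available alongside the high water mark and that the stored consumed offset is the offset of the last consumed record, as the snippet above suggests.

```go
package main

import "fmt"

// partitionMarks is a hypothetical holder for one partition's offsets.
type partitionMarks struct {
	Low  int64 // low water mark: offset of the first retained record
	High int64 // high water mark: offset the next produced record will get
}

// lagUnguarded mirrors the arithmetic quoted in the issue.
// A partition missing from the map yields the zero value 0.
func lagUnguarded(m partitionMarks, consumed map[int32]int64, id int32) int64 {
	lag := (m.High - 1) - consumed[id]
	if lag < 0 {
		lag = 0
	}
	return lag
}

// lagGuarded treats a partition without retained records as caught up
// and distinguishes "nothing consumed yet" from "consumed offset 0".
func lagGuarded(m partitionMarks, consumed map[int32]int64, id int32) int64 {
	if m.Low >= m.High {
		return 0 // all records were removed by retention; nothing to read
	}
	last, ok := consumed[id]
	if !ok {
		last = m.Low - 1 // assumption: consumption starts at the low water mark
	}
	lag := (m.High - 1) - last
	if lag < 0 {
		lag = 0
	}
	return lag
}

func main() {
	// Partition whose records have all expired: Low == High != 0.
	expired := partitionMarks{Low: 1500, High: 1500}
	consumed := map[int32]int64{} // nothing could be consumed

	fmt.Println("unguarded:", lagUnguarded(expired, consumed, 0)) // unguarded: 1499
	fmt.Println("guarded:", lagGuarded(expired, consumed, 0))     // guarded: 0
}
```

With the guard, the expired partition no longer keeps isReady at false. Whether the metrics derived from such a topic are still meaningful is a separate question.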
If the __consumer_offsets topic is incomplete (due to the configured retention settings), you should not use this mode either, because it would yield wrong results. It has no way of knowing the current group offsets unless it uses the Kafka API again. The proposed check would unblock the start, but it would emit wrong metrics, or am I misunderstanding something?