From 3ae88aa94cc70e1e3b4374c229c222dd5891e801 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Thu, 14 Nov 2024 12:29:57 -0500
Subject: [PATCH 1/5] `worker`: add `tombstone_probability` as parameter

Allows the `producer_worker` and `repeater_worker` to generate tombstone
records.
---
 cmd/kgo-repeater/main.go               | 11 ++++++-----
 cmd/kgo-verifier/main.go               |  3 ++-
 pkg/worker/repeater/repeater_worker.go |  4 ++--
 pkg/worker/verifier/producer_worker.go |  9 +++++----
 pkg/worker/worker.go                   |  9 +++++++--
 5 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/cmd/kgo-repeater/main.go b/cmd/kgo-repeater/main.go
index 071f795..d3e750b 100644
--- a/cmd/kgo-repeater/main.go
+++ b/cmd/kgo-repeater/main.go
@@ -50,8 +50,9 @@ var (
 	transactionAbortRate = flag.Float64("transaction-abort-rate", 0.0, "The probability that any given transaction should abort")
 	msgsPerTransaction   = flag.Uint("msgs-per-transaction", 1, "The number of messages that should be in a given transaction")
 
-	compressionType     = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
-	compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
+	compressionType      = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
+	compressiblePayload  = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
+	tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
 )
 
 // NewAdmin returns a franz-go admin client.
@@ -150,7 +151,7 @@ func main() {
 	// it was refactored.
 	wConfig := worker.NewWorkerConfig(
 		name, *brokers, *trace, topicsList[0], *linger, *maxBufferedRecords, *useTransactions, *compressionType, *compressiblePayload, *username, *password, *enableTls)
-	config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker)
+	config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker, *tombstoneProbability)
 	lv := repeater.NewWorker(config)
 	if *useTransactions {
 		tconfig := worker.NewTransactionSTMConfig(*transactionAbortRate, *msgsPerTransaction)
@@ -212,9 +213,9 @@ func main() {
 	go func() {
 		listenAddr := fmt.Sprintf("0.0.0.0:%d", *remotePort)
 		if err := http.ListenAndServe(listenAddr, mux); err != nil {
-			panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err));
+			panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err))
 		}
-	}();
+	}()
 
 	if !*remote {
 		admin, err := NewAdmin()
diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go
index e8a5e59..ff74bdd 100644
--- a/cmd/kgo-verifier/main.go
+++ b/cmd/kgo-verifier/main.go
@@ -72,6 +72,7 @@ var (
 
 	tolerateDataLoss      = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
 	tolerateFailedProduce = flag.Bool("tolerate-failed-produce", false, "If true, tolerate and retry failed produce")
+	tombstoneProbability  = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
 )
 
 func makeWorkerConfig() worker.WorkerConfig {
@@ -245,7 +246,7 @@ func main() {
 
 	if *pCount > 0 {
 		log.Info("Starting producer...")
-		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId)
+		pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
 		pw := verifier.NewProducerWorker(pwc)
 
 		if *useTransactions {
diff --git a/pkg/worker/repeater/repeater_worker.go b/pkg/worker/repeater/repeater_worker.go
index 165065e..0863054 100644
--- a/pkg/worker/repeater/repeater_worker.go
+++ b/pkg/worker/repeater/repeater_worker.go
@@ -52,13 +52,13 @@ type RepeaterConfig struct {
 	RateLimitBps int
 }
 
-func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int) RepeaterConfig {
+func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int, tombstoneProbability float64) RepeaterConfig {
 	return RepeaterConfig{
 		workerCfg:      cfg,
 		Topics:         topics,
 		Group:          group,
 		KeySpace:       worker.KeySpace{UniqueCount: keys},
-		ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload},
+		ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload, TombstoneProbability: tombstoneProbability},
 		DataInFlight:   dataInFlight,
 		RateLimitBps:   rateLimitBps,
 	}
diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go
index a8c2348..9d644fa 100644
--- a/pkg/worker/verifier/producer_worker.go
+++ b/pkg/worker/verifier/producer_worker.go
@@ -34,7 +34,7 @@ type ProducerConfig struct {
 }
 
 func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
-	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int) ProducerConfig {
+	messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig {
 	return ProducerConfig{
 		workerCfg:    wc,
 		name:         name,
@@ -47,8 +47,9 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
 		keySetCardinality:     keySetCardinality,
 		messagesPerProducerId: messagesPerProducerId,
 		valueGenerator: worker.ValueGenerator{
-			PayloadSize:  uint64(messageSize),
-			Compressible: wc.CompressiblePayload,
+			PayloadSize:          uint64(messageSize),
+			Compressible:         wc.CompressiblePayload,
+			TombstoneProbability: tombstoneProbability,
 		},
 	}
 }
@@ -108,7 +109,7 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record
 		pw.Status.AbortedTransactionMessages += 1
 	}
 
-	payload := make([]byte, pw.config.messageSize)
+	payload := pw.config.valueGenerator.Generate()
 
 	var r *kgo.Record
 	if pw.config.keySetCardinality < 0 {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 2de56e5..b46b45b 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -100,13 +100,18 @@ type KeySpace struct {
 }
 
 type ValueGenerator struct {
-	PayloadSize  uint64
-	Compressible bool
+	PayloadSize          uint64
+	Compressible         bool
+	TombstoneProbability float64
 }
 
 var compressible_payload []byte
 
 func (vg *ValueGenerator) Generate() []byte {
+	isTombstone := rand.Float64() < vg.TombstoneProbability
+	if isTombstone {
+		return nil
+	}
 	if vg.Compressible {
 		// Zeros, which is about as compressible as an array can be.
 		if len(compressible_payload) == 0 {

From 583032e9601fbef7779230a79eb3a2c8e9baeee1 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Thu, 14 Nov 2024 12:31:30 -0500
Subject: [PATCH 2/5] `worker`: add `compacted` parameter

Offset gaps exist in compacted topics. Suppress a warning log in the read
workers if the user passes a `--compacted` flag as a parameter.
---
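A minimal sketch of the gap rule this patch introduces, pulled out into a
standalone helper for illustration only. The helper name `offsetGapIsError`
is hypothetical; in the patch the same condition is applied inline in
`ValidateRecord`.

	// offsetGapIsError reports whether a jump from the previously consumed
	// offset to the observed offset should be warned about. On a compacted
	// topic, gaps are expected because compaction removes records, so they
	// are not treated as errors.
	func offsetGapIsError(lastConsumed, observed int64, compacted bool) bool {
		expected := lastConsumed + 1
		return observed != expected && !compacted
	}
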
 cmd/kgo-verifier/main.go                     |  7 ++++---
 pkg/worker/verifier/group_read_worker.go     |  4 ++--
 pkg/worker/verifier/random_read_worker.go    |  4 ++--
 pkg/worker/verifier/seq_read_worker.go       |  4 ++--
 pkg/worker/verifier/validator_status.go      | 14 +++++++++++---
 pkg/worker/verifier/validator_status_test.go |  9 ++++-----
 6 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/cmd/kgo-verifier/main.go b/cmd/kgo-verifier/main.go
index ff74bdd..475a070 100644
--- a/cmd/kgo-verifier/main.go
+++ b/cmd/kgo-verifier/main.go
@@ -73,6 +73,7 @@ var (
 	tolerateDataLoss      = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
 	tolerateFailedProduce = flag.Bool("tolerate-failed-produce", false, "If true, tolerate and retry failed produce")
 	tombstoneProbability  = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
+	compacted             = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not.")
 )
 
 func makeWorkerConfig() worker.WorkerConfig {
@@ -262,7 +263,7 @@ func main() {
 		srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
 			makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
 			(*consumeTputMb)*1024*1024,
-		))
+		), verifier.NewValidatorStatus(*compacted))
 		workers = append(workers, &srw)
 
 		for loopState.Next() {
@@ -281,7 +282,7 @@ func main() {
 			workerCfg := verifier.NewRandomReadConfig(
 				makeWorkerConfig(), fmt.Sprintf("random-%03d", i), nPartitions, *cCount,
 			)
-			worker := verifier.NewRandomReadWorker(workerCfg)
+			worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted))
 			randomWorkers = append(randomWorkers, &worker)
 			workers = append(workers, &worker)
 		}
@@ -310,7 +311,7 @@ func main() {
 		grw := verifier.NewGroupReadWorker(
 			verifier.NewGroupReadConfig(
 				makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
-				*seqConsumeCount, (*consumeTputMb)*1024*1024))
+				*seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted))
 		workers = append(workers, &grw)
 
 		for loopState.Next() {
diff --git a/pkg/worker/verifier/group_read_worker.go b/pkg/worker/verifier/group_read_worker.go
index 5583222..a154efd 100644
--- a/pkg/worker/verifier/group_read_worker.go
+++ b/pkg/worker/verifier/group_read_worker.go
@@ -49,10 +49,10 @@ type GroupReadWorker struct {
 	Status GroupWorkerStatus
 }
 
-func NewGroupReadWorker(cfg GroupReadConfig) GroupReadWorker {
+func NewGroupReadWorker(cfg GroupReadConfig, validatorStatus ValidatorStatus) GroupReadWorker {
 	return GroupReadWorker{
 		config: cfg,
-		Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic},
+		Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
 	}
 }
diff --git a/pkg/worker/verifier/random_read_worker.go b/pkg/worker/verifier/random_read_worker.go
index a818b53..232e7aa 100644
--- a/pkg/worker/verifier/random_read_worker.go
+++ b/pkg/worker/verifier/random_read_worker.go
@@ -42,10 +42,10 @@ func NewRandomReadConfig(wc worker.WorkerConfig, name string, nPartitions int32,
 	}
 }
 
-func NewRandomReadWorker(cfg RandomReadConfig) RandomReadWorker {
+func NewRandomReadWorker(cfg RandomReadConfig, validatorStatus ValidatorStatus) RandomReadWorker {
 	return RandomReadWorker{
 		config: cfg,
-		Status: RandomWorkerStatus{Topic: cfg.workerCfg.Topic},
+		Status: RandomWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
 	}
 }
diff --git a/pkg/worker/verifier/seq_read_worker.go b/pkg/worker/verifier/seq_read_worker.go
index 99bcbc0..f55657d 100644
--- a/pkg/worker/verifier/seq_read_worker.go
+++ b/pkg/worker/verifier/seq_read_worker.go
@@ -43,10 +43,10 @@ type SeqReadWorker struct {
 	Status SeqWorkerStatus
 }
 
-func NewSeqReadWorker(cfg SeqReadConfig) SeqReadWorker {
+func NewSeqReadWorker(cfg SeqReadConfig, validatorStatus ValidatorStatus) SeqReadWorker {
 	return SeqReadWorker{
 		config: cfg,
-		Status: SeqWorkerStatus{Topic: cfg.workerCfg.Topic},
+		Status: SeqWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
 	}
 }
diff --git a/pkg/worker/verifier/validator_status.go b/pkg/worker/verifier/validator_status.go
index c358d22..82299d4 100644
--- a/pkg/worker/verifier/validator_status.go
+++ b/pkg/worker/verifier/validator_status.go
@@ -11,10 +11,12 @@ import (
 	"github.com/twmb/franz-go/pkg/kgo"
 )
 
-func NewValidatorStatus() ValidatorStatus {
+func NewValidatorStatus(compacted bool) ValidatorStatus {
 	return ValidatorStatus{
 		MaxOffsetsConsumed: make(map[int32]int64),
-		lastCheckpoint:     time.Now(),
+
+		lastCheckpoint: time.Now(),
+		compacted:      compacted,
 	}
 }
 
@@ -49,8 +51,14 @@ type ValidatorStatus struct {
 
 	// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
 	lastOffsetConsumed map[int32]int64
+	// The latest offset seen for a given key. Used to help track the latest key-value pair that should be seen.
+	lastOffsetPerKeyConsumed map[string]int64
+
 	// Last leader epoch per partition. Used to assert monotonicity.
 	lastLeaderEpoch map[int32]int32
+
+	// Whether the topic to be consumed is compacted. Gaps in offsets will be ignored if true.
+	compacted bool
 }
 
 func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
@@ -68,7 +76,7 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
 	if present {
 		if currentMax < r.Offset {
 			expected := currentMax + 1
-			if r.Offset != expected {
+			if r.Offset != expected && !cs.compacted {
 				log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
 			}
 		} else {
diff --git a/pkg/worker/verifier/validator_status_test.go b/pkg/worker/verifier/validator_status_test.go
index e177fa8..168a9a6 100644
--- a/pkg/worker/verifier/validator_status_test.go
+++ b/pkg/worker/verifier/validator_status_test.go
@@ -10,7 +10,7 @@ import (
 )
 
 func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
-	validator := verifier.NewValidatorStatus()
+	validator := verifier.NewValidatorStatus(false)
 	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
 	validRanges.Insert(0, 41)
 	validRanges.Insert(0, 42)
@@ -42,7 +42,7 @@ func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
 }
 
 func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
-	validator := verifier.NewValidatorStatus()
+	validator := verifier.NewValidatorStatus(false)
 	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
 	validRanges.Insert(0, 41)
 
@@ -63,7 +63,7 @@ func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
 }
 
 func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
-	validator := verifier.NewValidatorStatus()
+	validator := verifier.NewValidatorStatus(false)
 	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
 
 	validator.ValidateRecord(&kgo.Record{
@@ -100,9 +100,8 @@ func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
 	}()
 }
 
-
 func TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch(t *testing.T) {
-	validator := verifier.NewValidatorStatus()
+	validator := verifier.NewValidatorStatus(false)
 	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
 
 	validator.ValidateRecord(&kgo.Record{

From 98b0a59751fc8d3c5de32784fd7303bb94a390c0 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Thu, 14 Nov 2024 12:33:45 -0500
Subject: [PATCH 3/5] `worker`: add `TombstonesProduced` and `TombstonesConsumed`

---
 pkg/worker/verifier/producer_worker.go  | 6 ++++++
 pkg/worker/verifier/validator_status.go | 7 +++++++
 2 files changed, 13 insertions(+)

diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go
index 9d644fa..e89474b 100644
--- a/pkg/worker/verifier/producer_worker.go
+++ b/pkg/worker/verifier/producer_worker.go
@@ -110,6 +110,9 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record
 	}
 
 	payload := pw.config.valueGenerator.Generate()
+	if payload == nil {
+		pw.Status.TombstonesProduced += 1
+	}
 
 	var r *kgo.Record
 	if pw.config.keySetCardinality < 0 {
@@ -154,6 +157,9 @@ type ProducerWorkerStatus struct {
 	// How many times produce request failed?
 	Fails int64 `json:"fails"`
 
+	// How many tombstone records were produced?
+	TombstonesProduced int64 `json:"tombstones_produced"`
+
 	// How many failures occured while trying to begin, abort,
 	// or commit a transaction.
 	FailedTransactions int64 `json:"failed_transactions"`
diff --git a/pkg/worker/verifier/validator_status.go b/pkg/worker/verifier/validator_status.go
index 82299d4..67be7a2 100644
--- a/pkg/worker/verifier/validator_status.go
+++ b/pkg/worker/verifier/validator_status.go
@@ -41,6 +41,9 @@ type ValidatorStatus struct {
 
 	LostOffsets map[int32]int64 `json:"lost_offsets"`
 
+	// The number of tombstones consumed
+	TombstonesConsumed int64 `json:"tombstones_consumed"`
+
 	// Concurrent access happens when doing random reads
 	// with multiple reader fibers
 	lock sync.Mutex
@@ -67,6 +70,10 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
 	cs.lock.Lock()
 	defer cs.lock.Unlock()
 
+	if r.Value == nil {
+		cs.TombstonesConsumed += 1
+	}
+
 	if r.LeaderEpoch < cs.lastLeaderEpoch[r.Partition] {
 		log.Panicf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
 			r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])

From 7dc92cd7ab89084f26c4f9cb852bafa70cacd241 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Thu, 14 Nov 2024 16:53:18 -0500
Subject: [PATCH 4/5] `worker`: add `LatestValueProduced`

For verification of compacted topics, we can track the last expected
key-value pair that will be seen after the log has been fully compacted.
---
 pkg/worker/verifier/producer_worker.go | 36 +++++++++++++++++++-------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/pkg/worker/verifier/producer_worker.go b/pkg/worker/verifier/producer_worker.go
index e89474b..eeff183 100644
--- a/pkg/worker/verifier/producer_worker.go
+++ b/pkg/worker/verifier/producer_worker.go
@@ -151,6 +151,9 @@ type ProducerWorkerStatus struct {
 
 	MaxOffsetsProduced map[int32]int64 `json:"max_offsets_produced"`
 
+	// The last produced value for a given key in a partition.
+	LatestValueProduced map[int32]map[string]string `json:"latest_value_produced"`
+
 	// How many times did we restart the producer loop?
 	Restarts int64 `json:"restarts"`
 
@@ -184,18 +187,22 @@ type ProducerWorkerStatus struct {
 
 func NewProducerWorkerStatus(topic string) ProducerWorkerStatus {
 	return ProducerWorkerStatus{
-		Topic:              topic,
-		MaxOffsetsProduced: make(map[int32]int64),
-		lastCheckpoint:     time.Now(),
-		latency:            metrics.NewHistogram(metrics.NewExpDecaySample(1024, 0.015)),
+		Topic:               topic,
+		MaxOffsetsProduced:  make(map[int32]int64),
+		LatestValueProduced: make(map[int32]map[string]string),
+		lastCheckpoint:      time.Now(),
+		latency:             metrics.NewHistogram(metrics.NewExpDecaySample(1024, 0.015)),
 	}
 }
 
-func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) {
+func (self *ProducerWorkerStatus) OnAcked(r *kgo.Record) {
 	self.lock.Lock()
 	defer self.lock.Unlock()
 
-	self.Acked += 1
+	Partition := r.Partition
+	Offset := r.Offset
+
+	self.Acked += 1
 	currentMax, present := self.MaxOffsetsProduced[Partition]
 	if present {
 		if currentMax < Offset {
@@ -209,6 +216,11 @@ func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) {
 	} else {
 		self.MaxOffsetsProduced[Partition] = Offset
 	}
+
+	if self.LatestValueProduced[Partition] == nil {
+		self.LatestValueProduced[Partition] = make(map[string]string)
+	}
+	self.LatestValueProduced[Partition][string(r.Key)] = string(r.Value)
 }
 
 func (self *ProducerWorkerStatus) OnBadOffset() {
@@ -223,6 +235,11 @@ func (self *ProducerWorkerStatus) OnFail() {
 	self.Fails += 1
 }
 
+func (pw *ProducerWorkerStatus) String() string {
+	return fmt.Sprintf("Topic: %s, Sent: %d, Acked: %d, BadOffsets: %d, Restarts: %d, Fails: %d, TombstonesProduced: %d, FailedTransactions: %d, AbortedTransactionMessages: %d",
+		pw.Topic, pw.Sent, pw.Acked, pw.BadOffsets, pw.Restarts, pw.Fails, pw.TombstonesProduced, pw.FailedTransactions, pw.AbortedTransactionMessages)
+}
+
 func (pw *ProducerWorker) produceCheckpoint() {
 	err := pw.validOffsets.Store()
 	util.Chk(err, "Error writing offset map: %v", err)
@@ -230,11 +247,11 @@ func (pw *ProducerWorker) produceCheckpoint() {
 
 	status, lock := pw.GetStatus()
 	lock.Lock()
-	data, err := json.Marshal(status)
+	_, err = json.Marshal(status)
 	lock.Unlock()
 	util.Chk(err, "Status serialization error")
 
-	log.Infof("Producer status: %s", data)
+	log.Infof("Producer status: %s", status.(*ProducerWorkerStatus).String())
 }
 
 func (pw *ProducerWorker) Wait() error {
@@ -381,10 +398,11 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
 			log.Debugf("errored = %t", errored)
 		} else {
 			ackLatency := time.Now().Sub(sentAt)
-			pw.Status.OnAcked(r.Partition, r.Offset)
+			pw.Status.OnAcked(r)
 			pw.Status.latency.Update(ackLatency.Microseconds())
 			log.Debugf("Wrote partition %d at %d", r.Partition, r.Offset)
 			pw.validOffsets.Insert(r.Partition, r.Offset)
+
 		}
 		wg.Done()
 	}

From 28a4a05bdc5f8dfa3c56137f4d3adca90428b3ed Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Thu, 14 Nov 2024 16:55:14 -0500
Subject: [PATCH 5/5] `worker`: add `LatestValueConsumed`

As a means to verify the results along with `LatestValueProduced` for a
compacted topic.
---
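With `LatestValueProduced` (patch 4) and `LatestValueConsumed` both populated,
a test could compare the two maps once a sequential or group consumer has read
a fully compacted log. A sketch of such a check, assuming only the standard
`fmt` package; the function itself is hypothetical and not part of this series:

	// verifyCompaction checks that, for every partition and key, the last
	// value consumed matches the last value the producer acked. A key whose
	// final acked value was a tombstone (stored as "") may be removed
	// entirely by compaction, so its absence is tolerated.
	func verifyCompaction(produced, consumed map[int32]map[string]string) error {
		for partition, keys := range produced {
			for key, want := range keys {
				got, ok := consumed[partition][key]
				if !ok && want == "" {
					continue // tombstoned key, may be gone after compaction
				}
				if !ok || got != want {
					return fmt.Errorf("partition %d key %q: got %q, want %q", partition, key, got, want)
				}
			}
		}
		return nil
	}
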
 pkg/worker/verifier/validator_status.go | 32 +++++++++++++++++++++----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/pkg/worker/verifier/validator_status.go b/pkg/worker/verifier/validator_status.go
index 67be7a2..51e180c 100644
--- a/pkg/worker/verifier/validator_status.go
+++ b/pkg/worker/verifier/validator_status.go
@@ -39,6 +39,10 @@ type ValidatorStatus struct {
 	// The highest valid offset consumed throughout the consumer's lifetime
 	MaxOffsetsConsumed map[int32]int64 `json:"max_offsets_consumed"`
 
+	// The last consumed value for a given key in a partition. Only valid for sequential and group consumers
+	// (random consumers may not read the latest offset for a given key)
+	LatestValueConsumed map[int32]map[string]string `json:"latest_value_consumed"`
+
 	LostOffsets map[int32]int64 `json:"lost_offsets"`
 
 	// The number of tombstones consumed
@@ -54,8 +58,9 @@ type ValidatorStatus struct {
 
 	// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
 	lastOffsetConsumed map[int32]int64
-	// The latest offset seen for a given key. Used to help track the latest key-value pair that should be seen.
-	lastOffsetPerKeyConsumed map[string]int64
+	// The latest offset seen for a given key in a partition.
+	// Used to help track the latest key-value pair that should be seen.
+	lastOffsetPerKeyConsumed map[int32]map[string]int64
 
 	// Last leader epoch per partition. Used to assert monotonicity.
 	lastLeaderEpoch map[int32]int32
@@ -130,6 +135,18 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
 	if cs.lastLeaderEpoch == nil {
 		cs.lastLeaderEpoch = make(map[int32]int32)
 	}
+	if cs.lastOffsetPerKeyConsumed == nil {
+		cs.lastOffsetPerKeyConsumed = make(map[int32]map[string]int64)
+	}
+	if cs.lastOffsetPerKeyConsumed[r.Partition] == nil {
+		cs.lastOffsetPerKeyConsumed[r.Partition] = make(map[string]int64)
+	}
+	if cs.LatestValueConsumed == nil {
+		cs.LatestValueConsumed = make(map[int32]map[string]string)
+	}
+	if cs.LatestValueConsumed[r.Partition] == nil {
+		cs.LatestValueConsumed[r.Partition] = make(map[string]string)
+	}
 
 	// We bump highest offset only for valid records.
 	if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
@@ -138,6 +155,9 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
 
 	cs.lastOffsetConsumed[r.Partition] = r.Offset
 	cs.lastLeaderEpoch[r.Partition] = r.LeaderEpoch
+	if r.Offset > cs.lastOffsetPerKeyConsumed[r.Partition][string(r.Key)] {
+		cs.LatestValueConsumed[r.Partition][string(r.Key)] = string(r.Value)
+	}
 }
 
 func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
@@ -171,11 +191,13 @@ func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32,
 }
 
 func (cs *ValidatorStatus) Checkpoint() {
+	_, err := json.Marshal(cs)
+	util.Chk(err, "Status serialization error")
+
 	log.Infof("Validator status: %s", cs.String())
 }
 
 func (cs *ValidatorStatus) String() string {
-	data, err := json.Marshal(cs)
-	util.Chk(err, "Status serialization error")
-	return string(data)
+	return fmt.Sprintf("Name: %s, ValidReads: %d, InvalidReads: %d, OutOfScopeInvalidReads: %d, TombstonesConsumed: %d",
+		cs.Name, cs.ValidReads, cs.InvalidReads, cs.OutOfScopeInvalidReads, cs.TombstonesConsumed)
 }
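
A condensed view of per-key latest-value tracking of this kind, for
illustration only (the function name and the exact update rule are not part of
the patch). It pairs the latest-value map with the per-key offset map that the
`r.Offset > lastOffsetPerKeyConsumed[...]` comparison consults:

	// trackLatest stores value as the latest value for key when offset is at
	// least as new as anything previously seen for that key. A nil value is a
	// tombstone and, like string(r.Value), is stored as the empty string.
	func trackLatest(lastOffsetPerKey map[string]int64, latestValue map[string]string, key, value []byte, offset int64) {
		k := string(key)
		prev, seen := lastOffsetPerKey[k]
		if !seen || offset >= prev {
			lastOffsetPerKey[k] = offset
			latestValue[k] = string(value)
		}
	}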