Add --tombstone-probability and --compacted #60

Open. Wants to merge 5 commits into base: main.
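This PR adds a --tombstone-probability flag to kgo-repeater and kgo-verifier, giving the probability that a produced record carries a nil value (a tombstone), and a --compacted flag to kgo-verifier so consumers tolerate the offset gaps that compaction creates and track the latest value seen per key. A verifier run against a compacted topic might, for example, pass --tombstone-probability 0.1 --compacted (illustrative values, not taken from this diff).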
11 changes: 6 additions & 5 deletions cmd/kgo-repeater/main.go
@@ -50,8 +50,9 @@ var (
transactionAbortRate = flag.Float64("transaction-abort-rate", 0.0, "The probability that any given transaction should abort")
msgsPerTransaction = flag.Uint("msgs-per-transaction", 1, "The number of messages that should be in a given transaction")

compressionType = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
compressionType = flag.String("compression-type", "", "One of gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
)

// NewAdmin returns a franz-go admin client.
@@ -150,7 +151,7 @@ func main() {
// it was refactored.
wConfig := worker.NewWorkerConfig(
name, *brokers, *trace, topicsList[0], *linger, *maxBufferedRecords, *useTransactions, *compressionType, *compressiblePayload, *username, *password, *enableTls)
config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker)
config := repeater.NewRepeaterConfig(wConfig, topicsList, *group, *keys, *payloadSize, dataInFlightPerWorker, rateLimitPerWorker, *tombstoneProbability)
lv := repeater.NewWorker(config)
if *useTransactions {
tconfig := worker.NewTransactionSTMConfig(*transactionAbortRate, *msgsPerTransaction)
@@ -212,9 +213,9 @@ func main() {
go func() {
listenAddr := fmt.Sprintf("0.0.0.0:%d", *remotePort)
if err := http.ListenAndServe(listenAddr, mux); err != nil {
panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err));
panic(fmt.Sprintf("failed to listen on %s: %v", listenAddr, err))
}
}();
}()

if !*remote {
admin, err := NewAdmin()
10 changes: 6 additions & 4 deletions cmd/kgo-verifier/main.go
@@ -72,6 +72,8 @@ var (

tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
tolerateFailedProduce = flag.Bool("tolerate-failed-produce", false, "If true, tolerate and retry failed produce")
tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not.")
)

func makeWorkerConfig() worker.WorkerConfig {
@@ -245,7 +247,7 @@ func main() {

if *pCount > 0 {
log.Info("Starting producer...")
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId)
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId, *tombstoneProbability)
pw := verifier.NewProducerWorker(pwc)

if *useTransactions {
@@ -261,7 +263,7 @@
srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
(*consumeTputMb)*1024*1024,
))
), verifier.NewValidatorStatus(*compacted))
workers = append(workers, &srw)

for loopState.Next() {
@@ -280,7 +282,7 @@
workerCfg := verifier.NewRandomReadConfig(
makeWorkerConfig(), fmt.Sprintf("random-%03d", i), nPartitions, *cCount,
)
worker := verifier.NewRandomReadWorker(workerCfg)
worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted))
randomWorkers = append(randomWorkers, &worker)
workers = append(workers, &worker)
}
@@ -309,7 +311,7 @@ func main() {
grw := verifier.NewGroupReadWorker(
verifier.NewGroupReadConfig(
makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
*seqConsumeCount, (*consumeTputMb)*1024*1024))
*seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted))
workers = append(workers, &grw)

for loopState.Next() {
4 changes: 2 additions & 2 deletions pkg/worker/repeater/repeater_worker.go
@@ -52,13 +52,13 @@ type RepeaterConfig struct {
RateLimitBps int
}

func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int) RepeaterConfig {
func NewRepeaterConfig(cfg worker.WorkerConfig, topics []string, group string, keys uint64, payloadSize uint64, dataInFlight uint64, rateLimitBps int, tombstoneProbability float64) RepeaterConfig {
return RepeaterConfig{
workerCfg: cfg,
Topics: topics,
Group: group,
KeySpace: worker.KeySpace{UniqueCount: keys},
ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload},
ValueGenerator: worker.ValueGenerator{PayloadSize: payloadSize, Compressible: cfg.CompressiblePayload, TombstoneProbability: tombstoneProbability},
DataInFlight: dataInFlight,
RateLimitBps: rateLimitBps,
}
4 changes: 2 additions & 2 deletions pkg/worker/verifier/group_read_worker.go
@@ -49,10 +49,10 @@ type GroupReadWorker struct {
Status GroupWorkerStatus
}

func NewGroupReadWorker(cfg GroupReadConfig) GroupReadWorker {
func NewGroupReadWorker(cfg GroupReadConfig, validatorStatus ValidatorStatus) GroupReadWorker {
return GroupReadWorker{
config: cfg,
Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic},
Status: GroupWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
}
}

51 changes: 38 additions & 13 deletions pkg/worker/verifier/producer_worker.go
@@ -34,7 +34,7 @@ type ProducerConfig struct {
}

func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int) ProducerConfig {
messageSize int, messageCount int, fakeTimestampMs int64, fakeTimestampStepMs int64, rateLimitBytes int, keySetCardinality int, messagesPerProducerId int, tombstoneProbability float64) ProducerConfig {
return ProducerConfig{
workerCfg: wc,
name: name,
@@ -47,8 +47,9 @@ func NewProducerConfig(wc worker.WorkerConfig, name string, nPartitions int32,
keySetCardinality: keySetCardinality,
messagesPerProducerId: messagesPerProducerId,
valueGenerator: worker.ValueGenerator{
PayloadSize: uint64(messageSize),
Compressible: wc.CompressiblePayload,
PayloadSize: uint64(messageSize),
Compressible: wc.CompressiblePayload,
TombstoneProbability: tombstoneProbability,
},
}
}
@@ -108,7 +109,10 @@ func (pw *ProducerWorker) newRecord(producerId int, sequence int64) *kgo.Record
pw.Status.AbortedTransactionMessages += 1
}

payload := make([]byte, pw.config.messageSize)
payload := pw.config.valueGenerator.Generate()
Author: Not sure why valueGenerator wasn't used here before instead of an empty make([]byte)?

Contributor: 😨

if payload == nil {
pw.Status.TombstonesProduced += 1
}
var r *kgo.Record

if pw.config.keySetCardinality < 0 {
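
The reviewer's question above refers to pw.config.valueGenerator.Generate(), whose implementation is not part of this diff. As a minimal sketch of what the new TombstoneProbability field implies (an assumption about pkg/worker, not the actual code), the generator would behave roughly like this:

package worker

import "math/rand"

// Sketch only: field names mirror this diff; the real generator also handles
// compressible payloads, which this sketch omits.
type ValueGenerator struct {
	PayloadSize          uint64
	Compressible         bool
	TombstoneProbability float64
}

// Generate returns nil with probability TombstoneProbability; newRecord treats
// a nil payload as a tombstone. Otherwise it returns PayloadSize random bytes.
func (vg *ValueGenerator) Generate() []byte {
	if rand.Float64() < vg.TombstoneProbability {
		return nil
	}
	payload := make([]byte, vg.PayloadSize)
	rand.Read(payload)
	return payload
}
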
@@ -147,12 +151,18 @@ type ProducerWorkerStatus struct {

MaxOffsetsProduced map[int32]int64 `json:"max_offsets_produced"`

// The last produced value for a given key in a partition.
LatestValueProduced map[int32]map[string]string `json:"latest_value_produced"`

// How many times did we restart the producer loop?
Restarts int64 `json:"restarts"`

// How many times produce request failed?
Fails int64 `json:"fails"`

// How many tombstone records were produced?
TombstonesProduced int64 `json:"tombstones_produced"`

// How many failures occurred while trying to begin, abort,
// or commit a transaction.
FailedTransactions int64 `json:"failed_transactions"`
Expand All @@ -177,18 +187,22 @@ type ProducerWorkerStatus struct {

func NewProducerWorkerStatus(topic string) ProducerWorkerStatus {
return ProducerWorkerStatus{
Topic: topic,
MaxOffsetsProduced: make(map[int32]int64),
lastCheckpoint: time.Now(),
latency: metrics.NewHistogram(metrics.NewExpDecaySample(1024, 0.015)),
Topic: topic,
MaxOffsetsProduced: make(map[int32]int64),
LatestValueProduced: make(map[int32]map[string]string),
lastCheckpoint: time.Now(),
latency: metrics.NewHistogram(metrics.NewExpDecaySample(1024, 0.015)),
}
}

func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) {
func (self *ProducerWorkerStatus) OnAcked(r *kgo.Record) {
self.lock.Lock()
defer self.lock.Unlock()
self.Acked += 1

Partition := r.Partition
Offset := r.Offset

self.Acked += 1
currentMax, present := self.MaxOffsetsProduced[Partition]
if present {
if currentMax < Offset {
@@ -202,6 +216,11 @@ func (self *ProducerWorkerStatus) OnAcked(Partition int32, Offset int64) {
} else {
self.MaxOffsetsProduced[Partition] = Offset
}

if self.LatestValueProduced[Partition] == nil {
self.LatestValueProduced[Partition] = make(map[string]string)
}
self.LatestValueProduced[Partition][string(r.Key)] = string(r.Value)
}

func (self *ProducerWorkerStatus) OnBadOffset() {
@@ -216,18 +235,23 @@ func (self *ProducerWorkerStatus) OnFail() {
self.Fails += 1
}

func (pw *ProducerWorkerStatus) String() string {
return fmt.Sprintf("Topic: %s, Sent: %d, Acked: %d, BadOffsets: %d, Restarts: %d, Fails: %d, TombstonesProduced: %d, FailedTransactions: %d, AbortedTransactionMessages: %d",
pw.Topic, pw.Sent, pw.Acked, pw.BadOffsets, pw.Restarts, pw.Fails, pw.TombstonesProduced, pw.FailedTransactions, pw.AbortedTransactionMessages)
}

func (pw *ProducerWorker) produceCheckpoint() {
err := pw.validOffsets.Store()
util.Chk(err, "Error writing offset map: %v", err)

status, lock := pw.GetStatus()

lock.Lock()
data, err := json.Marshal(status)
_, err = json.Marshal(status)
lock.Unlock()

util.Chk(err, "Status serialization error")
log.Infof("Producer status: %s", data)
log.Infof("Producer status: %s", status.(*ProducerWorkerStatus).String())
}

func (pw *ProducerWorker) Wait() error {
@@ -374,10 +398,11 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
log.Debugf("errored = %t", errored)
} else {
ackLatency := time.Now().Sub(sentAt)
pw.Status.OnAcked(r.Partition, r.Offset)
pw.Status.OnAcked(r)
pw.Status.latency.Update(ackLatency.Microseconds())
log.Debugf("Wrote partition %d at %d", r.Partition, r.Offset)
pw.validOffsets.Insert(r.Partition, r.Offset)

}
wg.Done()
}
4 changes: 2 additions & 2 deletions pkg/worker/verifier/random_read_worker.go
@@ -42,10 +42,10 @@ func NewRandomReadConfig(wc worker.WorkerConfig, name string, nPartitions int32,
}
}

func NewRandomReadWorker(cfg RandomReadConfig) RandomReadWorker {
func NewRandomReadWorker(cfg RandomReadConfig, validatorStatus ValidatorStatus) RandomReadWorker {
return RandomReadWorker{
config: cfg,
Status: RandomWorkerStatus{Topic: cfg.workerCfg.Topic},
Status: RandomWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
}
}

4 changes: 2 additions & 2 deletions pkg/worker/verifier/seq_read_worker.go
@@ -43,10 +43,10 @@ type SeqReadWorker struct {
Status SeqWorkerStatus
}

func NewSeqReadWorker(cfg SeqReadConfig) SeqReadWorker {
func NewSeqReadWorker(cfg SeqReadConfig, validatorStatus ValidatorStatus) SeqReadWorker {
return SeqReadWorker{
config: cfg,
Status: SeqWorkerStatus{Topic: cfg.workerCfg.Topic},
Status: SeqWorkerStatus{Topic: cfg.workerCfg.Topic, Validator: validatorStatus},
}
}

49 changes: 43 additions & 6 deletions pkg/worker/verifier/validator_status.go
@@ -11,10 +11,12 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

func NewValidatorStatus() ValidatorStatus {
func NewValidatorStatus(compacted bool) ValidatorStatus {
return ValidatorStatus{
MaxOffsetsConsumed: make(map[int32]int64),
lastCheckpoint: time.Now(),

lastCheckpoint: time.Now(),
compacted: compacted,
}
}

@@ -37,8 +39,15 @@ type ValidatorStatus struct {
// The highest valid offset consumed throughout the consumer's lifetime
MaxOffsetsConsumed map[int32]int64 `json:"max_offsets_consumed"`

// The last consumed value for a given key in a partition. Only valid for sequential and group consumers
// (random consumers may not read the latest offset for a given key)
LatestValueConsumed map[int32]map[string]string `json:"latest_value_consumed"`

LostOffsets map[int32]int64 `json:"lost_offsets"`

// The number of tombstones consumed
TombstonesConsumed int64 `json:"tombstones_consumed"`

// Concurrent access happens when doing random reads
// with multiple reader fibers
lock sync.Mutex
Expand All @@ -49,8 +58,15 @@ type ValidatorStatus struct {
// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
lastOffsetConsumed map[int32]int64

// The latest offset seen for a given key in a partition.
// Used to help track the latest key-value pair that should be seen.
lastOffsetPerKeyConsumed map[int32]map[string]int64

// Last leader epoch per partition. Used to assert monotonicity.
lastLeaderEpoch map[int32]int32

// Whether the topic to be consumed is compacted. Gaps in offsets will be ignored if true.
compacted bool
}

func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
@@ -59,6 +75,10 @@
cs.lock.Lock()
defer cs.lock.Unlock()

if r.Value == nil {
cs.TombstonesConsumed += 1
}

if r.LeaderEpoch < cs.lastLeaderEpoch[r.Partition] {
log.Panicf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
@@ -68,7 +88,7 @@
if present {
if currentMax < r.Offset {
expected := currentMax + 1
if r.Offset != expected {
if r.Offset != expected && !cs.compacted {
log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
}
} else {
@@ -115,6 +135,18 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
if cs.lastLeaderEpoch == nil {
cs.lastLeaderEpoch = make(map[int32]int32)
}
if cs.lastOffsetPerKeyConsumed == nil {
cs.lastOffsetPerKeyConsumed = make(map[int32]map[string]int64)
}
if cs.lastOffsetPerKeyConsumed[r.Partition] == nil {
cs.lastOffsetPerKeyConsumed[r.Partition] = make(map[string]int64)
}
if cs.LatestValueConsumed == nil {
cs.LatestValueConsumed = make(map[int32]map[string]string)
}
if cs.LatestValueConsumed[r.Partition] == nil {
cs.LatestValueConsumed[r.Partition] = make(map[string]string)
}

// We bump highest offset only for valid records.
if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
@@ -123,6 +155,9 @@

cs.lastOffsetConsumed[r.Partition] = r.Offset
cs.lastLeaderEpoch[r.Partition] = r.LeaderEpoch
if r.Offset > cs.lastOffsetPerKeyConsumed[r.Partition][string(r.Key)] {
cs.LatestValueConsumed[r.Partition][string(r.Key)] = string(r.Value)
}
}

func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
@@ -156,11 +191,13 @@ func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32,
}

func (cs *ValidatorStatus) Checkpoint() {
_, err := json.Marshal(cs)
util.Chk(err, "Status serialization error")

log.Infof("Validator status: %s", cs.String())
}

func (cs *ValidatorStatus) String() string {
data, err := json.Marshal(cs)
util.Chk(err, "Status serialization error")
return string(data)
return fmt.Sprintf("Name: %s, ValidReads: %d, InvalidReads: %d, OutOfScopeInvalidReads: %d, TombstonesConsumed: %d",
cs.Name, cs.ValidReads, cs.InvalidReads, cs.OutOfScopeInvalidReads, cs.TombstonesConsumed)
}
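
Taken together, ProducerWorkerStatus.LatestValueProduced and ValidatorStatus.LatestValueConsumed allow an end-of-run compaction check. A sketch of how a caller might compare the two maps (not part of this diff; the function name and error messages are assumed):

package main

import "fmt"

// verifyCompactedReads checks that, for every key a consumer saw, the last
// value it consumed matches the last value the producer acked for that key.
func verifyCompactedReads(produced, consumed map[int32]map[string]string) error {
	for partition, byKey := range consumed {
		for key, got := range byKey {
			want, ok := produced[partition][key]
			if !ok {
				return fmt.Errorf("partition %d: key %q consumed but never produced", partition, key)
			}
			if got != want {
				return fmt.Errorf("partition %d: key %q: consumed %q but last produced %q", partition, key, got, want)
			}
		}
	}
	return nil
}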