Skip to content

Commit

Permalink
Revert "kgo: broadcast batch finishes in one big blast"
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb authored Jan 20, 2025
1 parent 9d27aac commit 2c576e5
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,6 @@ func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) {
func (p *producer) finishPromises(b batchPromise) {
cl := p.cl
var more bool
var broadcast bool
defer func() {
if broadcast {
p.c.Broadcast()
}
}()
start:
p.promisesMu.Lock()
for i, pr := range b.recs {
Expand All @@ -570,8 +564,7 @@ start:
pr.ProducerID = b.pid
pr.ProducerEpoch = b.epoch
pr.Attrs = b.attrs
recBroadcast := cl.finishRecordPromise(pr, b.err, b.beforeBuf)
broadcast = broadcast || recBroadcast
cl.finishRecordPromise(pr, b.err, b.beforeBuf)
b.recs[i] = promisedRec{}
}
p.promisesMu.Unlock()
Expand All @@ -585,7 +578,7 @@ start:
}
}

func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) (broadcast bool) {
func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) {
p := &cl.producer

if p.hooks != nil && len(p.hooks.unbuffered) > 0 {
Expand All @@ -612,10 +605,12 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering
p.mu.Lock()
p.bufferedBytes -= userSize
p.bufferedRecords--
broadcast = p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
p.mu.Unlock()

return broadcast
if broadcast {
p.c.Broadcast()
}
}

// partitionRecord loads the partitions for a topic and produce to them. If
Expand Down

0 comments on commit 2c576e5

Please sign in to comment.