kgo: return decompression errors while consuming
Kafka can return partial batches, so decompression errors are common. If
I ask for at most 100 bytes, and the broker has two 60-byte batches, I
will receive one valid 60-byte batch and then a partial 40-byte batch.
The second, partial batch will fail to decompress. This is why I
previously never returned decompression errors.
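The truncation described above can be sketched with a toy model (not broker code; `truncate` and the batch sizes are made up for illustration):

```go
package main

import "fmt"

// truncate mimics the broker slab-copying at most max bytes of the log,
// regardless of batch boundaries (a toy model, not broker code).
func truncate(log []byte, max int) []byte {
	if len(log) > max {
		return log[:max]
	}
	return log
}

func main() {
	batchA := make([]byte, 60) // one complete batch
	batchB := make([]byte, 60) // a second complete batch
	log := append(append([]byte{}, batchA...), batchB...)

	resp := truncate(log, 100)
	fmt.Println(len(resp))               // 100: the requested cap
	fmt.Println(len(resp) - len(batchA)) // 40: only part of the second batch
}
```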

However, if a client truly does produce somewhat-valid compressed data
that *some* decompressors can process but *others* (Go's) cannot, then
the first batch received could fail to decompress. The client would fail
processing, return an empty batch, and try consuming at the same spot.
The client would spin-loop trying to consume, and the end user would
never be aware.

Now, if the first error received is a decompression error, we bubble it
up to the end user.

This is hard to test internally, so it was manually tested with hacked-up code.

Scenario one:
* I changed the code to ignore CRC errors, since those just got in the
  way
* I ran a local kfake where the first five bytes of a
  RecordBatch.Records was overwritten with "aaaaa"
* I consumed _before_ this patch -- the client spin-looped, never
  progressing and never printing anything.
* I consumed _after_ this patch -- the client immediately received the
  error.

Scenario two:
* Same CRC-error ignoring
* I ran a local kfake where, when consuming, all batches AFTER the
  first had their RecordBatch.Records overwritten with "aaaaa".
* I consumed before and after this patch -- in both cases, the client
  progressed to the end of the partition and no errors were printed.
* To double verify the decompression error was being encountered, I
  added a println in kgo where the decompression error is generated --
  the println was always encountered.

Closes #854.
twmb committed Jan 8, 2025
1 parent 3d0d08c commit 1473778
Showing 2 changed files with 31 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/kgo/errors.go
@@ -319,3 +319,18 @@ func (e *ErrGroupSession) Error() string {
 }
 
 func (e *ErrGroupSession) Unwrap() error { return e.Err }
+
+type errDecompress struct {
+	err error
+}
+
+func (e *errDecompress) Error() string {
+	return fmt.Sprintf("unable to decompress batch: %v", e.err)
+}
+
+func (e *errDecompress) Unwrap() error { return e.err }
+
+func isDecompressErr(err error) bool {
+	var ed *errDecompress
+	return errors.As(err, &ed)
+}
16 changes: 16 additions & 0 deletions pkg/kgo/source.go
@@ -1389,6 +1389,19 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 				h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
 			}
 		})
+
+		// If we encounter a decompression error BUT we have successfully decompressed
+		// one batch, it is likely that we have received a partial batch. Kafka returns
+		// UP TO the requested max partition bytes, sometimes truncating data at the end.
+		// It returns at least one valid batch, but everything after is copied as is
+		// (i.e. a quick slab copy). We set the error to nil and return what we have.
+		//
+		// If we have a decompression error immediately, we keep it and bubble it up.
+		// The client cannot progress, and the end user needs visibility.
+		if isDecompressErr(fp.Err) && len(fp.Records) > 0 {
+			fp.Err = nil
+			break
+		}
 	}
 
 	return fp
@@ -1476,6 +1489,7 @@ func (o *cursorOffsetNext) processRecordBatch(
 	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
 		var err error
 		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
+			fp.Err = &errDecompress{err}
 			return 0, 0 // truncated batch
 		}
 	}
@@ -1542,6 +1556,7 @@ func (o *cursorOffsetNext) processV1OuterMessage(
 
 	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
+		fp.Err = &errDecompress{err}
 		return 0, 0 // truncated batch
 	}
 
@@ -1653,6 +1668,7 @@ func (o *cursorOffsetNext) processV0OuterMessage(
 
	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
+		fp.Err = &errDecompress{err}
 		return 0, 0 // truncated batch
 	}
 
