Skip to content

Commit

Permalink
Kafka: enable compression (cloudflare#128)
Browse files Browse the repository at this point in the history
Co-authored-by: Arun Cherla <[email protected]>
  • Loading branch information
lspgn and acherla authored Oct 8, 2022
1 parent 7801c6a commit 3734d83
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ To partition the feed (any field of the protobuf is available), the following op
-format.hash=SamplerAddress,DstAS
```

By default, compression is disabled when sending data to Kafka.
To change the kafka compression type of the producer side configure the following option:
```
-transport.kafka.compression.type=gzip
```
The list of codecs is available in the [Sarama documentation](https://pkg.go.dev/github.com/Shopify/sarama#CompressionCodec).


By default, the collector will listen for IPFIX/NetFlow V9 on port 2055
and sFlow on port 6343.
To change the sockets binding, you can set the `-listen` argument and a URI
Expand Down
32 changes: 32 additions & 0 deletions transport/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ type KafkaDriver struct {

kafkaHashing bool
kafkaVersion string
kafkaCompressionCodec string

producer sarama.AsyncProducer

q chan bool
}

var (
compressionCodecs = map[string]sarama.CompressionCodec{
strings.ToLower(sarama.CompressionNone.String()): sarama.CompressionNone,
strings.ToLower(sarama.CompressionGZIP.String()): sarama.CompressionGZIP,
strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy,
strings.ToLower(sarama.CompressionLZ4.String()): sarama.CompressionLZ4,
strings.ToLower(sarama.CompressionZSTD.String()): sarama.CompressionZSTD,
}
)

func (d *KafkaDriver) Prepare() error {
flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")

Expand All @@ -54,6 +65,7 @@ func (d *KafkaDriver) Prepare() error {

//flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version")
flag.StringVar(&d.kafkaCompressionCodec, "transport.kafka.compression", "", "Kafka default compression")

return nil
}
Expand All @@ -71,6 +83,26 @@ func (d *KafkaDriver) Init(context.Context) error {
kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency

if d.kafkaCompressionCodec != "" {
/*
// when upgrading sarama, replace with:
// note: if the library adds more codecs, they will be supported natively
var cc *sarama.CompressionCodec
if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil {
return err
}
kafkaConfig.Producer.Compression = *cc
*/

if cc, ok := compressionCodecs[strings.ToLower(d.kafkaCompressionCodec)]; !ok {
return errors.New("compression codec does not exist")
} else {
kafkaConfig.Producer.Compression = cc
}
}

if d.kafkaTLS {
rootCAs, err := x509.SystemCertPool()
if err != nil {
Expand Down

0 comments on commit 3734d83

Please sign in to comment.