diff --git a/README.md b/README.md index 807807d..36804b2 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,14 @@ To partition the feed (any field of the protobuf is available), the following op -format.hash=SamplerAddress,DstAS ``` +By default, compression is disabled when sending data to Kafka. +To change the kafka compression type of the producer side configure the following option: +``` +-transport.kafka.compression.type=gzip +``` +The list of codecs is available in the [Sarama documentation](https://pkg.go.dev/github.com/Shopify/sarama#CompressionCodec). + + By default, the collector will listen for IPFIX/NetFlow V9 on port 2055 and sFlow on port 6343. To change the sockets binding, you can set the `-listen` argument and a URI diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go index 3ea9f30..99fe8dd 100644 --- a/transport/kafka/kafka.go +++ b/transport/kafka/kafka.go @@ -32,12 +32,23 @@ type KafkaDriver struct { kafkaHashing bool kafkaVersion string + kafkaCompressionCodec string producer sarama.AsyncProducer q chan bool } +var ( + compressionCodecs = map[string]sarama.CompressionCodec{ + strings.ToLower(sarama.CompressionNone.String()): sarama.CompressionNone, + strings.ToLower(sarama.CompressionGZIP.String()): sarama.CompressionGZIP, + strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy, + strings.ToLower(sarama.CompressionLZ4.String()): sarama.CompressionLZ4, + strings.ToLower(sarama.CompressionZSTD.String()): sarama.CompressionZSTD, + } +) + func (d *KafkaDriver) Prepare() error { flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka") @@ -54,6 +65,7 @@ func (d *KafkaDriver) Prepare() error { //flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas") flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version") + flag.StringVar(&d.kafkaCompressionCodec, "transport.kafka.compression", "", "Kafka default compression") return nil } @@ -71,6 +83,26 @@ func (d *KafkaDriver) Init(context.Context) error { kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency + + if d.kafkaCompressionCodec != "" { + /* + // when upgrading sarama, replace with: + // note: if the library adds more codecs, they will be supported natively + var cc *sarama.CompressionCodec + + if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil { + return err + } + kafkaConfig.Producer.Compression = *cc + */ + + if cc, ok := compressionCodecs[strings.ToLower(d.kafkaCompressionCodec)]; !ok { + return errors.New("compression codec does not exist") + } else { + kafkaConfig.Producer.Compression = cc + } + } + if d.kafkaTLS { rootCAs, err := x509.SystemCertPool() if err != nil {