From e0d30b75dba7eb53725b5de2f08f0006eb97a7f3 Mon Sep 17 00:00:00 2001
From: Brian Landers
Date: Sat, 8 Oct 2022 15:02:22 -0700
Subject: [PATCH] Add SASL/SCRAM support to Kafka transport. (#121)

This pull request adds SASL/SCRAM authentication to the Kafka transport. The
-transport.kafka.sasl flag, previously a boolean, now accepts none, plain,
scram-sha256, and scram-sha512. This allows goflow2 to work with Amazon
Managed Streaming for Apache Kafka (MSK), which does not support SASL/PLAIN.

Co-authored-by: Brian Landers
Co-authored-by: lspgn
---
 go.mod                          |  4 ++
 go.sum                          |  4 ++
 transport/kafka/kafka.go        | 84 ++++++++++++++++++++++++---------
 transport/kafka/scram_client.go | 39 +++++++++++++++
 4 files changed, 110 insertions(+), 21 deletions(-)
 create mode 100644 transport/kafka/scram_client.go

diff --git a/go.mod b/go.mod
index 0fd25d1..d4d3786 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/prometheus/client_golang v1.11.0
 	github.com/sirupsen/logrus v1.8.1
 	github.com/stretchr/testify v1.7.0
+	github.com/xdg-go/scram v1.0.2
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/yaml.v2 v2.4.0
 )
@@ -37,8 +38,11 @@ require (
 	github.com/prometheus/common v0.26.0 // indirect
 	github.com/prometheus/procfs v0.6.0 // indirect
 	github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
+	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+	github.com/xdg-go/stringprep v1.0.2 // indirect
 	golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
 	golang.org/x/net v0.0.0-20210917221730-978cfadd31cf // indirect
 	golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
+	golang.org/x/text v0.3.7 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
diff --git a/go.sum b/go.sum
index 8bc3c36..c18530c 100644
--- a/go.sum
+++ b/go.sum
@@ -152,8 +152,11 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w=
 github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
+github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc=
 github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -200,6 +203,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/transport/kafka/kafka.go b/transport/kafka/kafka.go
index 99fe8dd..702ea5c 100644
--- a/transport/kafka/kafka.go
+++ b/transport/kafka/kafka.go
@@ -20,7 +20,8 @@ import (
 
 type KafkaDriver struct {
 	kafkaTLS   bool
-	kafkaSASL  bool
+	kafkaSASL  string
+	kafkaSCRAM string
 	kafkaTopic string
 	kafkaSrv   string
 	kafkaBrk   string
@@ -30,8 +31,8 @@ type KafkaDriver struct {
 
 	kafkaLogErrors bool
 
-	kafkaHashing bool
-	kafkaVersion string
+	kafkaHashing          bool
+	kafkaVersion          string
 	kafkaCompressionCodec string
 
 	producer sarama.AsyncProducer
@@ -39,20 +40,44 @@ type KafkaDriver struct {
 	q chan bool
 }
 
+type KafkaSASLAlgorithm string
+
+const (
+	KAFKA_SASL_NONE         KafkaSASLAlgorithm = "none"
+	KAFKA_SASL_PLAIN        KafkaSASLAlgorithm = "plain"
+	KAFKA_SASL_SCRAM_SHA256 KafkaSASLAlgorithm = "scram-sha256"
+	KAFKA_SASL_SCRAM_SHA512 KafkaSASLAlgorithm = "scram-sha512"
+)
+
 var (
 	compressionCodecs = map[string]sarama.CompressionCodec{
-		strings.ToLower(sarama.CompressionNone.String()): sarama.CompressionNone,
-		strings.ToLower(sarama.CompressionGZIP.String()): sarama.CompressionGZIP,
+		strings.ToLower(sarama.CompressionNone.String()):   sarama.CompressionNone,
+		strings.ToLower(sarama.CompressionGZIP.String()):   sarama.CompressionGZIP,
 		strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy,
-		strings.ToLower(sarama.CompressionLZ4.String()): sarama.CompressionLZ4,
-		strings.ToLower(sarama.CompressionZSTD.String()): sarama.CompressionZSTD,
+		strings.ToLower(sarama.CompressionLZ4.String()):    sarama.CompressionLZ4,
+		strings.ToLower(sarama.CompressionZSTD.String()):   sarama.CompressionZSTD,
+	}
+
+	saslAlgorithms = map[KafkaSASLAlgorithm]bool{
+		KAFKA_SASL_PLAIN:        true,
+		KAFKA_SASL_SCRAM_SHA256: true,
+		KAFKA_SASL_SCRAM_SHA512: true,
+	}
+	saslAlgorithmsList = []string{
+		string(KAFKA_SASL_NONE),
+		string(KAFKA_SASL_PLAIN),
+		string(KAFKA_SASL_SCRAM_SHA256),
+		string(KAFKA_SASL_SCRAM_SHA512),
 	}
 )
 
 func (d *KafkaDriver) Prepare() error {
 	flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")
+	flag.StringVar(&d.kafkaSASL, "transport.kafka.sasl", "none",
+		fmt.Sprintf(
+			"Use SASL to connect to Kafka, available settings: %s (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)",
+			strings.Join(saslAlgorithmsList, ", ")))
 
-	flag.BoolVar(&d.kafkaSASL, "transport.kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)")
 	flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
 	flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
 	flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
@@ -86,14 +111,14 @@ func (d *KafkaDriver) Init(context.Context) error {
 
 	if d.kafkaCompressionCodec != "" {
 		/*
-		// when upgrading sarama, replace with:
-		// note: if the library adds more codecs, they will be supported natively
-		var cc *sarama.CompressionCodec
+			// when upgrading sarama, replace with:
+			// note: if the library adds more codecs, they will be supported natively
+			var cc *sarama.CompressionCodec
 
-		if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil {
-			return err
-		}
-		kafkaConfig.Producer.Compression = *cc
+			if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil {
+				return err
+			}
+			kafkaConfig.Producer.Compression = *cc
 		*/
 
 		if cc, ok := compressionCodecs[strings.ToLower(d.kafkaCompressionCodec)]; !ok {
@@ -102,7 +127,7 @@ func (d *KafkaDriver) Init(context.Context) error {
 			kafkaConfig.Producer.Compression = cc
 		}
 	}
-	
+
 	if d.kafkaTLS {
 		rootCAs, err := x509.SystemCertPool()
 		if err != nil {
@@ -116,17 +141,34 @@ func (d *KafkaDriver) Init(context.Context) error {
 		kafkaConfig.Producer.Partitioner = sarama.NewHashPartitioner
 	}
 
-	if d.kafkaSASL {
-		if !d.kafkaTLS /*&& log != nil*/ {
-			log.Warn("Using SASL without TLS will transmit the authentication in plaintext!")
+	kafkaSASL := KafkaSASLAlgorithm(d.kafkaSASL)
+	if d.kafkaSASL != "" && kafkaSASL != KAFKA_SASL_NONE {
+		_, ok := saslAlgorithms[KafkaSASLAlgorithm(strings.ToLower(d.kafkaSASL))]
+		if !ok {
+			return errors.New("SASL algorithm does not exist")
 		}
+
 		kafkaConfig.Net.SASL.Enable = true
 		kafkaConfig.Net.SASL.User = os.Getenv("KAFKA_SASL_USER")
 		kafkaConfig.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASS")
 
 		if kafkaConfig.Net.SASL.User == "" && kafkaConfig.Net.SASL.Password == "" {
 			return errors.New("Kafka SASL config from environment was unsuccessful. KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set.")
-		} else /*if log != nil*/ {
-			log.Infof("Authenticating as user '%s'...", kafkaConfig.Net.SASL.User)
+		}
+
+		if kafkaSASL == KAFKA_SASL_SCRAM_SHA256 || kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
+			kafkaConfig.Net.SASL.Handshake = true
+
+			if kafkaSASL == KAFKA_SASL_SCRAM_SHA512 {
+				kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
+					return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
+				}
+				kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
+			} else if kafkaSASL == KAFKA_SASL_SCRAM_SHA256 {
+				kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
+					return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
+				}
+				kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
+			}
 		}
 	}
diff --git a/transport/kafka/scram_client.go b/transport/kafka/scram_client.go
new file mode 100644
index 0000000..1490f6a
--- /dev/null
+++ b/transport/kafka/scram_client.go
@@ -0,0 +1,39 @@
+package kafka
+
+// From https://github.com/Shopify/sarama/blob/main/examples/sasl_scram_client/scram_client.go
+
+import (
+	"crypto/sha256"
+	"crypto/sha512"
+
+	"github.com/xdg-go/scram"
+)
+
+var (
+	SHA256 scram.HashGeneratorFcn = sha256.New
+	SHA512 scram.HashGeneratorFcn = sha512.New
+)
+
+type XDGSCRAMClient struct {
+	*scram.Client
+	*scram.ClientConversation
+	scram.HashGeneratorFcn
+}
+
+func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
+	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
+	if err != nil {
+		return err
+	}
+	x.ClientConversation = x.Client.NewConversation()
+	return nil
+}
+
+func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
+	response, err = x.ClientConversation.Step(challenge)
+	return
+}
+
+func (x *XDGSCRAMClient) Done() bool {
+	return x.ClientConversation.Done()
+}
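
Usage sketch (reviewer note, not part of the patch): assuming the XDGSCRAMClient
type and the SHA512 generator added in transport/kafka/scram_client.go above, the
snippet below shows how the scram-sha512 branch of Init() wires sarama together.
The helper name newSCRAM512Config and its standalone shape are illustrative only;
in goflow2 this logic lives inside KafkaDriver.Init() and is selected at runtime
with -transport.kafka.sasl scram-sha512.

package kafka

import (
	"crypto/tls"
	"os"

	"github.com/Shopify/sarama"
)

// newSCRAM512Config is an illustrative helper (not part of this change)
// that reproduces the SCRAM-SHA-512 branch of Init() in isolation.
func newSCRAM512Config() *sarama.Config {
	kafkaConfig := sarama.NewConfig()

	// TLS is strongly recommended when sending SASL credentials.
	kafkaConfig.Net.TLS.Enable = true
	kafkaConfig.Net.TLS.Config = &tls.Config{}

	// Credentials come from the environment, as in Init().
	kafkaConfig.Net.SASL.Enable = true
	kafkaConfig.Net.SASL.Handshake = true
	kafkaConfig.Net.SASL.User = os.Getenv("KAFKA_SASL_USER")
	kafkaConfig.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASS")

	// Wire in the xdg-go/scram client added in scram_client.go.
	kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
	}
	return kafkaConfig
}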