Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
Add TLS & SASL configuration for existing kafka brokers (#799)
Browse files Browse the repository at this point in the history
* Add TLS & SASL configuraiton for existing kafka brokers

* Added some unit tests and refactor some code to add KAFKA_ENABLE_TLS && KAFKA_ENABLE_SASL

* Cleanup kafka utils tests & move sarama config to init function

* Set MinVersion of kafka when use SASL (see https://github.com/Shopify/sarama/blob/master/sasl_handshake_request.go#L32)
  • Loading branch information
ryarnyah authored and andresmgot committed Jun 15, 2018
1 parent e993277 commit 2b9b08e
Show file tree
Hide file tree
Showing 4 changed files with 556 additions and 6 deletions.
40 changes: 40 additions & 0 deletions docs/use-existing-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,43 @@ Open another terminal and check for the pubsub function log to see if it receive
$ kubectl logs -f pubsub-python-5445bdcb64-48bv2
hello world
```
When using SASL you must add `KAFKA_ENABLE_SASL`, `KAFKA_USERNAME` and `KAFKA_PASSWORD` env var to set authentification (might use a secret).:
```yaml
$ echo '
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
labels:
kubeless: kafka-trigger-controller
name: kafka-trigger-controller
namespace: kubeless
spec:
selector:
matchLabels:
kubeless: kafka-trigger-controller
template:
metadata:
labels:
kubeless: kafka-trigger-controller
spec:
containers:
- image: bitnami/kafka-trigger-controller:latest
imagePullPolicy: IfNotPresent
name: kafka-trigger-controller
env:
...
- name: KAFKA_ENABLE_SASL
value: true # CHANGE THIS!
- name: KAFKA_USERNAME
value: kafka-sasl-username # CHANGE THIS!
- name: KAFKA_PASSWORD
value: kafka-sasl-password # CHANGE THIS!
...
```

When using SSL to secure kafka communication, you must set `KAFKA_ENABLE_TLS`, and specify some of these:
* `KAFKA_CACERTS` to check server certificate
* `KAFKA_CERT` and `KAFKA_KEY` to check client certificate
* `KAFKA_INSECURE` to skip TLS verfication
32 changes: 26 additions & 6 deletions pkg/event-consumers/kafka/kafka-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kafka

import (
"os"
"strconv"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
Expand All @@ -31,29 +32,48 @@ var (
stoppedM map[string](chan struct{})
consumerM map[string]bool
brokers string
config *cluster.Config
)

func init() {
stopM = make(map[string](chan struct{}))
stoppedM = make(map[string](chan struct{}))
consumerM = make(map[string]bool)

// Init config
// taking brokers from env var
brokers = os.Getenv("KAFKA_BROKERS")
if brokers == "" {
brokers = "kafka.kubeless:9092"
}
}

// createConsumerProcess gets messages to a Kafka topic from the broker and send the payload to function service
func createConsumerProcess(broker, topic, funcName, ns, consumerGroupID string, clientset kubernetes.Interface, stopchan, stoppedchan chan struct{}) {
// Init config
config := cluster.NewConfig()
config = cluster.NewConfig()

config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest

var err error

if enableTLS, _ := strconv.ParseBool(os.Getenv("KAFKA_ENABLE_TLS")); enableTLS {
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = GetTLSConfiguration(os.Getenv("KAFKA_CACERTS"), os.Getenv("KAFKA_CERT"), os.Getenv("KAFKA_KEY"), os.Getenv("KAFKA_INSECURE"))
if err != nil {
logrus.Fatalf("Failed to set tls configuration: %v", err)
}
}
if enableSASL, _ := strconv.ParseBool(os.Getenv("KAFKA_ENABLE_SASL")); enableSASL {
config.Net.SASL.Enable = true
config.Version = sarama.V0_10_0_0
config.Net.SASL.User, config.Net.SASL.Password, err = GetSASLConfiguration(os.Getenv("KAFKA_USERNAME"), os.Getenv("KAFKA_PASSWORD"))
if err != nil {
logrus.Fatalf("Failed to set SASL configuration: %v", err)
}
}

}

// createConsumerProcess gets messages to a Kafka topic from the broker and send the payload to function service
func createConsumerProcess(broker, topic, funcName, ns, consumerGroupID string, clientset kubernetes.Interface, stopchan, stoppedchan chan struct{}) {
// Init consumer
brokersSlice := []string{broker}
topicsSlice := []string{topic}
Expand Down
77 changes: 77 additions & 0 deletions pkg/event-consumers/kafka/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright (c) 2016-2017 Bitnami
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"strconv"
)

var (
// ErrorCrtFileMandatory when private key is provided but certificate not.
ErrorCrtFileMandatory = errors.New("client certificate is mandatory when private key file is provided")
// ErrorKeyFileMandatory when certificate is provided but private not.
ErrorKeyFileMandatory = errors.New("client private key is mandatory when certificate file is provided")
// ErrorUsernameOrPasswordMandatory when username or password is not provided
ErrorUsernameOrPasswordMandatory = errors.New("username and password is mandatory")
)

// GetTLSConfiguration build TLS configuration for kafka, return tlsConfig associated with kafka connection.
func GetTLSConfiguration(caFile string, certFile string, keyFile string, insecure string) (*tls.Config, error) {

if certFile != "" && keyFile == "" {
return nil, ErrorKeyFileMandatory
}
if keyFile != "" && certFile == "" {
return nil, ErrorCrtFileMandatory
}

isInsecure, _ := strconv.ParseBool(insecure)
t := &tls.Config{}

if caFile != "" {
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t.RootCAs = caCertPool
}

if certFile != "" && keyFile != "" {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
t.Certificates = []tls.Certificate{cert}
}

t.InsecureSkipVerify = isInsecure
return t, nil
}

// GetSASLConfiguration build SASL configuration for kafka.
func GetSASLConfiguration(username string, password string) (string, string, error) {
if username == "" || password == "" {
return "", "", ErrorUsernameOrPasswordMandatory
}
return username, password, nil
}
Loading

0 comments on commit 2b9b08e

Please sign in to comment.