Skip to content

Commit

Permalink
updated initProvider function of the Virtual Kubelet to handle mTLS a…
Browse files Browse the repository at this point in the history
…uthentication; updated the name of the OTLP Service
  • Loading branch information
Bianco95 committed Sep 6, 2024
1 parent 24c4597 commit 18538de
Showing 1 changed file with 76 additions and 4 deletions.
80 changes: 76 additions & 4 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"net"
"os"
"path"
Expand Down Expand Up @@ -64,6 +66,8 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/google/uuid"
)

func PodInformerFilter(node string) informers.SharedInformerOption {
Expand Down Expand Up @@ -94,10 +98,29 @@ type Opts struct {
}

func initProvider(ctx context.Context) (func(context.Context) error, error) {

log.G(ctx).Info("Tracing is enabled, setting up the TracerProvider")

// Get the TELEMETRY_UNIQUE_ID from the environment, if it is not set, use the hostname
uniqueID := os.Getenv("TELEMETRY_UNIQUE_ID")
if uniqueID == "" {
log.G(ctx).Info("No TELEMETRY_UNIQUE_ID set, generating a new one")
newUUID := uuid.New()
uniqueID = newUUID.String()
log.G(ctx).Info("Generated unique ID: ", uniqueID, " use VK-InterLink-"+uniqueID+" as service name from Grafana")
}

// Create a new resource with the service name set to the TELEMETRY_UNIQUE_ID
// The nomenclature VK-InterLink-<TELEMETRY_UNIQUE_ID> is used to identify the service in Grafana.
// VK-InterLink-<TELEMETRY_UNIQUE_ID> means that the traces are coming from Virtual Kubelet
// and are related to the call that are made for the InterLink API service

serviceName := "VK-InterLink-" + uniqueID

res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("InterLink-VK"),
semconv.ServiceName(serviceName),
),
)
if err != nil {
Expand All @@ -113,11 +136,60 @@ func initProvider(ctx context.Context) (func(context.Context) error, error) {
otlpEndpoint = "localhost:4317"
}

fmt.Println("TELEMETRY_ENDPOINT: ", otlpEndpoint)
log.G(ctx).Info("TELEMETRY_ENDPOINT: ", otlpEndpoint)

caCrtFilePath := os.Getenv("TELEMETRY_CA_CRT_FILEPATH")

conn := &grpc.ClientConn{}
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(creds), grpc.WithBlock())
if caCrtFilePath != "" {

// if the CA certificate is provided, set up mutual TLS

log.G(ctx).Info("CA certificate provided, setting up mutual TLS")

caCert, err := ioutil.ReadFile(caCrtFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load CA certificate: %w", err)
}

clientKeyFilePath := os.Getenv("TELEMETRY_CLIENT_KEY_FILEPATH")
if clientKeyFilePath == "" {
return nil, fmt.Errorf("client key file path not provided. Since a CA certificate is provided, a client key is required for mutual TLS")
}

clientCrtFilePath := os.Getenv("TELEMETRY_CLIENT_CRT_FILEPATH")
if clientCrtFilePath == "" {
return nil, fmt.Errorf("client certificate file path not provided. Since a CA certificate is provided, a client certificate is required for mutual TLS")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA certificate")
}

cert, err := tls.LoadX509KeyPair(clientCrtFilePath, clientKeyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
creds := credentials.NewTLS(tlsConfig)

conn, err = grpc.DialContext(ctx,
otlpEndpoint,
grpc.WithTransportCredentials(creds),
grpc.WithBlock())

} else {
// if the CA certificate is not provided, use an insecure connection
// this means that the telemetry collector is not using a certificate, i.e. is inside the k8s cluster
conn, err = grpc.DialContext(ctx, otlpEndpoint, grpc.WithInsecure(), grpc.WithBlock())
}

if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
Expand Down

0 comments on commit 18538de

Please sign in to comment.