Skip to content

Commit

Permalink
Merge pull request #231 from grafana/upstream-ip
Browse files Browse the repository at this point in the history
Allow interception of port-forwarded traffic by making the proxy target the pod IP
  • Loading branch information
pablochacin authored Jul 13, 2023
2 parents 796b721 + 6d11898 commit 43917d2
Show file tree
Hide file tree
Showing 17 changed files with 399 additions and 202 deletions.
30 changes: 18 additions & 12 deletions cmd/agent/commands/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"net"
"time"

"github.com/grafana/xk6-disruptor/pkg/agent"
Expand All @@ -14,28 +15,35 @@ import (
)

// BuildGrpcCmd returns a cobra command with the specification of the grpc command
//
//nolint:funlen
func BuildGrpcCmd(env runtime.Environment, config *agent.Config) *cobra.Command {
disruption := grpc.Disruption{}
var duration time.Duration
var port uint
var target uint
var iface string
var upstreamHost string
var targetPort uint
transparent := true

//nolint: dupl
cmd := &cobra.Command{
Use: "grpc",
Short: "grpc disruptor",
Long: "Disrupts http request by introducing delays and errors." +
" When running as a transparent proxy requires NET_ADMIM capabilities for setting" +
" iptable rules.",
RunE: func(cmd *cobra.Command, args []string) error {
if target == 0 {
if targetPort == 0 {
return fmt.Errorf("target port for fault injection is required")
}
listenAddress := fmt.Sprintf(":%d", port)
upstreamAddress := fmt.Sprintf("%s:%d", upstreamHost, target)

if transparent && (upstreamHost == "localhost" || upstreamHost == "127.0.0.1") {
// When running in transparent mode, the Redirector will also redirect traffic directed to 127.0.0.1 to
// the proxy. Using 127.0.0.1 as the proxy upstream would cause a redirection loop.
return fmt.Errorf("upstream host cannot be localhost when running in transparent mode")
}

listenAddress := net.JoinHostPort("", fmt.Sprint(port))
upstreamAddress := net.JoinHostPort(upstreamHost, fmt.Sprint(targetPort))

proxyConfig := grpc.ProxyConfig{
ListenAddress: listenAddress,
Expand All @@ -51,9 +59,8 @@ func BuildGrpcCmd(env runtime.Environment, config *agent.Config) *cobra.Command
var redirector protocol.TrafficRedirector
if transparent {
tr := &iptables.TrafficRedirectionSpec{
Iface: iface,
DestinationPort: target,
RedirectPort: port,
DestinationPort: targetPort, // Redirect traffic from the application (target) port...
RedirectPort: port, // to the proxy port.
}

redirector, err = iptables.NewTrafficRedirector(tr, env.Executor())
Expand Down Expand Up @@ -84,9 +91,8 @@ func BuildGrpcCmd(env runtime.Environment, config *agent.Config) *cobra.Command
cmd.Flags().Int32VarP(&disruption.StatusCode, "status", "s", 0, "status code")
cmd.Flags().Float32VarP(&disruption.ErrorRate, "rate", "r", 0, "error rate")
cmd.Flags().StringVarP(&disruption.StatusMessage, "message", "m", "", "error message for injected faults")
cmd.Flags().StringVarP(&iface, "interface", "i", "eth0", "interface to disrupt")
cmd.Flags().UintVarP(&port, "port", "p", 8000, "port the proxy will listen to")
cmd.Flags().UintVarP(&target, "target", "t", 0, "port the proxy will redirect request to")
cmd.Flags().UintVarP(&port, "port", "p", 8080, "port the proxy will listen to")
cmd.Flags().UintVarP(&targetPort, "target", "t", 0, "port the proxy will redirect request to")
cmd.Flags().StringSliceVarP(&disruption.Excluded, "exclude", "x", []string{}, "comma-separated list of grpc services"+
" to be excluded from disruption")
cmd.Flags().BoolVar(&transparent, "transparent", true, "run as transparent proxy")
Expand Down
30 changes: 18 additions & 12 deletions cmd/agent/commands/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"net"
"time"

"github.com/grafana/xk6-disruptor/pkg/agent"
Expand All @@ -13,28 +14,35 @@ import (
)

// BuildHTTPCmd returns a cobra command with the specification of the http command
//
//nolint:funlen
func BuildHTTPCmd(env runtime.Environment, config *agent.Config) *cobra.Command {
disruption := http.Disruption{}
var duration time.Duration
var port uint
var target uint
var iface string
var upstreamHost string
var targetPort uint
transparent := true

//nolint: dupl
cmd := &cobra.Command{
Use: "http",
Short: "http disruptor",
Long: "Disrupts http request by introducing delays and errors." +
" When running as a transparent proxy requires NET_ADMIM capabilities for setting" +
" iptable rules.",
RunE: func(cmd *cobra.Command, args []string) error {
if target == 0 {
if targetPort == 0 {
return fmt.Errorf("target port for fault injection is required")
}
listenAddress := fmt.Sprintf(":%d", port)
upstreamAddress := fmt.Sprintf("http://%s:%d", upstreamHost, target)

if transparent && (upstreamHost == "localhost" || upstreamHost == "127.0.0.1") {
// When running in transparent mode, the Redirector will also redirect traffic directed to 127.0.0.1 to
// the proxy. Using 127.0.0.1 as the proxy upstream would cause a redirection loop.
return fmt.Errorf("upstream host cannot be localhost when running in transparent mode")
}

listenAddress := net.JoinHostPort("", fmt.Sprint(port))
upstreamAddress := "http://" + net.JoinHostPort(upstreamHost, fmt.Sprint(targetPort))

proxyConfig := http.ProxyConfig{
ListenAddress: listenAddress,
Expand All @@ -50,9 +58,8 @@ func BuildHTTPCmd(env runtime.Environment, config *agent.Config) *cobra.Command
var redirector protocol.TrafficRedirector
if transparent {
tr := &iptables.TrafficRedirectionSpec{
Iface: iface,
DestinationPort: target,
RedirectPort: port,
DestinationPort: targetPort, // Redirect traffic from the application (target) port...
RedirectPort: port, // to the proxy port.
}

redirector, err = iptables.NewTrafficRedirector(tr, env.Executor())
Expand Down Expand Up @@ -89,9 +96,8 @@ func BuildHTTPCmd(env runtime.Environment, config *agent.Config) *cobra.Command
cmd.Flags().BoolVar(&transparent, "transparent", true, "run as transparent proxy")
cmd.Flags().StringVar(&upstreamHost, "upstream-host", "localhost",
"upstream host to redirect traffic to")
cmd.Flags().StringVarP(&iface, "interface", "i", "eth0", "interface to disrupt")
cmd.Flags().UintVarP(&port, "port", "p", 8000, "port the proxy will listen to")
cmd.Flags().UintVarP(&target, "target", "t", 0, "port the proxy will redirect request to")
cmd.Flags().UintVarP(&port, "port", "p", 8080, "port the proxy will listen to")
cmd.Flags().UintVarP(&targetPort, "target", "t", 0, "port the proxy will redirect request to")

return cmd
}
58 changes: 47 additions & 11 deletions e2e/agent/agent_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package e2e

import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/cluster"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/deploy"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/fixtures"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubectl"
"github.com/grafana/xk6-disruptor/pkg/testutils/e2e/kubernetes/namespace"
"github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders"

Expand All @@ -29,11 +32,14 @@ var injectHTTP500 = []string{
"--rate",
"1.0",
"--error",
"500",
"418",
"--port",
"8080",
"--target",
"80",
"--upstream-host",
// POD_IP is injected in the container and take its value from status.podIP
"$(POD_IP)",
}

var injectGrpcInternal = []string{
Expand All @@ -51,6 +57,9 @@ var injectGrpcInternal = []string{
"4000",
"--target",
"9000",
"--upstream-host",
// POD_IP is injected in the container and take its value from status.podIP
"$(POD_IP)",
"-x",
// exclude reflection service otherwise the dynamic client will not work
"grpc.reflection.v1alpha.ServerReflection,grpc.reflection.v1.ServerReflection",
Expand Down Expand Up @@ -82,6 +91,7 @@ func buildHttpbinPodWithDisruptorAgent(cmd []string) *corev1.Pod {

agent := builders.NewContainerBuilder("xk6-disruptor-agent").
WithImage("ghcr.io/grafana/xk6-disruptor-agent").
WithEnvVarFromField("POD_IP", "status.podIP").
WithCommand(cmd...).
WithCapabilities("NET_ADMIN").
Build()
Expand All @@ -106,6 +116,7 @@ func buildGrpcbinPodWithDisruptorAgent(cmd []string) *corev1.Pod {

agent := builders.NewContainerBuilder("xk6-disruptor-agent").
WithImage("ghcr.io/grafana/xk6-disruptor-agent").
WithEnvVarFromField("POD_IP", "status.podIP").
WithCommand(cmd...).
WithCapabilities("NET_ADMIN").
Build()
Expand All @@ -121,10 +132,8 @@ func buildGrpcbinPodWithDisruptorAgent(cmd []string) *corev1.Pod {
Build()
}


// deploy pod with the xk6-disruptor
func buildDisruptorAgentPod(cmd []string) *corev1.Pod {

agent := builders.NewContainerBuilder("xk6-disruptor-agent").
WithImage("ghcr.io/grafana/xk6-disruptor-agent").
WithPort("http", 80).
Expand All @@ -142,7 +151,6 @@ func buildDisruptorAgentPod(cmd []string) *corev1.Pod {
Build()
}


// builDisruptorService returns a Service definition that exposes httpbin pods
func builDisruptorService() *corev1.Service {
return builders.NewServiceBuilder("xk6-disruptor").
Expand Down Expand Up @@ -200,7 +208,7 @@ func Test_Agent(t *testing.T) {
Service: "httpbin",
Port: 80,
Path: "/status/200",
ExpectedCode: 500,
ExpectedCode: 418,
},
},
{
Expand Down Expand Up @@ -242,11 +250,40 @@ func Test_Agent(t *testing.T) {
return
}

err = tc.check.Verify(k8s, cluster.Ingress(), namespace)
if err != nil {
t.Errorf("failed : %v", err)
return
}
t.Run("via ingress", func(t *testing.T) {
t.Parallel()

err := tc.check.Verify(k8s, cluster.Ingress(), namespace)
if err != nil {
t.Errorf("failed : %v", err)
return
}
})

t.Run("via port-forward", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})

kc, err := kubectl.NewFromKubeconfig(ctx, cluster.Kubeconfig())
if err != nil {
t.Fatalf("creating kubectl client from kubeconfig: %v", err)
}

port, err := kc.ForwardPodPort(ctx, namespace, tc.pod.Name, uint(tc.port))
if err != nil {
t.Fatalf("forwarding port from %s/%s: %v", namespace, tc.pod, err)
}

err = tc.check.Verify(k8s, net.JoinHostPort("localhost", fmt.Sprint(port)), namespace)
if err != nil {
t.Errorf("failed to access service: %v", err)
return
}
})
})
}
})
Expand Down Expand Up @@ -285,7 +322,6 @@ func Test_Agent(t *testing.T) {
}
})


t.Run("Non-transparent proxy to upstream service", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 0 additions & 4 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func Test_JsPodDisruptor(t *testing.T) {
const faultOpts = {
proxyPort: 8080,
iface: "eth0"
}
d.injectHTTPFaults(fault, "1s", faultOpts)
Expand Down Expand Up @@ -298,7 +297,6 @@ func Test_JsPodDisruptor(t *testing.T) {
const faultOpts = {
proxyPort: 4000,
iface: "eth0"
}
d.injectGrpcFaults(fault, "1s", faultOpts)
Expand Down Expand Up @@ -337,7 +335,6 @@ func Test_JsPodDisruptor(t *testing.T) {
const faultOpts = {
proxyPort: 4000,
iface: "eth0"
}
d.injectGrpcFaults(fault)
Expand All @@ -359,7 +356,6 @@ func Test_JsPodDisruptor(t *testing.T) {
const faultOpts = {
proxyPort: 4000,
iface: "eth0"
}
d.injectGrpcFaults(fault, "1") // missing duration unit
Expand Down
8 changes: 4 additions & 4 deletions pkg/disruptors/cmd_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func buildGrpcFaultCmd(fault GrpcFault, duration time.Duration, options GrpcDisr
cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort))
}

if options.Iface != "" {
cmd = append(cmd, "-i", options.Iface)
if options.TargetAddress != "" {
cmd = append(cmd, "--upstream-host", options.TargetAddress)
}

return cmd
Expand Down Expand Up @@ -101,8 +101,8 @@ func buildHTTPFaultCmd(fault HTTPFault, duration time.Duration, options HTTPDisr
cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort))
}

if options.Iface != "" {
cmd = append(cmd, "-i", options.Iface)
if options.TargetAddress != "" {
cmd = append(cmd, "--upstream-host", options.TargetAddress)
}

return cmd
Expand Down
2 changes: 2 additions & 0 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ func Test_InjectAgent(t *testing.T) {
pods: []*corev1.Pod{
builders.NewPodBuilder("pod1").
WithNamespace("test-ns").
WithIP("192.0.2.6").
Build(),
builders.NewPodBuilder("pod2").
WithNamespace("test-ns").
WithIP("192.0.2.6").
Build(),
},
timeout: -1,
Expand Down
Loading

0 comments on commit 43917d2

Please sign in to comment.