diff --git a/client/pkg/transport/transport.go b/client/pkg/transport/transport.go
index 67170d7436d0..74e63272ec1c 100644
--- a/client/pkg/transport/transport.go
+++ b/client/pkg/transport/transport.go
@@ -18,12 +18,29 @@ import (
 	"context"
 	"net"
 	"net/http"
+	"net/url"
+	"os"
 	"strings"
 	"time"
 )
 
 type unixTransport struct{ *http.Transport }
 
+var httpTransportProxyParsingFunc = determineHTTPTransportProxyParsingFunc
+
+func determineHTTPTransportProxyParsingFunc() func(req *http.Request) (*url.URL, error) {
+	// According to the comment on http.ProxyFromEnvironment: if the proxy URL is "localhost"
+	// (with or without a port number), a nil URL and nil error are returned.
+	// We work around this limitation by manually setting an env var named E2E_TEST_FORWARD_PROXY_IP
+	// and parsing its URL (which is a localhost address in our case).
+	if forwardProxy, exists := os.LookupEnv("E2E_TEST_FORWARD_PROXY_IP"); exists {
+		return func(req *http.Request) (*url.URL, error) {
+			return url.Parse(forwardProxy)
+		}
+	}
+	return http.ProxyFromEnvironment
+}
+
 func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, error) {
 	cfg, err := info.ClientConfig()
 	if err != nil {
@@ -39,7 +56,7 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
 	}
 
 	t := &http.Transport{
-		Proxy: http.ProxyFromEnvironment,
+		Proxy: httpTransportProxyParsingFunc(),
 		DialContext: (&net.Dialer{
 			Timeout:   dialtimeoutd,
 			LocalAddr: ipAddr,
@@ -60,7 +77,7 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
 			return dialer.DialContext(ctx, "unix", addr)
 		}
 		tu := &http.Transport{
-			Proxy: http.ProxyFromEnvironment,
+			Proxy: httpTransportProxyParsingFunc(),
 			DialContext:         dialContext,
 			TLSHandshakeTimeout: 10 * time.Second,
 			TLSClientConfig:     cfg,
diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go
index bc71c3a16605..2d9534df7ed4 100644
--- a/pkg/proxy/server.go
+++ b/pkg/proxy/server.go
@@ -19,6 +19,8 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log"
+	"math/bits"
 	mrand "math/rand"
 	"net"
 	"net/http"
@@ -44,11 +46,17 @@ var (
 // latency spikes and packet drop or corruption. The proxy overhead is very
 // small overhead (<500μs per request). Please run tests to compute actual
 // overhead.
+//
+// Note that the current implementation is a forward proxy, so unix sockets
+// are not supported: the forwarding is done at L7, which requires properly
+// constructed HTTP headers and bodies.
+//
+// Also, because we are forced to use TLS to communicate with the proxy server
+// and well-formed headers to talk to the destination server,
+// we can no longer apply arbitrary on-the-fly modifications to the data.
 type Server interface {
-	// From returns proxy source address in "scheme://host:port" format.
-	From() string
-	// To returns proxy destination address in "scheme://host:port" format.
-	To() string
+	// Listen returns proxy listen address in "scheme://host:port" format.
+	Listen() string
 
 	// Ready returns when proxy is ready to serve.
 	Ready() <-chan struct{}
@@ -115,6 +123,16 @@ type Server interface {
 	// UnblackholeRx removes blackhole operation on "receiving".
 	UnblackholeRx()
 
+	// BlackholePeerTx drops all outgoing traffic of a peer.
+	BlackholePeerTx(peer url.URL)
+	// UnblackholePeerTx removes blackhole operation on "sending".
+	UnblackholePeerTx(peer url.URL)
+
+	// BlackholePeerRx drops all incoming traffic of a peer.
+	BlackholePeerRx(peer url.URL)
+	// UnblackholePeerRx removes blackhole operation on "receiving".
+ UnblackholePeerRx(peer url.URL) + // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. PauseTx() // UnpauseTx removes "forwarding" pause operation. @@ -124,29 +142,29 @@ type Server interface { PauseRx() // UnpauseRx removes "receiving" pause operation. UnpauseRx() - - // ResetListener closes and restarts listener. - ResetListener() error } // ServerConfig defines proxy server configuration. type ServerConfig struct { Logger *zap.Logger - From url.URL - To url.URL + Listen url.URL TLSInfo transport.TLSInfo DialTimeout time.Duration BufferSize int RetryInterval time.Duration } +const ( + blackholePeerTypeNone uint8 = iota + blackholePeerTypeTx + blackholePeerTypeRx +) + type server struct { lg *zap.Logger - from url.URL - fromPort int - to url.URL - toPort int + listen url.URL + listenPort int tlsInfo transport.TLSInfo dialTimeout time.Duration @@ -158,11 +176,12 @@ type server struct { donec chan struct{} errc chan error - closeOnce sync.Once - closeWg sync.WaitGroup + closeOnce sync.Once + closeWg sync.WaitGroup + closeHijackedConn sync.WaitGroup listenerMu sync.RWMutex - listener net.Listener + listener *customListener pauseAcceptMu sync.Mutex pauseAcceptc chan struct{} @@ -187,6 +206,11 @@ type server struct { latencyRxMu sync.RWMutex latencyRx time.Duration + + blackholePeerMap map[int]uint8 // port number, blackhole type + blackholePeerMapMu sync.RWMutex + + httpServer *http.Server } // NewServer returns a proxy implementation with no iptables/tc dependencies. @@ -195,8 +219,7 @@ func NewServer(cfg ServerConfig) Server { s := &server{ lg: cfg.Logger, - from: cfg.From, - to: cfg.To, + listen: cfg.Listen, tlsInfo: cfg.TLSInfo, dialTimeout: cfg.DialTimeout, @@ -211,18 +234,13 @@ func NewServer(cfg ServerConfig) Server { pauseAcceptc: make(chan struct{}), pauseTxc: make(chan struct{}), pauseRxc: make(chan struct{}), - } - _, fromPort, err := net.SplitHostPort(cfg.From.Host) - if err == nil { - s.fromPort, _ = strconv.Atoi(fromPort) - } - var toPort string - _, toPort, err = net.SplitHostPort(cfg.To.Host) - if err == nil { - s.toPort, _ = strconv.Atoi(toPort) + blackholePeerMap: make(map[int]uint8), } + var err error + var fromPort string + if s.dialTimeout == 0 { s.dialTimeout = defaultDialTimeout } @@ -237,183 +255,276 @@ func NewServer(cfg ServerConfig) Server { close(s.pauseTxc) close(s.pauseRxc) - if strings.HasPrefix(s.from.Scheme, "http") { - s.from.Scheme = "tcp" - } - if strings.HasPrefix(s.to.Scheme, "http") { - s.to.Scheme = "tcp" - } + // L7 is http (scheme), L4 is tcp (network listener) + addr := "" + if strings.HasPrefix(s.listen.Scheme, "http") { + s.listen.Scheme = "tcp" - addr := fmt.Sprintf(":%d", s.fromPort) - if s.fromPort == 0 { // unix - addr = s.from.Host + if _, fromPort, err = net.SplitHostPort(cfg.Listen.Host); err != nil { + s.errc <- err + s.Close() + return nil + } + if s.listenPort, err = strconv.Atoi(fromPort); err != nil { + s.errc <- err + s.Close() + return nil + } + + addr = fmt.Sprintf(":%d", s.listenPort) + } else { + panic(fmt.Sprintf("%s is not supported", s.listen.Scheme)) } + s.closeWg.Add(1) var ln net.Listener if !s.tlsInfo.Empty() { - ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo) + ln, err = transport.NewListener(addr, s.listen.Scheme, &s.tlsInfo) } else { - ln, err = net.Listen(s.from.Scheme, addr) + ln, err = net.Listen(s.listen.Scheme, addr) } if err != nil { s.errc <- err s.Close() - return s + return nil } - s.listener = ln - s.closeWg.Add(1) - go s.listenAndServe() + s.listener = &customListener{ + s: s, + 
l: &ln, + } - s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To())) + go func() { + defer s.closeWg.Done() + + s.httpServer = &http.Server{ + Handler: &serverHandler{s: s}, + } + + s.lg.Info("proxy is listening on", zap.String("listen on", s.Listen())) + close(s.readyc) + if err := s.httpServer.Serve(s.listener); err != http.ErrServerClosed { + // always returns error. ErrServerClosed on graceful close + panic(fmt.Sprintf("startHTTPServer Serve(): %v", err)) + } + }() + + s.lg.Info("started proxying", zap.String("listen on", s.Listen())) return s } -func (s *server) From() string { - return fmt.Sprintf("%s://%s", s.from.Scheme, s.from.Host) +// Because we are implementing L7 proxy, but would like to keep the L4 features, +// thus, we need to encapsulate the L4 functionalities in our custom Listener +type customListener struct { + s *server + l *net.Listener } -func (s *server) To() string { - return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host) +func (c *customListener) Accept() (net.Conn, error) { + // we implement the L4 features here (pause / latency accept) + c.s.pauseAcceptMu.Lock() + pausec := c.s.pauseAcceptc + c.s.pauseAcceptMu.Unlock() + select { + case <-pausec: + case <-c.s.donec: + return nil, fmt.Errorf("listener is closed") + } + + c.s.latencyAcceptMu.RLock() + lat := c.s.latencyAccept + c.s.lg.Info( + "get accept latency", + zap.Duration("latency", lat), + ) + c.s.latencyAcceptMu.RUnlock() + if lat > 0 { + select { + case <-time.After(lat): + case <-c.s.donec: + return nil, fmt.Errorf("listener is closed") + } + } + + c.s.listenerMu.RLock() + conn, err := (*c.l).Accept() + c.s.listenerMu.RUnlock() + if err != nil { + select { + case c.s.errc <- err: + select { + case <-c.s.donec: + return nil, err + default: + } + case <-c.s.donec: + return nil, err + } + c.s.lg.Debug("listener accept error", zap.Error(err)) + + if strings.HasSuffix(err.Error(), "use of closed network connection") { + select { + case <-time.After(c.s.retryInterval): + case <-c.s.donec: + return nil, err + } + c.s.lg.Debug("listener is closed") + } + } + + return conn, err } -// TODO: implement packet reordering from multiple TCP connections -// buffer packets per connection for awhile, reorder before transmit -// - https://github.com/etcd-io/etcd/issues/5614 -// - https://github.com/etcd-io/etcd/pull/6918#issuecomment-264093034 +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +func (c *customListener) Close() error { + c.s.listenerMu.RLock() + defer c.s.listenerMu.RUnlock() + return (*c.l).Close() +} -func (s *server) listenAndServe() { - defer s.closeWg.Done() +// Addr returns the listener's network address. 
+func (c *customListener) Addr() net.Addr { + c.s.listenerMu.RLock() + defer c.s.listenerMu.RUnlock() + return (*c.l).Addr() +} - ctx := context.Background() - s.lg.Info("proxy is listening on", zap.String("from", s.From())) - close(s.readyc) +type serverHandler struct { + s *server +} - for { - s.pauseAcceptMu.Lock() - pausec := s.pauseAcceptc - s.pauseAcceptMu.Unlock() +func (sh *serverHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + hijacker, _ := resp.(http.Hijacker) + in, _, err := hijacker.Hijack() + if err != nil { select { - case <-pausec: - case <-s.donec: - return - } - - s.latencyAcceptMu.RLock() - lat := s.latencyAccept - s.latencyAcceptMu.RUnlock() - if lat > 0 { + case sh.s.errc <- err: select { - case <-time.After(lat): - case <-s.donec: + case <-sh.s.donec: return + default: } + case <-sh.s.donec: + return } + sh.s.lg.Debug("ServeHTTP hijack error", zap.Error(err)) + panic(err) + } + + targetScheme := "tcp" + targetHost := req.URL.Host + ctx := context.Background() - s.listenerMu.RLock() - ln := s.listener - s.listenerMu.RUnlock() + /* + If the traffic to the destination is HTTPS, a CONNECT request will be sent + first (containing the intended destination HOST). + + If the traffic to the destination is HTTP, no CONNECT request will be sent + first. Only normal HTTP request is sent, with the HOST set to the final destination. + This will be troublesome since we need to manually forward the request to the + destination, and we can't do bte stream manipulation. + + Thus, we need to send the traffic to destination with HTTPS, allowing us to + handle byte streams. + */ + if req.Method == "CONNECT" { + // for CONNECT, we need to send 200 response back first + in.Write([]byte("HTTP/1.0 200 Connection established\r\n\r\n")) + } - in, err := ln.Accept() + var out net.Conn + if !sh.s.tlsInfo.Empty() { + var tp *http.Transport + tp, err = transport.NewTransport(sh.s.tlsInfo, sh.s.dialTimeout) if err != nil { select { - case s.errc <- err: + case sh.s.errc <- err: select { - case <-s.donec: + case <-sh.s.donec: return default: } - case <-s.donec: + case <-sh.s.donec: return } - s.lg.Debug("listener accept error", zap.Error(err)) - - if strings.HasSuffix(err.Error(), "use of closed network connection") { - select { - case <-time.After(s.retryInterval): - case <-s.donec: - return - } - s.lg.Debug("listener is closed; retry listening on", zap.String("from", s.From())) - - if err = s.ResetListener(); err != nil { - select { - case s.errc <- err: - select { - case <-s.donec: - return - default: - } - case <-s.donec: - return - } - s.lg.Warn("failed to reset listener", zap.Error(err)) - } - } - - continue + sh.s.lg.Debug("failed to get new Transport", zap.Error(err)) + return } - - var out net.Conn - if !s.tlsInfo.Empty() { - var tp *http.Transport - tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout) - if err != nil { - select { - case s.errc <- err: - select { - case <-s.donec: - return - default: - } - case <-s.donec: - return - } - continue + out, err = tp.DialContext(ctx, targetScheme, targetHost) + } else { + out, err = net.Dial(targetScheme, targetHost) + } + if err != nil { + select { + case sh.s.errc <- err: + select { + case <-sh.s.donec: + return + default: } - out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host) - } else { - out, err = net.Dial(s.to.Scheme, s.to.Host) + case <-sh.s.donec: + return } - if err != nil { + sh.s.lg.Debug("failed to dial", zap.Error(err)) + return + } + + var dstPort int + dstPort, err = getPort(out.RemoteAddr()) + if err != 
nil { + select { + case sh.s.errc <- err: select { - case s.errc <- err: - select { - case <-s.donec: - return - default: - } - case <-s.donec: + case <-sh.s.donec: return + default: } - s.lg.Debug("failed to dial", zap.Error(err)) - continue + case <-sh.s.donec: + return } + sh.s.lg.Debug("failed to parse port in transmit", zap.Error(err)) + return + } - s.closeWg.Add(2) - go func() { - defer s.closeWg.Done() - // read incoming bytes from listener, dispatch to outgoing connection - s.transmit(out, in) - out.Close() - in.Close() - }() - go func() { - defer s.closeWg.Done() - // read response from outgoing connection, write back to listener - s.receive(in, out) - in.Close() - out.Close() - }() + sh.s.closeHijackedConn.Add(2) + go func() { + defer sh.s.closeHijackedConn.Done() + // read incoming bytes from listener, dispatch to outgoing connection + sh.s.transmit(out, in, dstPort) + out.Close() + in.Close() + }() + go func() { + defer sh.s.closeHijackedConn.Done() + // read response from outgoing connection, write back to listener + sh.s.receive(in, out, dstPort) + in.Close() + out.Close() + }() +} + +func (s *server) Listen() string { + return fmt.Sprintf("%s://%s", s.listen.Scheme, s.listen.Host) +} + +func getPort(addr net.Addr) (int, error) { + switch addr := addr.(type) { + case *net.TCPAddr: + return addr.Port, nil + case *net.UDPAddr: + return addr.Port, nil + default: + return 0, fmt.Errorf("unsupported address type: %T", addr) } } -func (s *server) transmit(dst io.Writer, src io.Reader) { - s.ioCopy(dst, src, proxyTx) +func (s *server) transmit(dst, src net.Conn, port int) { + s.ioCopy(dst, src, proxyTx, port) } -func (s *server) receive(dst io.Writer, src io.Reader) { - s.ioCopy(dst, src, proxyRx) +func (s *server) receive(dst, src net.Conn, port int) { + s.ioCopy(dst, src, proxyRx, port) } type proxyType uint8 @@ -423,7 +534,7 @@ const ( proxyRx ) -func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { +func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) { buf := make([]byte, s.bufferSize) for { nr1, err := src.Read(buf) @@ -464,12 +575,30 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { data = s.modifyTx(data) } s.modifyTxMu.RUnlock() + + s.blackholePeerMapMu.RLock() + // Tx from other peers is Rx for the target peer + if val, exist := s.blackholePeerMap[peerPort]; exist { + if (val & blackholePeerTypeRx) > 0 { + data = nil + } + } + s.blackholePeerMapMu.RUnlock() case proxyRx: s.modifyRxMu.RLock() if s.modifyRx != nil { data = s.modifyRx(data) } s.modifyRxMu.RUnlock() + + s.blackholePeerMapMu.RLock() + // Rx from other peers is Tx for the target peer + if val, exist := s.blackholePeerMap[peerPort]; exist { + if (val & blackholePeerTypeTx) > 0 { + data = nil + } + } + s.blackholePeerMapMu.RUnlock() default: panic("unknown proxy type") } @@ -480,16 +609,16 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { "modified tx", zap.String("data-received", humanize.Bytes(uint64(nr1))), zap.String("data-modified", humanize.Bytes(uint64(nr2))), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), + zap.Int("to peer port", peerPort), ) case proxyRx: s.lg.Debug( "modified rx", zap.String("data-received", humanize.Bytes(uint64(nr1))), zap.String("data-modified", humanize.Bytes(uint64(nr2))), - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), + zap.Int("to peer port", peerPort), ) default: 
panic("unknown proxy type") @@ -607,15 +736,15 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { s.lg.Debug( "transmitted", zap.String("data-size", humanize.Bytes(uint64(nr1))), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), + zap.Int("to peer port", peerPort), ) case proxyRx: s.lg.Debug( "received", zap.String("data-size", humanize.Bytes(uint64(nr1))), - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), + zap.Int("to peer port", peerPort), ) default: panic("unknown proxy type") @@ -629,19 +758,28 @@ func (s *server) Error() <-chan error { return s.errc } func (s *server) Close() (err error) { s.closeOnce.Do(func() { close(s.donec) - s.listenerMu.Lock() - if s.listener != nil { - err = s.listener.Close() - s.lg.Info( - "closed proxy listener", - zap.String("from", s.From()), - zap.String("to", s.To()), - ) + + // we shutdown the server + log.Println("we shutdown the server") + if err = s.httpServer.Shutdown(context.TODO()); err != nil { + return } + s.httpServer = nil + + log.Println("waiting for listenerMu") + // listener was closed by the Shutdown() call + s.listenerMu.Lock() + s.listener = nil s.lg.Sync() s.listenerMu.Unlock() + + // the hijacked connections aren't tracked by the server so we need to wait for them + log.Println("waiting for closeHijackedConn") + s.closeHijackedConn.Wait() }) - s.closeWg.Wait() + + // s.closeWg.Wait() + return err } @@ -652,8 +790,7 @@ func (s *server) PauseAccept() { s.lg.Info( "paused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -671,8 +808,7 @@ func (s *server) UnpauseAccept() { s.lg.Info( "unpaused accept", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -690,8 +826,7 @@ func (s *server) DelayAccept(latency, rv time.Duration) { zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -704,8 +839,7 @@ func (s *server) UndelayAccept() { s.lg.Info( "removed accept latency", zap.Duration("latency", d), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -720,6 +854,7 @@ func (s *server) DelayTx(latency, rv time.Duration) { if latency <= 0 { return } + d := computeLatency(latency, rv) s.latencyTxMu.Lock() s.latencyTx = d @@ -730,8 +865,7 @@ func (s *server) DelayTx(latency, rv time.Duration) { zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -744,8 +878,7 @@ func (s *server) UndelayTx() { s.lg.Info( "removed transmit latency", zap.Duration("latency", d), - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -770,8 +903,7 @@ func (s *server) DelayRx(latency, rv time.Duration) { zap.Duration("latency", d), zap.Duration("given-latency", latency), zap.Duration("given-latency-random-variable", rv), - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), ) } @@ -784,8 +916,7 @@ func (s *server) UndelayRx() { s.lg.Info( "removed receive latency", 
zap.Duration("latency", d), - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), ) } @@ -821,8 +952,7 @@ func (s *server) ModifyTx(f func([]byte) []byte) { s.lg.Info( "modifying tx", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -833,8 +963,7 @@ func (s *server) UnmodifyTx() { s.lg.Info( "unmodifyed tx", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -844,8 +973,7 @@ func (s *server) ModifyRx(f func([]byte) []byte) { s.modifyRxMu.Unlock() s.lg.Info( "modifying rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listen on", s.Listen()), ) } @@ -856,8 +984,7 @@ func (s *server) UnmodifyRx() { s.lg.Info( "unmodifyed rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listen on", s.Listen()), ) } @@ -865,8 +992,7 @@ func (s *server) BlackholeTx() { s.ModifyTx(func([]byte) []byte { return nil }) s.lg.Info( "blackholed tx", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -874,8 +1000,7 @@ func (s *server) UnblackholeTx() { s.UnmodifyTx() s.lg.Info( "unblackholed tx", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listening on", s.Listen()), ) } @@ -883,8 +1008,7 @@ func (s *server) BlackholeRx() { s.ModifyRx(func([]byte) []byte { return nil }) s.lg.Info( "blackholed rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), ) } @@ -892,11 +1016,70 @@ func (s *server) UnblackholeRx() { s.UnmodifyRx() s.lg.Info( "unblackholed rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listening on", s.Listen()), ) } +func (s *server) BlackholePeerTx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val |= blackholePeerTypeTx + s.blackholePeerMap[port] = val + } else { + s.blackholePeerMap[port] = blackholePeerTypeTx + } +} + +func (s *server) UnblackholePeerTx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val &= bits.Reverse8(blackholePeerTypeTx) + s.blackholePeerMap[port] = val + } +} + +func (s *server) BlackholePeerRx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val |= blackholePeerTypeRx + s.blackholePeerMap[port] = val + } else { + s.blackholePeerMap[port] = blackholePeerTypeTx + } +} + +func (s *server) UnblackholePeerRx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val &= bits.Reverse8(blackholePeerTypeRx) + s.blackholePeerMap[port] = val + } +} + func (s *server) PauseTx() { s.pauseTxMu.Lock() s.pauseTxc = make(chan struct{}) @@ -904,8 +1087,7 @@ func (s *server) PauseTx() { s.lg.Info( "paused tx", - zap.String("from", 
s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -923,8 +1105,7 @@ func (s *server) UnpauseTx() { s.lg.Info( "unpaused tx", - zap.String("from", s.From()), - zap.String("to", s.To()), + zap.String("proxy listen on", s.Listen()), ) } @@ -935,8 +1116,7 @@ func (s *server) PauseRx() { s.lg.Info( "paused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), + zap.String("proxy listen on", s.Listen()), ) } @@ -954,37 +1134,6 @@ func (s *server) UnpauseRx() { s.lg.Info( "unpaused rx", - zap.String("from", s.To()), - zap.String("to", s.From()), - ) -} - -func (s *server) ResetListener() error { - s.listenerMu.Lock() - defer s.listenerMu.Unlock() - - if err := s.listener.Close(); err != nil { - // already closed - if !strings.HasSuffix(err.Error(), "use of closed network connection") { - return err - } - } - - var ln net.Listener - var err error - if !s.tlsInfo.Empty() { - ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) - } else { - ln, err = net.Listen(s.from.Scheme, s.from.Host) - } - if err != nil { - return err - } - s.listener = ln - - s.lg.Info( - "reset listener on", - zap.String("from", s.From()), + zap.String("proxy listen on", s.Listen()), ) - return nil } diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index d19c947c6465..825be3ac751a 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -17,88 +17,254 @@ package proxy import ( "bytes" "context" - "crypto/tls" "fmt" "io" "log" - "math/rand" "net" "net/http" "net/url" - "os" "strings" "testing" "time" - "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/client/pkg/v3/transport" ) -func TestServer_Unix_Insecure(t *testing.T) { testServer(t, "unix", false, false) } -func TestServer_TCP_Insecure(t *testing.T) { testServer(t, "tcp", false, false) } -func TestServer_Unix_Secure(t *testing.T) { testServer(t, "unix", true, false) } -func TestServer_TCP_Secure(t *testing.T) { testServer(t, "tcp", true, false) } -func TestServer_Unix_Insecure_DelayTx(t *testing.T) { testServer(t, "unix", false, true) } -func TestServer_TCP_Insecure_DelayTx(t *testing.T) { testServer(t, "tcp", false, true) } -func TestServer_Unix_Secure_DelayTx(t *testing.T) { testServer(t, "unix", true, true) } -func TestServer_TCP_Secure_DelayTx(t *testing.T) { testServer(t, "tcp", true, true) } +/* Helper functions */ +type dummyServerHandler struct { + t *testing.T + output chan<- []byte +} -func testServer(t *testing.T, scheme string, secure bool, delayTx bool) { - lg := zaptest.NewLogger(t) - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - if scheme == "tcp" { - ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{}) - srcAddr, dstAddr = ln1.Addr().String(), ln2.Addr().String() - ln1.Close() - ln2.Close() +// reads the request body and write back to the response object +func (sh *dummyServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + resp.WriteHeader(200) + + if data, err := io.ReadAll(req.Body); err != nil { + sh.t.Fatal(err) } else { - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() + sh.output <- data } - tlsInfo := createTLSInfo(lg, secure) - ln := listen(t, scheme, dstAddr, tlsInfo) - defer ln.Close() +} + +func prepare(t *testing.T, serverIsClosed bool) (chan []byte, chan struct{}, chan []byte, Server, *http.Server, func(data []byte)) { + lg := zaptest.NewLogger(t) + scheme := 
"tcp" + L7Scheme := "http" + + // we always send the traffic to destination with HTTPS + // this will force the CONNECT header to be sent first + tlsInfo := createTLSInfo(lg) + + ln1, ln2 := listen(t, "tcp", "localhost:0", transport.TLSInfo{}), listen(t, "tcp", "localhost:0", transport.TLSInfo{}) + forwardProxyAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String() + ln1.Close() + ln2.Close() + recvc := make(chan []byte, 1) + httpServer := &http.Server{ + Handler: &dummyServerHandler{ + t: t, + output: recvc, + }, + } + go startHTTPServer(scheme, dstAddr, tlsInfo, httpServer) + + // we connect to the proxy without TLS + proxyURL := url.URL{Scheme: L7Scheme, Host: forwardProxyAddr} cfg := ServerConfig{ Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = tlsInfo + Listen: proxyURL, } p := NewServer(cfg) - waitForServer(t, p) - defer p.Close() + // setup forward proxy + t.Setenv("E2E_TEST_FORWARD_PROXY_IP", proxyURL.String()) + t.Logf("Proxy URL %s", proxyURL.String()) - data1 := []byte("Hello World!") donec, writec := make(chan struct{}), make(chan []byte) - go func() { - defer close(donec) - for data := range writec { - send(t, data, scheme, srcAddr, tlsInfo) + var tp *http.Transport + var err error + if !tlsInfo.Empty() { + tp, err = transport.NewTransport(tlsInfo, 1*time.Second) + } else { + tp, err = transport.NewTransport(tlsInfo, 1*time.Second) + } + if err != nil { + t.Fatal(err) + } + tp.IdleConnTimeout = 100 * time.Microsecond + + sendData := func(data []byte) { + send(tp, t, data, scheme, dstAddr, tlsInfo, serverIsClosed) + } + + return recvc, donec, writec, p, httpServer, sendData +} + +func destroy(t *testing.T, writec chan []byte, donec chan struct{}, p Server, serverIsClosed bool, httpServer *http.Server) { + close(writec) + if err := httpServer.Shutdown(context.Background()); err != nil { + t.Fatal(err) + } + + select { + case <-donec: + case <-time.After(3 * time.Second): + t.Fatal("took too long to write") + } + + if !serverIsClosed { + select { + case <-p.Done(): + t.Fatal("unexpected done") + case err := <-p.Error(): + t.Fatal(err) + default: + } + + if err := p.Close(); err != nil { + t.Fatal(err) + } + + select { + case <-p.Done(): + case err := <-p.Error(): + if !strings.HasPrefix(err.Error(), "accept ") && + !strings.HasSuffix(err.Error(), "use of closed network connection") { + t.Fatal(err) + } + case <-time.After(3 * time.Second): + t.Fatal("took too long to close") } + } +} + +func createTLSInfo(lg *zap.Logger) transport.TLSInfo { + return transport.TLSInfo{ + KeyFile: "../../tests/fixtures/server.key.insecure", + CertFile: "../../tests/fixtures/server.crt", + TrustedCAFile: "../../tests/fixtures/ca.crt", + ClientCertAuth: true, + Logger: lg, + } +} + +func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) { + var err error + if !tlsInfo.Empty() { + ln, err = transport.NewListener(addr, scheme, &tlsInfo) + } else { + ln, err = net.Listen(scheme, addr) + } + if err != nil { + t.Fatal(err) + } + return ln +} + +func startHTTPServer(scheme, addr string, tlsInfo transport.TLSInfo, httpServer *http.Server) { + var err error + var ln net.Listener + + ln, err = net.Listen(scheme, addr) + if err != nil { + log.Fatal(err) + } + + log.Println("HTTP Server started on", addr) + if err := httpServer.ServeTLS(ln, tlsInfo.CertFile, tlsInfo.KeyFile); err != http.ErrServerClosed { + // always returns error. 
ErrServerClosed on graceful close + log.Fatalf(fmt.Sprintf("startHTTPServer ServeTLS(): %v", err)) + } +} + +func send(tp *http.Transport, t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo, serverIsClosed bool) { + defer func() { + tp.CloseIdleConnections() }() - recvc := make(chan []byte, 1) + // If you call Dial(), you will get a Conn that you can write the byte stream directly + // If you call RoundTrip(), you will get a connection managed for you, but you need to send valid HTTP request + dataReader := bytes.NewReader(data) + protocolScheme := scheme + if scheme == "tcp" { + if !tlsInfo.Empty() { + protocolScheme = "https" + } else { + panic("only https is supported") + } + } else { + panic("scheme not supported") + } + rawURL := url.URL{ + Scheme: protocolScheme, + Host: addr, + } + + req, err := http.NewRequest("POST", rawURL.String(), dataReader) + if err != nil { + t.Fatal(err) + } + res, err := tp.RoundTrip(req) + if err != nil { + if strings.Contains(err.Error(), "TLS handshake timeout") { + t.Logf("TLS handshake timeout") + return + } + if serverIsClosed { + // when the proxy server is closed before sending, we will get this error message + if strings.Contains(err.Error(), "connect: connection refused") { + t.Logf("connect: connection refused") + return + } + } + panic(err) + } + defer func() { + if err := res.Body.Close(); err != nil { + panic(err) + } + }() + + if res.StatusCode != 200 { + t.Fatalf("status code not 200") + } +} + +// Waits until a proxy is ready to serve. +// Aborts test on proxy start-up error. +func waitForServer(t *testing.T, s Server) { + select { + case <-s.Ready(): + case err := <-s.Error(): + t.Fatal(err) + } +} + +/* Unit tests */ +func TestServer_TCP(t *testing.T) { testServer(t, false) } +func TestServer_TCP_DelayTx(t *testing.T) { testServer(t, true) } +func testServer(t *testing.T, delayTx bool) { + recvc, donec, writec, p, httpServer, sendData := prepare(t, false) + defer destroy(t, writec, donec, p, false, httpServer) go func() { - for i := 0; i < 2; i++ { - recvc <- receive(t, ln) + defer close(donec) + for data := range writec { + sendData(data) } }() + data1 := []byte("Hello World!") writec <- data1 now := time.Now() if d := <-recvc; !bytes.Equal(data1, d) { - close(writec) t.Fatalf("expected %q, got %q", string(data1), string(d)) } took1 := time.Since(now) @@ -113,7 +279,6 @@ func testServer(t *testing.T, scheme string, secure bool, delayTx bool) { writec <- data2 now = time.Now() if d := <-recvc; !bytes.Equal(data2, d) { - close(writec) t.Fatalf("expected %q, got %q", string(data2), string(d)) } took2 := time.Since(now) @@ -130,100 +295,35 @@ func testServer(t *testing.T, scheme string, secure bool, delayTx bool) { t.Fatalf("expected took2 %v (with latency) > delay: %v", took2, lat-rv) } } - - close(writec) - select { - case <-donec: - case <-time.After(3 * time.Second): - t.Fatal("took too long to write") - } - - select { - case <-p.Done(): - t.Fatal("unexpected done") - case err := <-p.Error(): - t.Fatal(err) - default: - } - - if err := p.Close(); err != nil { - t.Fatal(err) - } - - select { - case <-p.Done(): - case err := <-p.Error(): - if !strings.HasPrefix(err.Error(), "accept ") && - !strings.HasSuffix(err.Error(), "use of closed network connection") { - t.Fatal(err) - } - case <-time.After(3 * time.Second): - t.Fatal("took too long to close") - } } -func createTLSInfo(lg *zap.Logger, secure bool) transport.TLSInfo { - if secure { - return transport.TLSInfo{ - KeyFile: 
"../../tests/fixtures/server.key.insecure", - CertFile: "../../tests/fixtures/server.crt", - TrustedCAFile: "../../tests/fixtures/ca.crt", - ClientCertAuth: true, - Logger: lg, +func TestServer_DelayAccept(t *testing.T) { + recvc, donec, writec, p, httpServer, sendData := prepare(t, false) + defer destroy(t, writec, donec, p, false, httpServer) + go func() { + defer close(donec) + for data := range writec { + sendData(data) } - } - return transport.TLSInfo{Logger: lg} -} - -func TestServer_Unix_Insecure_DelayAccept(t *testing.T) { testServerDelayAccept(t, false) } -func TestServer_Unix_Secure_DelayAccept(t *testing.T) { testServerDelayAccept(t, true) } -func testServerDelayAccept(t *testing.T, secure bool) { - lg := zaptest.NewLogger(t) - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) }() - tlsInfo := createTLSInfo(lg, secure) - scheme := "unix" - ln := listen(t, scheme, dstAddr, tlsInfo) - defer ln.Close() - - cfg := ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = tlsInfo - } - p := NewServer(cfg) - - waitForServer(t, p) - - defer p.Close() data := []byte("Hello World!") - now := time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { + writec <- data + if d := <-recvc; !bytes.Equal(data, d) { t.Fatalf("expected %q, got %q", string(data), string(d)) } took1 := time.Since(now) t.Logf("took %v with no latency", took1) + time.Sleep(1 * time.Second) // wait for the idle connection to timeout lat, rv := 700*time.Millisecond, 10*time.Millisecond p.DelayAccept(lat, rv) defer p.UndelayAccept() - if err := p.ResetListener(); err != nil { - t.Fatal(err) - } - time.Sleep(200 * time.Millisecond) now = time.Now() - send(t, data, scheme, srcAddr, tlsInfo) - if d := receive(t, ln); !bytes.Equal(data, d) { + writec <- data + if d := <-recvc; !bytes.Equal(data, d) { t.Fatalf("expected %q, got %q", string(data), string(d)) } took2 := time.Since(now) @@ -235,36 +335,22 @@ func testServerDelayAccept(t *testing.T, secure bool) { } func TestServer_PauseTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) + recvc, donec, writec, p, httpServer, sendData := prepare(t, false) + defer destroy(t, writec, donec, p, false, httpServer) + // the sendData function must be in a goroutine + // otherwise, the pauseTx will cause the sendData to block + go func() { + defer close(donec) + for data := range writec { + sendData(data) + } }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.PauseTx() data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() + p.PauseTx() + writec <- data select { case d := <-recvc: t.Fatalf("received unexpected data %q during pause", string(d)) @@ -283,114 +369,32 @@ func TestServer_PauseTx(t *testing.T) { } } -func TestServer_ModifyTx_corrupt(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln 
:= listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - p.ModifyTx(func(d []byte) []byte { - d[len(d)/2]++ - return d - }) - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) - } -} - -func TestServer_ModifyTx_packet_loss(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) +func TestServer_BlackholeTx(t *testing.T) { + recvc, donec, writec, p, httpServer, sendData := prepare(t, false) + defer destroy(t, writec, donec, p, false, httpServer) + // the sendData function must be in a goroutine + // otherwise, the pauseTx will cause the sendData to block + go func() { + defer close(donec) + for data := range writec { + sendData(data) + } }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - defer p.Close() - - // 50% packet loss - p.ModifyTx(func(d []byte) []byte { - half := len(d) / 2 - return d[:half:half] - }) + // before enabling blacklhole data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); bytes.Equal(d, data) { - t.Fatalf("expected corrupted data, got %q", string(d)) - } - - p.UnmodifyTx() - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected uncorrupted data, got %q", string(d)) + writec <- data + if d := <-recvc; !bytes.Equal(data, d) { + t.Fatalf("expected %q, got %q", string(data), string(d)) } -} - -func TestServer_BlackholeTx(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() + // enable blackhole + // note that the transport is set to use 10s for TLSHandshakeTimeout, so + // this test will require at least 10s to execute, since send() is a + // blocking call thus we need to wait for ssl handshake to timeout p.BlackholeTx() - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - - recvc := make(chan []byte, 1) - go func() { - recvc <- receive(t, ln) - }() - + writec <- data select { case d := <-recvc: t.Fatalf("unexpected data receive %q during blackhole", string(d)) @@ -399,10 +403,12 @@ func TestServer_BlackholeTx(t *testing.T) { p.UnblackholeTx() + // disable blackhole + // TODO: figure out why HTTPS won't attempt to reconnect when the blackhole is disabled + // expect different data, old data dropped data[0]++ - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - + writec <- data 
select { case d := <-recvc: if !bytes.Equal(data, d) { @@ -414,286 +420,31 @@ func TestServer_BlackholeTx(t *testing.T) { } func TestServer_Shutdown(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - s, _ := p.(*server) - s.listener.Close() - time.Sleep(200 * time.Millisecond) - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } -} - -func TestServer_ShutdownListener(t *testing.T) { - lg := zaptest.NewLogger(t) - scheme := "unix" - srcAddr, dstAddr := newUnixAddr(), newUnixAddr() - defer func() { - os.RemoveAll(srcAddr) - os.RemoveAll(dstAddr) - }() - - ln := listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - p := NewServer(ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - }) - - waitForServer(t, p) - - defer p.Close() - - // shut down destination - ln.Close() - time.Sleep(200 * time.Millisecond) - - ln = listen(t, scheme, dstAddr, transport.TLSInfo{}) - defer ln.Close() - - data := []byte("Hello World!") - send(t, data, scheme, srcAddr, transport.TLSInfo{}) - if d := receive(t, ln); !bytes.Equal(d, data) { - t.Fatalf("expected %q, got %q", string(data), string(d)) - } -} - -func TestServerHTTP_Insecure_DelayTx(t *testing.T) { testServerHTTP(t, false, true) } -func TestServerHTTP_Secure_DelayTx(t *testing.T) { testServerHTTP(t, true, true) } -func TestServerHTTP_Insecure_DelayRx(t *testing.T) { testServerHTTP(t, false, false) } -func TestServerHTTP_Secure_DelayRx(t *testing.T) { testServerHTTP(t, true, false) } -func testServerHTTP(t *testing.T, secure, delayTx bool) { - lg := zaptest.NewLogger(t) - scheme := "tcp" - ln1, ln2 := listen(t, scheme, "localhost:0", transport.TLSInfo{}), listen(t, scheme, "localhost:0", transport.TLSInfo{}) - srcAddr, dstAddr := ln1.Addr().String(), ln2.Addr().String() - ln1.Close() - ln2.Close() - - mux := http.NewServeMux() - mux.HandleFunc("/hello", func(w http.ResponseWriter, req *http.Request) { - d, err := io.ReadAll(req.Body) - req.Body.Close() - if err != nil { - t.Fatal(err) - } - if _, err = w.Write([]byte(fmt.Sprintf("%q(confirmed)", string(d)))); err != nil { - t.Fatal(err) - } - }) - tlsInfo := createTLSInfo(lg, secure) - var tlsConfig *tls.Config - if secure { - _, err := tlsInfo.ServerConfig() - if err != nil { - t.Fatal(err) - } - } - srv := &http.Server{ - Addr: dstAddr, - Handler: mux, - TLSConfig: tlsConfig, - ErrorLog: log.New(io.Discard, "net/http", 0), - } - - donec := make(chan struct{}) - defer func() { - srv.Close() - <-donec - }() + recvc, donec, writec, p, httpServer, sendData := prepare(t, true) + defer destroy(t, writec, donec, p, true, httpServer) go func() { - if !secure { - srv.ListenAndServe() - } else { - srv.ListenAndServeTLS(tlsInfo.CertFile, tlsInfo.KeyFile) - } defer close(donec) - }() - time.Sleep(200 * time.Millisecond) - - cfg := ServerConfig{ - Logger: lg, - From: url.URL{Scheme: scheme, Host: srcAddr}, - To: url.URL{Scheme: scheme, Host: dstAddr}, - } - if secure { - cfg.TLSInfo = 
tlsInfo - } - p := NewServer(cfg) - - waitForServer(t, p) - - defer func() { - lg.Info("closing Proxy server...") - p.Close() - lg.Info("closed Proxy server.") - }() - - data := "Hello World!" - - var resp *http.Response - var err error - now := time.Now() - if secure { - tp, terr := transport.NewTransport(tlsInfo, 3*time.Second) - assert.NoError(t, terr) - cli := &http.Client{Transport: tp} - resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data)) - defer cli.CloseIdleConnections() - defer tp.CloseIdleConnections() - } else { - resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data)) - defer http.DefaultClient.CloseIdleConnections() - } - assert.NoError(t, err) - d, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - took1 := time.Since(now) - t.Logf("took %v with no latency", took1) - - rs1 := string(d) - exp := fmt.Sprintf("%q(confirmed)", data) - if rs1 != exp { - t.Fatalf("got %q, expected %q", rs1, exp) - } - - lat, rv := 100*time.Millisecond, 10*time.Millisecond - if delayTx { - p.DelayTx(lat, rv) - defer p.UndelayTx() - } else { - p.DelayRx(lat, rv) - defer p.UndelayRx() - } - - now = time.Now() - if secure { - tp, terr := transport.NewTransport(tlsInfo, 3*time.Second) - if terr != nil { - t.Fatal(terr) + for data := range writec { + sendData(data) } - cli := &http.Client{Transport: tp} - resp, err = cli.Post("https://"+srcAddr+"/hello", "", strings.NewReader(data)) - defer cli.CloseIdleConnections() - defer tp.CloseIdleConnections() - } else { - resp, err = http.Post("http://"+srcAddr+"/hello", "", strings.NewReader(data)) - defer http.DefaultClient.CloseIdleConnections() - } - if err != nil { - t.Fatal(err) - } - d, err = io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - took2 := time.Since(now) - t.Logf("took %v with latency %v±%v", took2, lat, rv) - - rs2 := string(d) - if rs2 != exp { - t.Fatalf("got %q, expected %q", rs2, exp) - } - if took1 > took2 { - t.Fatalf("expected took1 %v < took2 %v", took1, took2) - } -} - -func newUnixAddr() string { - now := time.Now().UnixNano() - addr := fmt.Sprintf("%X%X.unix-conn", now, rand.Intn(35000)) - os.RemoveAll(addr) - return addr -} - -func listen(t *testing.T, scheme, addr string, tlsInfo transport.TLSInfo) (ln net.Listener) { - var err error - if !tlsInfo.Empty() { - ln, err = transport.NewListener(addr, scheme, &tlsInfo) - } else { - ln, err = net.Listen(scheme, addr) - } - if err != nil { - t.Fatal(err) - } - return ln -} + }() -func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSInfo) { - var out net.Conn - var err error - if !tlsInfo.Empty() { - tp, terr := transport.NewTransport(tlsInfo, 3*time.Second) - if terr != nil { - t.Fatal(terr) - } - out, err = tp.DialContext(context.Background(), scheme, addr) - } else { - out, err = net.Dial(scheme, addr) - } - if err != nil { - t.Fatal(err) - } - if _, err = out.Write(data); err != nil { - t.Fatal(err) - } - if err = out.Close(); err != nil { + s, _ := p.(*server) + if err := s.Close(); err != nil { t.Fatal(err) } -} + p = nil + time.Sleep(200 * time.Millisecond) -func receive(t *testing.T, ln net.Listener) (data []byte) { - buf := bytes.NewBuffer(make([]byte, 0, 1024)) - for { - in, err := ln.Accept() - if err != nil { - t.Fatal(err) - } - var n int64 - n, err = buf.ReadFrom(in) - if err != nil { - t.Fatal(err) - } - if n > 0 { - break - } - } - return buf.Bytes() -} + data := []byte("Hello World!") + sendData(data) -// Waits until a proxy is 
ready to serve. -// Aborts test on proxy start-up error. -func waitForServer(t *testing.T, s Server) { select { - case <-s.Ready(): - case err := <-s.Error(): - t.Fatal(err) + case d := <-recvc: + if bytes.Equal(data, d) { + t.Fatalf("expected nothing, got %q", string(d)) + } + case <-time.After(2 * time.Second): + t.Log("nothing was received, proxy server seems to be closed so no traffic is forwarded") } } diff --git a/tests/e2e/blackhole_test.go b/tests/e2e/blackhole_test.go new file mode 100644 index 000000000000..31478f9f1973 --- /dev/null +++ b/tests/e2e/blackhole_test.go @@ -0,0 +1,105 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestBlackholeByMockingPartitionLeader(t *testing.T) { + blackholeTestByMockingPartition(t, 3, true) +} + +func TestBlackholeByMockingPartitionFollower(t *testing.T) { + blackholeTestByMockingPartition(t, 3, false) +} + +func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLeader bool) { + e2e.BeforeTest(t) + + t.Logf("Create an etcd cluster with %d member\n", clusterSize) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, + e2e.WithClusterSize(clusterSize), + e2e.WithSnapshotCount(10), + e2e.WithSnapshotCatchUpEntries(10), + e2e.WithIsPeerTLS(true), + e2e.WithPeerProxy(true), + ) + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + require.NoError(t, epc.Close(), "failed to close etcd cluster") + }() + + leaderID := epc.WaitLeader(t) + mockPartitionNodeIndex := leaderID + if !partitionLeader { + mockPartitionNodeIndex = (leaderID + 1) % (clusterSize) + } + partitionedMember := epc.Procs[mockPartitionNodeIndex] + // Mock partition + t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name) + epc.BlackholePeer(partitionedMember) + + t.Logf("Wait 1s for any open connections to expire") + time.Sleep(1 * time.Second) + + t.Logf("Wait for new leader election with remaining members") + leaderEPC := epc.Procs[waitLeader(t, epc, mockPartitionNodeIndex)] + t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)") + writeKVs(t, leaderEPC.Etcdctl(), 0, 20) + e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot") + + t.Log("Verifying the partitionedMember is missing new writes") + assertRevision(t, leaderEPC, 21) + assertRevision(t, partitionedMember, 1) + + // Wait for some time to restore the network + time.Sleep(1 * time.Second) + t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name) + epc.UnblackholePeer(partitionedMember) + + leaderEPC = epc.Procs[epc.WaitLeader(t)] + time.Sleep(1 * time.Second) + assertRevision(t, leaderEPC, 21) + assertRevision(t, partitionedMember, 21) +} + +func waitLeader(t testing.TB, epc 
*e2e.EtcdProcessCluster, excludeNode int) int { + var membs []e2e.EtcdProcess + for i := 0; i < len(epc.Procs); i++ { + if i == excludeNode { + continue + } + membs = append(membs, epc.Procs[i]) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return epc.WaitMembersForLeader(ctx, t, membs) +} + +func assertRevision(t testing.TB, member e2e.EtcdProcess, expectedRevision int64) { + responses, err := member.Etcdctl().Status(context.TODO()) + require.NoError(t, err) + assert.Equal(t, expectedRevision, responses[0].Header.Revision, "revision mismatch") +} diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go index 8aa2694344f5..baa5ad81110f 100644 --- a/tests/e2e/http_health_check_test.go +++ b/tests/e2e/http_health_check_test.go @@ -384,10 +384,10 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { member := clus.Procs[0] - proxy := member.PeerProxy() + forwardProxy := member.PeerForwardProxy() t.Logf("Blackholing traffic from and to member %q", member.Config().Name) - proxy.BlackholeTx() - proxy.BlackholeRx() + forwardProxy.BlackholeTx() + forwardProxy.BlackholeRx() } func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 23d422aa3130..b9b03d9aa07e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -513,10 +513,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in var curl string port := cfg.BasePort + 5*i clientPort := port - peerPort := port + 1 + peerPort := port + 1 // the port that the peer actually listens on metricsPort := port + 2 - peer2Port := port + 3 - clientHTTPPort := port + 4 + clientHTTPPort := port + 3 + forwardProxyPort := port + 4 if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) @@ -528,17 +528,23 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} - var proxyCfg *proxy.ServerConfig + var forwardProxyCfg *proxy.ServerConfig if cfg.PeerProxy { if !cfg.IsPeerTLS { panic("Can't use peer proxy without peer TLS as it can result in malformed packets") } - peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", peer2Port) - proxyCfg = &proxy.ServerConfig{ + + // setup forward proxy + forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)} + forwardProxyCfg = &proxy.ServerConfig{ Logger: zap.NewNop(), - To: peerListenURL, - From: peerAdvertiseURL, + Listen: forwardProxyURL, + } + + if cfg.EnvVars == nil { + cfg.EnvVars = make(map[string]string) } + cfg.EnvVars["E2E_TEST_FORWARD_PROXY_IP"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort) } name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i) @@ -660,7 +666,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in InitialToken: cfg.ServerConfig.InitialClusterToken, GoFailPort: gofailPort, GoFailClientTimeout: cfg.GoFailClientTimeout, - Proxy: proxyCfg, + ForwardProxy: forwardProxyCfg, LazyFSEnabled: 
cfg.LazyFSEnabled, } } @@ -910,6 +916,38 @@ func (epc *EtcdProcessCluster) Restart(ctx context.Context) error { return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) }) } +func (epc *EtcdProcessCluster) BlackholePeer(blackholePeer EtcdProcess) error { + blackholePeer.PeerForwardProxy().BlackholeRx() + blackholePeer.PeerForwardProxy().BlackholeTx() + + for _, peer := range epc.Procs { + if peer.Config().Name == blackholePeer.Config().Name { + continue + } + + peer.PeerForwardProxy().BlackholePeerRx(blackholePeer.Config().PeerURL) + peer.PeerForwardProxy().BlackholePeerTx(blackholePeer.Config().PeerURL) + } + + return nil +} + +func (epc *EtcdProcessCluster) UnblackholePeer(blackholePeer EtcdProcess) error { + blackholePeer.PeerForwardProxy().UnblackholeRx() + blackholePeer.PeerForwardProxy().UnblackholeTx() + + for _, peer := range epc.Procs { + if peer.Config().Name == blackholePeer.Config().Name { + continue + } + + peer.PeerForwardProxy().UnblackholePeerRx(blackholePeer.Config().PeerURL) + peer.PeerForwardProxy().UnblackholePeerTx(blackholePeer.Config().PeerURL) + } + + return nil +} + func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error { readyC := make(chan error, len(epc.Procs)) for i := range epc.Procs { diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index b55ef9e90a62..55ab70536866 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -55,7 +55,7 @@ type EtcdProcess interface { Stop() error Close() error Config() *EtcdServerProcessConfig - PeerProxy() proxy.Server + PeerForwardProxy() proxy.Server Failpoints() *BinaryFailpoints LazyFS() *LazyFS Logs() LogsExpect @@ -69,12 +69,12 @@ type LogsExpect interface { } type EtcdServerProcess struct { - cfg *EtcdServerProcessConfig - proc *expect.ExpectProcess - proxy proxy.Server - lazyfs *LazyFS - failpoints *BinaryFailpoints - donec chan struct{} // closed when Interact() terminates + cfg *EtcdServerProcessConfig + proc *expect.ExpectProcess + forwardProxy proxy.Server + lazyfs *LazyFS + failpoints *BinaryFailpoints + donec chan struct{} // closed when Interact() terminates } type EtcdServerProcessConfig struct { @@ -101,7 +101,7 @@ type EtcdServerProcessConfig struct { GoFailClientTimeout time.Duration LazyFSEnabled bool - Proxy *proxy.ServerConfig + ForwardProxy *proxy.ServerConfig } func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -151,12 +151,13 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { if ep.proc != nil { panic("already started") } - if ep.cfg.Proxy != nil && ep.proxy == nil { - ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String())) - ep.proxy = proxy.NewServer(*ep.cfg.Proxy) + + if ep.cfg.ForwardProxy != nil && ep.forwardProxy == nil { + ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("listen on", ep.cfg.ForwardProxy.Listen.String())) + ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy) select { - case <-ep.proxy.Ready(): - case err := <-ep.proxy.Error(): + case <-ep.forwardProxy.Ready(): + case err := <-ep.forwardProxy.Error(): return err } } @@ -221,10 +222,10 @@ func (ep *EtcdServerProcess) Stop() (err error) { } } ep.cfg.lg.Info("stopped server.", zap.String("name", ep.cfg.Name)) - if ep.proxy != nil { - ep.cfg.lg.Info("stopping proxy...", zap.String("name", ep.cfg.Name)) - err = 
ep.proxy.Close() - ep.proxy = nil + if ep.forwardProxy != nil { + ep.cfg.lg.Info("stopping forward proxy...", zap.String("name", ep.cfg.Name)) + err = ep.forwardProxy.Close() + ep.forwardProxy = nil if err != nil { return err } @@ -330,8 +331,8 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { } } -func (ep *EtcdServerProcess) PeerProxy() proxy.Server { - return ep.proxy +func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server { + return ep.forwardProxy } func (ep *EtcdServerProcess) LazyFS() *LazyFS { diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 27504c396b95..67f8520dfe91 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -63,23 +63,17 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) { return false } - return config.ClusterSize > 1 && process.PeerProxy() != nil + return config.ClusterSize > 1 && process.PeerForwardProxy() != nil } func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { - proxy := member.PeerProxy() - - // Blackholing will cause peers to not be able to use streamWriters registered with member - // but peer traffic is still possible because member has 'pipeline' with peers - // TODO: find a way to stop all traffic t.Logf("Blackholing traffic from and to member %q", member.Config().Name) - proxy.BlackholeTx() - proxy.BlackholeRx() + clus.BlackholePeer(member) defer func() { t.Logf("Traffic restored from and to member %q", member.Config().Name) - proxy.UnblackholeTx() - proxy.UnblackholeRx() + clus.UnblackholePeer(member) }() + if shouldWaitTillSnapshot { return waitTillSnapshot(ctx, t, clus, member) } @@ -164,15 +158,15 @@ type delayPeerNetworkFailpoint struct { func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - proxy := member.PeerProxy() + forwardProxy := member.PeerForwardProxy() - proxy.DelayRx(f.baseLatency, f.randomizedLatency) - proxy.DelayTx(f.baseLatency, f.randomizedLatency) + forwardProxy.DelayRx(f.baseLatency, f.randomizedLatency) + forwardProxy.DelayTx(f.baseLatency, f.randomizedLatency) lg.Info("Delaying traffic from and to member", zap.String("member", member.Config().Name), zap.Duration("baseLatency", f.baseLatency), zap.Duration("randomizedLatency", f.randomizedLatency)) time.Sleep(f.duration) lg.Info("Traffic delay removed", zap.String("member", member.Config().Name)) - proxy.UndelayRx() - proxy.UndelayTx() + forwardProxy.UndelayRx() + forwardProxy.UndelayTx() return nil, nil } @@ -181,7 +175,7 @@ func (f delayPeerNetworkFailpoint) Name() string { } func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool { - return config.ClusterSize > 1 && clus.PeerProxy() != nil + return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil } type dropPeerNetworkFailpoint struct { @@ -191,15 +185,15 @@ type dropPeerNetworkFailpoint struct { func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) 
([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - proxy := member.PeerProxy() + forwardProxy := member.PeerForwardProxy() - proxy.ModifyRx(f.modifyPacket) - proxy.ModifyTx(f.modifyPacket) + forwardProxy.ModifyRx(f.modifyPacket) + forwardProxy.ModifyTx(f.modifyPacket) lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent)) time.Sleep(f.duration) lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) - proxy.UnmodifyRx() - proxy.UnmodifyTx() + forwardProxy.UnmodifyRx() + forwardProxy.UnmodifyTx() return nil, nil } @@ -215,5 +209,5 @@ func (f dropPeerNetworkFailpoint) Name() string { } func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool { - return config.ClusterSize > 1 && clus.PeerProxy() != nil + return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil }
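As a reference for the per-peer blackhole mechanism added in pkg/proxy/server.go: the proxy keys blackholePeerMap by the peer's listening port and combines the Tx/Rx states as bit flags, which ioCopy consults before forwarding data in either direction. The following is a minimal, standalone sketch of that flag bookkeeping, not the patch itself; the port value is hypothetical, and clearing a flag is illustrated with Go's &^ (AND NOT) operator.

package main

import "fmt"

// Mirrors the flag constants added in pkg/proxy/server.go.
const (
	blackholePeerTypeNone uint8 = iota // 0: nothing dropped
	blackholePeerTypeTx                // 1: drop traffic sent by the peer
	blackholePeerTypeRx                // 2: drop traffic sent to the peer
)

func main() {
	// Per-peer blackhole state, keyed by the peer's listening port,
	// analogous to server.blackholePeerMap.
	blackholePeerMap := map[int]uint8{}
	peerPort := 20001 // hypothetical peer port, for illustration only

	// BlackholePeerTx ORs the Tx bit in; BlackholePeerRx ORs the Rx bit in,
	// so both directions can be blackholed independently.
	blackholePeerMap[peerPort] |= blackholePeerTypeTx
	blackholePeerMap[peerPort] |= blackholePeerTypeRx

	val := blackholePeerMap[peerPort]
	fmt.Println("drop traffic to peer:", val&blackholePeerTypeRx > 0)   // true
	fmt.Println("drop traffic from peer:", val&blackholePeerTypeTx > 0) // true

	// Removing one direction clears only the named bit and leaves the other intact.
	blackholePeerMap[peerPort] &^= blackholePeerTypeTx
	fmt.Println("drop traffic from peer:", blackholePeerMap[peerPort]&blackholePeerTypeTx > 0) // false
}

At the cluster level, EtcdProcessCluster.BlackholePeer combines both mechanisms: it blackholes the partitioned member's own forward proxy with BlackholeTx/BlackholeRx, and sets the per-peer flags on every other member's proxy via BlackholePeerTx/BlackholePeerRx; UnblackholePeer undoes both.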