From 0df1446225865217f8f6bd98b0d03f638af91b9c Mon Sep 17 00:00:00 2001
From: Chun-Hung Tseng
Date: Sun, 12 May 2024 12:18:03 +0200
Subject: [PATCH] Remove reverse proxy and keep only forward proxy for all peers

The reverse proxy is no longer needed now that the forward proxy exists.
The forward proxy knows the destination of every connection, so peer
traffic can be filtered there, and dropping the reverse proxy removes one
hop when the proxy is turned on.

Signed-off-by: Chun-Hung Tseng
---
 pkg/proxy/server.go                   | 146 ++++++++++++++++++++++++--
 tests/e2e/blackhole_test.go           |  12 +--
 tests/e2e/http_health_check_test.go   |   7 +-
 tests/framework/e2e/cluster.go        |  51 ++++++---
 tests/framework/e2e/etcd_process.go   |  28 -----
 tests/robustness/failpoint/network.go |  29 +-----
 6 files changed, 184 insertions(+), 89 deletions(-)

diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go
index 890eec4b9b14..c90600df5708 100644
--- a/pkg/proxy/server.go
+++ b/pkg/proxy/server.go
@@ -116,6 +116,16 @@ type Server interface {
 	// UnblackholeRx removes blackhole operation on "receiving".
 	UnblackholeRx()
 
+	// BlackholePeerTx drops all data this proxy receives from the given peer (matched by port).
+	BlackholePeerTx(peer url.URL)
+	// UnblackholePeerTx removes the blackhole operation on data received from the given peer.
+	UnblackholePeerTx(peer url.URL)
+
+	// BlackholePeerRx drops all data this proxy forwards to the given peer (matched by port).
+	BlackholePeerRx(peer url.URL)
+	// UnblackholePeerRx removes the blackhole operation on data forwarded to the given peer.
+	UnblackholePeerRx(peer url.URL)
+
 	// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
 	PauseTx()
 	// UnpauseTx removes "forwarding" pause operation.
@@ -142,6 +152,12 @@ type ServerConfig struct {
 	IsForwardProxy bool
 }
 
+const (
+	blackholePeerTypeNone uint8 = iota
+	blackholePeerTypeTx
+	blackholePeerTypeRx
+)
+
 type server struct {
 	lg *zap.Logger
 
@@ -191,6 +207,9 @@ type server struct {
 
 	latencyRxMu sync.RWMutex
 	latencyRx   time.Duration
+
+	blackholePeerMap   map[int]uint8 // port number, blackhole type
+	blackholePeerMapMu sync.RWMutex
 }
 
 // NewServer returns a proxy implementation with no iptables/tc dependencies.
@@ -217,6 +236,8 @@ func NewServer(cfg ServerConfig) Server {
 		pauseAcceptc: make(chan struct{}),
 		pauseTxc:     make(chan struct{}),
 		pauseRxc:     make(chan struct{}),
+
+		blackholePeerMap: make(map[int]uint8),
 	}
 
 	if _, fromPort, err := net.SplitHostPort(cfg.From.Host); err == nil {
@@ -455,30 +476,63 @@ func (s *server) listenAndServe() {
 			continue
 		}
 
+		var dstPort int
+		dstPort, err = getPort(out.RemoteAddr())
+		if err != nil {
+			select {
+			case s.errc <- err:
+				select {
+				case <-s.donec:
+					return
+				default:
+				}
+			case <-s.donec:
+				return
+			}
+			s.lg.Debug("failed to parse port in transmit", zap.Error(err))
+			// Drop only this connection pair; keep accepting new connections.
+			out.Close()
+			in.Close()
+			continue
+		}
+
 		s.closeWg.Add(2)
 		go func() {
 			defer s.closeWg.Done()
 			// read incoming bytes from listener, dispatch to outgoing connection
-			s.transmit(out, in)
+			s.transmit(out, in, dstPort)
 			out.Close()
 			in.Close()
 		}()
 		go func() {
 			defer s.closeWg.Done()
 			// read response from outgoing connection, write back to listener
-			s.receive(in, out)
+			s.receive(in, out, dstPort)
 			in.Close()
 			out.Close()
 		}()
 	}
 }
 
-func (s *server) transmit(dst io.Writer, src io.Reader) {
-	s.ioCopy(dst, src, proxyTx)
+func getPort(addr net.Addr) (int, error) {
+	switch addr := addr.(type) {
+	case *net.TCPAddr:
+		return addr.Port, nil
+	case *net.UDPAddr:
+		return addr.Port, nil
+	case *net.UnixAddr:
+		return -1, nil
+	default:
+		return 0, fmt.Errorf("unsupported address type: %T", addr)
+	}
+}
+
+func (s *server) transmit(dst, src net.Conn, port int) {
+	s.ioCopy(dst, src, proxyTx, port)
 }
 
-func (s *server) receive(dst io.Writer, src io.Reader) {
-	s.ioCopy(dst, src, proxyRx)
+func (s *server) receive(dst, src net.Conn, port int) {
+	s.ioCopy(dst, src, proxyRx, port)
 }
 
 type proxyType uint8
@@ -488,7 +542,7 @@ const (
 	proxyRx
 )
 
-func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
+func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
 	buf := make([]byte, s.bufferSize)
 	for {
 		nr1, err := src.Read(buf)
@@ -529,12 +583,30 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
 				data = s.modifyTx(data)
 			}
 			s.modifyTxMu.RUnlock()
+
+			s.blackholePeerMapMu.RLock()
+			// Tx from other peers is Rx for the target peer
+			if val, exist := s.blackholePeerMap[peerPort]; exist {
+				if (val & blackholePeerTypeRx) > 0 {
+					data = nil
+				}
+			}
+			s.blackholePeerMapMu.RUnlock()
 		case proxyRx:
 			s.modifyRxMu.RLock()
 			if s.modifyRx != nil {
 				data = s.modifyRx(data)
 			}
 			s.modifyRxMu.RUnlock()
+
+			s.blackholePeerMapMu.RLock()
+			// Rx from other peers is Tx for the target peer
+			if val, exist := s.blackholePeerMap[peerPort]; exist {
+				if (val & blackholePeerTypeTx) > 0 {
+					data = nil
+				}
+			}
+			s.blackholePeerMapMu.RUnlock()
 		default:
 			panic("unknown proxy type")
 		}
@@ -962,6 +1034,66 @@ func (s *server) UnblackholeRx() {
 	)
 }
 
+func (s *server) BlackholePeerTx(peer url.URL) {
+	s.blackholePeerMapMu.Lock()
+	defer s.blackholePeerMapMu.Unlock()
+
+	port, err := strconv.Atoi(peer.Port())
+	if err != nil {
+		panic("port parsing failed")
+	}
+	if val, exist := s.blackholePeerMap[port]; exist {
+		val |= blackholePeerTypeTx
+		s.blackholePeerMap[port] = val
+	} else {
+		s.blackholePeerMap[port] = blackholePeerTypeTx
+	}
+}
+
+func (s *server) UnblackholePeerTx(peer url.URL) {
+	s.blackholePeerMapMu.Lock()
+	defer s.blackholePeerMapMu.Unlock()
+
+	port, err := strconv.Atoi(peer.Port())
+	if err != nil {
+		panic("port parsing failed")
+	}
+	if val, exist := s.blackholePeerMap[port]; exist {
+		val &^= blackholePeerTypeTx
+		s.blackholePeerMap[port] = val
+	}
+}
+
+func (s *server) BlackholePeerRx(peer url.URL) {
+	s.blackholePeerMapMu.Lock()
+	defer s.blackholePeerMapMu.Unlock()
+
+	port, err := strconv.Atoi(peer.Port())
+	if err != nil {
+		panic("port parsing failed")
+	}
+	if val, exist := s.blackholePeerMap[port]; exist {
+		val |= blackholePeerTypeRx
+		s.blackholePeerMap[port] = val
+	} else {
+		s.blackholePeerMap[port] = blackholePeerTypeRx
+	}
+}
+
+func (s *server) UnblackholePeerRx(peer url.URL) {
+	s.blackholePeerMapMu.Lock()
+	defer s.blackholePeerMapMu.Unlock()
+
+	port, err := strconv.Atoi(peer.Port())
+	if err != nil {
+		panic("port parsing failed")
+	}
+	if val, exist := s.blackholePeerMap[port]; exist {
+		val &^= blackholePeerTypeRx
+		s.blackholePeerMap[port] = val
+	}
+}
+
 func (s *server) PauseTx() {
 	s.pauseTxMu.Lock()
 	s.pauseTxc = make(chan struct{})
diff --git a/tests/e2e/blackhole_test.go b/tests/e2e/blackhole_test.go
index ee3f41c14453..fdef56f2d67e 100644
--- a/tests/e2e/blackhole_test.go
+++ b/tests/e2e/blackhole_test.go
@@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
 	}
 	partitionedMember := epc.Procs[mockPartitionNodeIndex]
 	// Mock partition
-	forwardProxy := partitionedMember.PeerForwardProxy()
-	reverseProxy := partitionedMember.PeerReverseProxy()
 	t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
-	forwardProxy.BlackholeTx()
-	forwardProxy.BlackholeRx()
-	reverseProxy.BlackholeTx()
-	reverseProxy.BlackholeRx()
+	epc.BlackholePeer(partitionedMember)
 
 	t.Logf("Wait 5s for any open connections to expire")
 	time.Sleep(5 * time.Second)
@@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
 	// Wait for some time to restore the network
 	time.Sleep(1 * time.Second)
 	t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
-	forwardProxy.UnblackholeTx()
-	forwardProxy.UnblackholeRx()
-	reverseProxy.UnblackholeTx()
-	reverseProxy.UnblackholeRx()
+	epc.UnblackholePeer(partitionedMember)
 
 	leaderEPC = epc.Procs[epc.WaitLeader(t)]
 	time.Sleep(5 * time.Second)
diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go
index 74593f0784de..baa5ad81110f 100644
--- a/tests/e2e/http_health_check_test.go
+++ b/tests/e2e/http_health_check_test.go
@@ -385,12 +385,7 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
 func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
 	member := clus.Procs[0]
-	forwardProxy := member.PeerForwardProxy()
-	reverseProxy := member.PeerReverseProxy()
 	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
-	forwardProxy.BlackholeTx()
-	forwardProxy.BlackholeRx()
-	reverseProxy.BlackholeTx()
-	reverseProxy.BlackholeRx()
+	clus.BlackholePeer(member)
 }
 
 func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 70fbe5b9e0e0..36ce0d9fa60a 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -481,13 +481,12 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
 func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
 	var curls []string
 	var curl string
-	port := cfg.BasePort + 6*i
+	port := cfg.BasePort + 5*i
 	clientPort := port
 	peerPort := port + 1 // the port that the peer actually listens on
 	metricsPort := port + 2
-	reverseProxyPort := port + 3 // the port that the peer advertises
-	clientHTTPPort := port + 4
-	forwardProxyPort := port + 5
+	clientHTTPPort := port + 3
+	forwardProxyPort := port + 4
 
 	if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
 		curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
@@ -500,20 +499,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 	peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
 	peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
 	var forwardProxyCfg *proxy.ServerConfig
-	var reverseProxyCfg *proxy.ServerConfig
 	if cfg.PeerProxy {
 		if !cfg.IsPeerTLS {
 			panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
 		}
 
-		// setup reverse proxy
-		peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", reverseProxyPort)
-		reverseProxyCfg = &proxy.ServerConfig{
-			Logger: zap.NewNop(),
-			To:     peerListenURL,
-			From:   peerAdvertiseURL,
-		}
-
 		// setup forward proxy
 		forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
 		forwardProxyCfg = &proxy.ServerConfig{
@@ -647,7 +637,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 		InitialToken:        cfg.ServerConfig.InitialClusterToken,
 		GoFailPort:          gofailPort,
 		GoFailClientTimeout: cfg.GoFailClientTimeout,
-		ReverseProxy:        reverseProxyCfg,
 		ForwardProxy:        forwardProxyCfg,
 		LazyFSEnabled:       cfg.LazyFSEnabled,
 	}
@@ -898,6 +887,40 @@ func (epc *EtcdProcessCluster) Restart(ctx context.Context) error {
 	return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) })
 }
 
+// BlackholePeer blackholes the member's own forward proxy and blackholes the member's peer URL on every other member's forward proxy.
+func (epc *EtcdProcessCluster) BlackholePeer(blackholePeer EtcdProcess) error {
+	blackholePeer.PeerForwardProxy().BlackholeRx()
+	blackholePeer.PeerForwardProxy().BlackholeTx()
+
+	for _, peer := range epc.Procs {
+		if peer.Config().Name == blackholePeer.Config().Name {
+			continue
+		}
+
+		peer.PeerForwardProxy().BlackholePeerRx(blackholePeer.Config().PeerURL)
+		peer.PeerForwardProxy().BlackholePeerTx(blackholePeer.Config().PeerURL)
+	}
+
+	return nil
+}
+
+// UnblackholePeer reverts BlackholePeer on the member's own forward proxy and on every other member's forward proxy.
+func (epc *EtcdProcessCluster) UnblackholePeer(blackholePeer EtcdProcess) error {
+	blackholePeer.PeerForwardProxy().UnblackholeRx()
+	blackholePeer.PeerForwardProxy().UnblackholeTx()
+
+	for _, peer := range epc.Procs {
+		if peer.Config().Name == blackholePeer.Config().Name {
+			continue
+		}
+
+		peer.PeerForwardProxy().UnblackholePeerRx(blackholePeer.Config().PeerURL)
+		peer.PeerForwardProxy().UnblackholePeerTx(blackholePeer.Config().PeerURL)
+	}
+
+	return nil
+}
+
 func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error {
 	readyC := make(chan error, len(epc.Procs))
 	for i := range epc.Procs {
diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go
index e84400d7239a..05f345e3dd5b 100644
--- a/tests/framework/e2e/etcd_process.go
+++ b/tests/framework/e2e/etcd_process.go
@@ -55,7 +55,6 @@ type EtcdProcess interface {
 	Stop() error
 	Close() error
 	Config() *EtcdServerProcessConfig
-	PeerReverseProxy() proxy.Server
 	PeerForwardProxy() proxy.Server
 	Failpoints() *BinaryFailpoints
 	LazyFS() *LazyFS
@@ -73,7 +72,6 @@ type EtcdServerProcess struct {
 	cfg          *EtcdServerProcessConfig
 	proc         *expect.ExpectProcess
 	forwardProxy proxy.Server
-	reverseProxy proxy.Server
 	lazyfs       *LazyFS
 	failpoints   *BinaryFailpoints
 	donec        chan struct{} // closed when Interact() terminates
@@ -103,7 +101,6 @@ type EtcdServerProcessConfig struct {
 	GoFailClientTimeout time.Duration
 
 	LazyFSEnabled bool
-	ReverseProxy  *proxy.ServerConfig
 	ForwardProxy  *proxy.ServerConfig
 }
 
@@ -155,19 +152,6 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
 		panic("already started")
 	}
 
-	if !((ep.cfg.ReverseProxy != nil && ep.cfg.ForwardProxy != nil) || (ep.cfg.ReverseProxy == nil && ep.cfg.ForwardProxy == nil)) {
-		panic("both forward and reverse proxy confiugration files must exist or not exist at the same time")
-	}
-
-	if ep.cfg.ReverseProxy != nil && ep.reverseProxy == nil {
-		ep.cfg.lg.Info("starting reverse proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ReverseProxy.From.String()), zap.String("to", ep.cfg.ReverseProxy.To.String()))
-		ep.reverseProxy = proxy.NewServer(*ep.cfg.ReverseProxy)
-		select {
-		case <-ep.reverseProxy.Ready():
-		case err := <-ep.reverseProxy.Error():
-			return err
-		}
-	}
 	if ep.cfg.ForwardProxy != nil && ep.forwardProxy == nil {
 		ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ForwardProxy.From.String()), zap.String("to", ep.cfg.ForwardProxy.To.String()))
 		ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy)
@@ -246,14 +230,6 @@ func (ep *EtcdServerProcess) Stop() (err error) {
 			return err
 		}
 	}
-	if ep.reverseProxy != nil {
-		ep.cfg.lg.Info("stopping reverse proxy...", zap.String("name", ep.cfg.Name))
-		err = ep.reverseProxy.Close()
-		ep.reverseProxy = nil
-		if err != nil {
-			return err
-		}
-	}
 	if ep.lazyfs != nil {
 		ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name))
 		err = ep.lazyfs.Stop()
@@ -351,10 +327,6 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
 	}
 }
 
-func (ep *EtcdServerProcess) PeerReverseProxy() proxy.Server {
-	return ep.reverseProxy
-}
-
 func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server {
 	return ep.forwardProxy
 }
diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go
index e3911ab7b7be..e955a140f30c 100644
--- a/tests/robustness/failpoint/network.go
+++ b/tests/robustness/failpoint/network.go
@@ -62,24 +62,15 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces
 	if tb.waitTillSnapshot && entriesToGuaranteeSnapshot(config) > 200 {
 		return false
 	}
-	return config.ClusterSize > 1 && process.PeerForwardProxy() != nil && process.PeerReverseProxy() != nil
+	return config.ClusterSize > 1 && process.PeerForwardProxy() != nil
 }
 
 func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
-	reverseProxy := member.PeerReverseProxy()
-	forwardProxy := member.PeerForwardProxy()
-
 	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
-	reverseProxy.BlackholeTx()
-	reverseProxy.BlackholeRx()
-	forwardProxy.BlackholeTx()
-	forwardProxy.BlackholeRx()
+	clus.BlackholePeer(member)
 	defer func() {
 		t.Logf("Traffic restored from and to member %q", member.Config().Name)
-		reverseProxy.UnblackholeTx()
-		reverseProxy.UnblackholeRx()
-		forwardProxy.UnblackholeTx()
-		forwardProxy.UnblackholeRx()
+		clus.UnblackholePeer(member)
 	}()
 
 	if shouldWaitTillSnapshot {
@@ -166,18 +157,13 @@ type delayPeerNetworkFailpoint struct {
 
 func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
 	member := clus.Procs[rand.Int()%len(clus.Procs)]
-	reverseProxy := member.PeerReverseProxy()
 	forwardProxy := member.PeerForwardProxy()
 
-	reverseProxy.DelayRx(f.baseLatency, f.randomizedLatency)
-	reverseProxy.DelayTx(f.baseLatency, f.randomizedLatency)
 	forwardProxy.DelayRx(f.baseLatency, f.randomizedLatency)
 	forwardProxy.DelayTx(f.baseLatency, f.randomizedLatency)
 	lg.Info("Delaying traffic from and to member", zap.String("member", member.Config().Name), zap.Duration("baseLatency", f.baseLatency), zap.Duration("randomizedLatency", f.randomizedLatency))
 	time.Sleep(f.duration)
 	lg.Info("Traffic delay removed", zap.String("member", member.Config().Name))
-	reverseProxy.UndelayRx()
-	reverseProxy.UndelayTx()
 	forwardProxy.UndelayRx()
 	forwardProxy.UndelayTx()
 	return nil, nil
@@ -188,7 +174,7 @@ func (f delayPeerNetworkFailpoint) Name() string {
 }
 
 func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
-	return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil && clus.PeerReverseProxy() != nil
+	return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil
 }
 
 type dropPeerNetworkFailpoint struct {
@@ -198,18 +184,13 @@ type dropPeerNetworkFailpoint struct {
 
 func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
 	member := clus.Procs[rand.Int()%len(clus.Procs)]
-	reverseProxy := member.PeerReverseProxy()
 	forwardProxy := member.PeerForwardProxy()
 
-	reverseProxy.ModifyRx(f.modifyPacket)
-	reverseProxy.ModifyTx(f.modifyPacket)
 	forwardProxy.ModifyRx(f.modifyPacket)
 	forwardProxy.ModifyTx(f.modifyPacket)
 	lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
 	time.Sleep(f.duration)
 	lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
-	reverseProxy.UnmodifyRx()
-	reverseProxy.UnmodifyTx()
 	forwardProxy.UnmodifyRx()
 	forwardProxy.UnmodifyTx()
 	return nil, nil
@@ -227,5 +208,5 @@ func (f dropPeerNetworkFailpoint) Name() string {
 }
 
 func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
-	return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil && clus.PeerReverseProxy() != nil
+	return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil
 }