diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 890eec4b9b14..4b564a46a75e 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "math/bits" mrand "math/rand" "net" "net/http" @@ -116,6 +117,16 @@ type Server interface { // UnblackholeRx removes blackhole operation on "receiving". UnblackholeRx() + // ?? + BlackholePeerTx(peer url.URL) + // ?? + UnblackholePeerTx(peer url.URL) + + // ?? + BlackholePeerRx(peer url.URL) + // ?? + UnblackholePeerRx(peer url.URL) + // PauseTx stops "forwarding" packets; "outgoing" traffic blocks. PauseTx() // UnpauseTx removes "forwarding" pause operation. @@ -142,6 +153,12 @@ type ServerConfig struct { IsForwardProxy bool } +const ( + blackholePeerTypeNone uint8 = iota + blackholePeerTypeTx + blackholePeerTypeRx +) + type server struct { lg *zap.Logger @@ -191,6 +208,9 @@ type server struct { latencyRxMu sync.RWMutex latencyRx time.Duration + + blackholePeerMap map[int]uint8 // port number, blackhole type + blackholePeerMapMu sync.RWMutex } // NewServer returns a proxy implementation with no iptables/tc dependencies. @@ -217,6 +237,8 @@ func NewServer(cfg ServerConfig) Server { pauseAcceptc: make(chan struct{}), pauseTxc: make(chan struct{}), pauseRxc: make(chan struct{}), + + blackholePeerMap: make(map[int]uint8), } if _, fromPort, err := net.SplitHostPort(cfg.From.Host); err == nil { @@ -455,30 +477,58 @@ func (s *server) listenAndServe() { continue } + var dstPort int + dstPort, err = getPort(out.RemoteAddr()) + if err != nil { + select { + case s.errc <- err: + select { + case <-s.donec: + return + default: + } + case <-s.donec: + return + } + s.lg.Debug("failed to parse port in transmit", zap.Error(err)) + return + } + s.closeWg.Add(2) go func() { defer s.closeWg.Done() // read incoming bytes from listener, dispatch to outgoing connection - s.transmit(out, in) + s.transmit(out, in, dstPort) out.Close() in.Close() }() go func() { defer s.closeWg.Done() // read response from outgoing connection, write back to listener - s.receive(in, out) + s.receive(in, out, dstPort) in.Close() out.Close() }() } } -func (s *server) transmit(dst io.Writer, src io.Reader) { - s.ioCopy(dst, src, proxyTx) +func getPort(addr net.Addr) (int, error) { + switch addr := addr.(type) { + case *net.TCPAddr: + return addr.Port, nil + case *net.UDPAddr: + return addr.Port, nil + default: + return 0, fmt.Errorf("unsupported address type: %T", addr) + } +} + +func (s *server) transmit(dst, src net.Conn, port int) { + s.ioCopy(dst, src, proxyTx, port) } -func (s *server) receive(dst io.Writer, src io.Reader) { - s.ioCopy(dst, src, proxyRx) +func (s *server) receive(dst, src net.Conn, port int) { + s.ioCopy(dst, src, proxyRx, port) } type proxyType uint8 @@ -488,7 +538,7 @@ const ( proxyRx ) -func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { +func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) { buf := make([]byte, s.bufferSize) for { nr1, err := src.Read(buf) @@ -529,12 +579,30 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { data = s.modifyTx(data) } s.modifyTxMu.RUnlock() + + s.blackholePeerMapMu.RLock() + // Tx from other peers is Rx for the target peer + if val, exist := s.blackholePeerMap[peerPort]; exist { + if (val & blackholePeerTypeRx) > 0 { + data = nil + } + } + s.blackholePeerMapMu.RUnlock() case proxyRx: s.modifyRxMu.RLock() if s.modifyRx != nil { data = s.modifyRx(data) } s.modifyRxMu.RUnlock() + + s.blackholePeerMapMu.RLock() + // Rx from other peers is Tx for the target peer + if val, exist := s.blackholePeerMap[peerPort]; exist { + if (val & blackholePeerTypeTx) > 0 { + data = nil + } + } + s.blackholePeerMapMu.RUnlock() default: panic("unknown proxy type") } @@ -962,6 +1030,66 @@ func (s *server) UnblackholeRx() { ) } +func (s *server) BlackholePeerTx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val |= blackholePeerTypeTx + s.blackholePeerMap[port] = val + } else { + s.blackholePeerMap[port] = blackholePeerTypeTx + } +} + +func (s *server) UnblackholePeerTx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val &= bits.Reverse8(blackholePeerTypeTx) + s.blackholePeerMap[port] = val + } +} + +func (s *server) BlackholePeerRx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val |= blackholePeerTypeRx + s.blackholePeerMap[port] = val + } else { + s.blackholePeerMap[port] = blackholePeerTypeTx + } +} + +func (s *server) UnblackholePeerRx(peer url.URL) { + s.blackholePeerMapMu.Lock() + defer s.blackholePeerMapMu.Unlock() + + port, err := strconv.Atoi(peer.Port()) + if err != nil { + panic("port parsing failed") + } + if val, exist := s.blackholePeerMap[port]; exist { + val &= bits.Reverse8(blackholePeerTypeRx) + s.blackholePeerMap[port] = val + } +} + func (s *server) PauseTx() { s.pauseTxMu.Lock() s.pauseTxc = make(chan struct{}) diff --git a/tests/e2e/blackhole_test.go b/tests/e2e/blackhole_test.go index ee3f41c14453..fdef56f2d67e 100644 --- a/tests/e2e/blackhole_test.go +++ b/tests/e2e/blackhole_test.go @@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea } partitionedMember := epc.Procs[mockPartitionNodeIndex] // Mock partition - forwardProxy := partitionedMember.PeerForwardProxy() - reverseProxy := partitionedMember.PeerReverseProxy() t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name) - forwardProxy.BlackholeTx() - forwardProxy.BlackholeRx() - reverseProxy.BlackholeTx() - reverseProxy.BlackholeRx() + epc.BlackholePeer(partitionedMember) t.Logf("Wait 5s for any open connections to expire") time.Sleep(5 * time.Second) @@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea // Wait for some time to restore the network time.Sleep(1 * time.Second) t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name) - forwardProxy.UnblackholeTx() - forwardProxy.UnblackholeRx() - reverseProxy.UnblackholeTx() - reverseProxy.UnblackholeRx() + epc.UnblackholePeer(partitionedMember) leaderEPC = epc.Procs[epc.WaitLeader(t)] time.Sleep(5 * time.Second) diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go index 74593f0784de..baa5ad81110f 100644 --- a/tests/e2e/http_health_check_test.go +++ b/tests/e2e/http_health_check_test.go @@ -385,12 +385,9 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) { member := clus.Procs[0] forwardProxy := member.PeerForwardProxy() - reverseProxy := member.PeerReverseProxy() t.Logf("Blackholing traffic from and to member %q", member.Config().Name) forwardProxy.BlackholeTx() forwardProxy.BlackholeRx() - reverseProxy.BlackholeTx() - reverseProxy.BlackholeRx() } func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 70fbe5b9e0e0..2333132da819 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -481,13 +481,12 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig { var curls []string var curl string - port := cfg.BasePort + 6*i + port := cfg.BasePort + 5*i clientPort := port peerPort := port + 1 // the port that the peer actually listens on metricsPort := port + 2 - reverseProxyPort := port + 3 // the port that the peer advertises - clientHTTPPort := port + 4 - forwardProxyPort := port + 5 + clientHTTPPort := port + 3 + forwardProxyPort := port + 4 if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) @@ -500,20 +499,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} var forwardProxyCfg *proxy.ServerConfig - var reverseProxyCfg *proxy.ServerConfig if cfg.PeerProxy { if !cfg.IsPeerTLS { panic("Can't use peer proxy without peer TLS as it can result in malformed packets") } - // setup reverse proxy - peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", reverseProxyPort) - reverseProxyCfg = &proxy.ServerConfig{ - Logger: zap.NewNop(), - To: peerListenURL, - From: peerAdvertiseURL, - } - // setup forward proxy forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)} forwardProxyCfg = &proxy.ServerConfig{ @@ -647,7 +637,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in InitialToken: cfg.ServerConfig.InitialClusterToken, GoFailPort: gofailPort, GoFailClientTimeout: cfg.GoFailClientTimeout, - ReverseProxy: reverseProxyCfg, ForwardProxy: forwardProxyCfg, LazyFSEnabled: cfg.LazyFSEnabled, } @@ -898,6 +887,47 @@ func (epc *EtcdProcessCluster) Restart(ctx context.Context) error { return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) }) } +/* +There are stopAll flag, and peerURL blocking map + +stopAll flag takes precedent over the peeerURL blocking map +*/ +func (epc *EtcdProcessCluster) BlackholePeer(blackholePeer EtcdProcess) error { + // for the peer itself, the proxy will stop all TX and RX by setting a flag + blackholePeer.PeerForwardProxy().BlackholeRx() + blackholePeer.PeerForwardProxy().BlackholeTx() + + // for all the other peers, we black the TX and RX based on the dst and src + for _, peer := range epc.Procs { + if peer.Config().Name == blackholePeer.Config().Name { + continue + } + + peer.PeerForwardProxy().BlackholePeerRx(blackholePeer.Config().PeerURL) + peer.PeerForwardProxy().BlackholePeerTx(blackholePeer.Config().PeerURL) + } + + return nil +} + +func (epc *EtcdProcessCluster) UnblackholePeer(blackholePeer EtcdProcess) error { + // for the peer itself, unset the flag + blackholePeer.PeerForwardProxy().UnblackholeRx() + blackholePeer.PeerForwardProxy().UnblackholeTx() + + // for all the other peers, we black the TX and RX based on the dst and src + for _, peer := range epc.Procs { + if peer.Config().Name == blackholePeer.Config().Name { + continue + } + + peer.PeerForwardProxy().UnblackholePeerRx(blackholePeer.Config().PeerURL) + peer.PeerForwardProxy().UnblackholePeerTx(blackholePeer.Config().PeerURL) + } + + return nil +} + func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error { readyC := make(chan error, len(epc.Procs)) for i := range epc.Procs { diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index e84400d7239a..05f345e3dd5b 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -55,7 +55,6 @@ type EtcdProcess interface { Stop() error Close() error Config() *EtcdServerProcessConfig - PeerReverseProxy() proxy.Server PeerForwardProxy() proxy.Server Failpoints() *BinaryFailpoints LazyFS() *LazyFS @@ -73,7 +72,6 @@ type EtcdServerProcess struct { cfg *EtcdServerProcessConfig proc *expect.ExpectProcess forwardProxy proxy.Server - reverseProxy proxy.Server lazyfs *LazyFS failpoints *BinaryFailpoints donec chan struct{} // closed when Interact() terminates @@ -103,7 +101,6 @@ type EtcdServerProcessConfig struct { GoFailClientTimeout time.Duration LazyFSEnabled bool - ReverseProxy *proxy.ServerConfig ForwardProxy *proxy.ServerConfig } @@ -155,19 +152,6 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { panic("already started") } - if !((ep.cfg.ReverseProxy != nil && ep.cfg.ForwardProxy != nil) || (ep.cfg.ReverseProxy == nil && ep.cfg.ForwardProxy == nil)) { - panic("both forward and reverse proxy confiugration files must exist or not exist at the same time") - } - - if ep.cfg.ReverseProxy != nil && ep.reverseProxy == nil { - ep.cfg.lg.Info("starting reverse proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ReverseProxy.From.String()), zap.String("to", ep.cfg.ReverseProxy.To.String())) - ep.reverseProxy = proxy.NewServer(*ep.cfg.ReverseProxy) - select { - case <-ep.reverseProxy.Ready(): - case err := <-ep.reverseProxy.Error(): - return err - } - } if ep.cfg.ForwardProxy != nil && ep.forwardProxy == nil { ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ForwardProxy.From.String()), zap.String("to", ep.cfg.ForwardProxy.To.String())) ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy) @@ -246,14 +230,6 @@ func (ep *EtcdServerProcess) Stop() (err error) { return err } } - if ep.reverseProxy != nil { - ep.cfg.lg.Info("stopping reverse proxy...", zap.String("name", ep.cfg.Name)) - err = ep.reverseProxy.Close() - ep.reverseProxy = nil - if err != nil { - return err - } - } if ep.lazyfs != nil { ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name)) err = ep.lazyfs.Stop() @@ -351,10 +327,6 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { } } -func (ep *EtcdServerProcess) PeerReverseProxy() proxy.Server { - return ep.reverseProxy -} - func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server { return ep.forwardProxy } diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 4740efcf5d83..48dae31eb15e 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -57,22 +57,17 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e } func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool { - return config.ClusterSize > 1 && process.PeerForwardProxy() != nil && process.PeerReverseProxy() != nil + return config.ClusterSize > 1 && process.PeerForwardProxy() != nil } func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { - reverseProxy := member.PeerReverseProxy() forwardProxy := member.PeerForwardProxy() t.Logf("Blackholing traffic from and to member %q", member.Config().Name) - reverseProxy.BlackholeTx() - reverseProxy.BlackholeRx() forwardProxy.BlackholeTx() forwardProxy.BlackholeRx() defer func() { t.Logf("Traffic restored from and to member %q", member.Config().Name) - reverseProxy.UnblackholeTx() - reverseProxy.UnblackholeRx() forwardProxy.UnblackholeTx() forwardProxy.UnblackholeRx() }() @@ -157,18 +152,13 @@ type delayPeerNetworkFailpoint struct { func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - reverseProxy := member.PeerReverseProxy() forwardProxy := member.PeerForwardProxy() - reverseProxy.DelayRx(f.baseLatency, f.randomizedLatency) - reverseProxy.DelayTx(f.baseLatency, f.randomizedLatency) forwardProxy.DelayRx(f.baseLatency, f.randomizedLatency) forwardProxy.DelayTx(f.baseLatency, f.randomizedLatency) lg.Info("Delaying traffic from and to member", zap.String("member", member.Config().Name), zap.Duration("baseLatency", f.baseLatency), zap.Duration("randomizedLatency", f.randomizedLatency)) time.Sleep(f.duration) lg.Info("Traffic delay removed", zap.String("member", member.Config().Name)) - reverseProxy.UndelayRx() - reverseProxy.UndelayTx() forwardProxy.UndelayRx() forwardProxy.UndelayTx() return nil, nil @@ -179,7 +169,7 @@ func (f delayPeerNetworkFailpoint) Name() string { } func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool { - return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil && clus.PeerReverseProxy() != nil + return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil } type dropPeerNetworkFailpoint struct { @@ -189,18 +179,13 @@ type dropPeerNetworkFailpoint struct { func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - reverseProxy := member.PeerReverseProxy() forwardProxy := member.PeerForwardProxy() - reverseProxy.ModifyRx(f.modifyPacket) - reverseProxy.ModifyTx(f.modifyPacket) forwardProxy.ModifyRx(f.modifyPacket) forwardProxy.ModifyTx(f.modifyPacket) lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent)) time.Sleep(f.duration) lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) - reverseProxy.UnmodifyRx() - reverseProxy.UnmodifyTx() forwardProxy.UnmodifyRx() forwardProxy.UnmodifyTx() return nil, nil @@ -218,5 +203,5 @@ func (f dropPeerNetworkFailpoint) Name() string { } func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool { - return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil && clus.PeerReverseProxy() != nil + return config.ClusterSize > 1 && clus.PeerForwardProxy() != nil }