Skip to content

Commit

Permalink
Remove reverse proxy and keep only forward proxy for all peers
Browse files Browse the repository at this point in the history
  • Loading branch information
henrybear327 committed May 12, 2024
1 parent 9df3252 commit 0e061d2
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 80 deletions.
142 changes: 135 additions & 7 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"math/bits"
mrand "math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -116,6 +117,16 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// ??
BlackholePeerTx(peer url.URL)
// ??
UnblackholePeerTx(peer url.URL)

// ??
BlackholePeerRx(peer url.URL)
// ??
UnblackholePeerRx(peer url.URL)

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
Expand All @@ -142,6 +153,12 @@ type ServerConfig struct {
IsForwardProxy bool
}

const (
blackholePeerTypeNone uint8 = iota
blackholePeerTypeTx
blackholePeerTypeRx
)

type server struct {
lg *zap.Logger

Expand Down Expand Up @@ -191,6 +208,9 @@ type server struct {

latencyRxMu sync.RWMutex
latencyRx time.Duration

blackholePeerMap map[int]uint8 // port number, blackhole type
blackholePeerMapMu sync.RWMutex
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand All @@ -217,6 +237,8 @@ func NewServer(cfg ServerConfig) Server {
pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),

blackholePeerMap: make(map[int]uint8),
}

if _, fromPort, err := net.SplitHostPort(cfg.From.Host); err == nil {
Expand Down Expand Up @@ -455,30 +477,58 @@ func (s *server) listenAndServe() {
continue
}

var dstPort int
dstPort, err = getPort(out.RemoteAddr())
if err != nil {
select {
case s.errc <- err:
select {
case <-s.donec:
return
default:
}
case <-s.donec:
return
}
s.lg.Debug("failed to parse port in transmit", zap.Error(err))
return
}

s.closeWg.Add(2)
go func() {
defer s.closeWg.Done()
// read incoming bytes from listener, dispatch to outgoing connection
s.transmit(out, in)
s.transmit(out, in, dstPort)
out.Close()
in.Close()
}()
go func() {
defer s.closeWg.Done()
// read response from outgoing connection, write back to listener
s.receive(in, out)
s.receive(in, out, dstPort)
in.Close()
out.Close()
}()
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
s.ioCopy(dst, src, proxyTx)
func getPort(addr net.Addr) (int, error) {
switch addr := addr.(type) {
case *net.TCPAddr:
return addr.Port, nil
case *net.UDPAddr:
return addr.Port, nil
default:
return 0, fmt.Errorf("unsupported address type: %T", addr)
}
}

func (s *server) transmit(dst, src net.Conn, port int) {
s.ioCopy(dst, src, proxyTx, port)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
s.ioCopy(dst, src, proxyRx)
func (s *server) receive(dst, src net.Conn, port int) {
s.ioCopy(dst, src, proxyRx, port)
}

type proxyType uint8
Expand All @@ -488,7 +538,7 @@ const (
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -529,12 +579,30 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
data = s.modifyTx(data)
}
s.modifyTxMu.RUnlock()

s.blackholePeerMapMu.RLock()
// Tx from other peers is Rx for the target peer
if val, exist := s.blackholePeerMap[peerPort]; exist {
if (val & blackholePeerTypeRx) > 0 {
data = nil
}
}
s.blackholePeerMapMu.RUnlock()
case proxyRx:
s.modifyRxMu.RLock()
if s.modifyRx != nil {
data = s.modifyRx(data)
}
s.modifyRxMu.RUnlock()

s.blackholePeerMapMu.RLock()
// Rx from other peers is Tx for the target peer
if val, exist := s.blackholePeerMap[peerPort]; exist {
if (val & blackholePeerTypeTx) > 0 {
data = nil
}
}
s.blackholePeerMapMu.RUnlock()
default:
panic("unknown proxy type")
}
Expand Down Expand Up @@ -962,6 +1030,66 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) BlackholePeerTx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val |= blackholePeerTypeTx
s.blackholePeerMap[port] = val
} else {
s.blackholePeerMap[port] = blackholePeerTypeTx
}
}

func (s *server) UnblackholePeerTx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val &= bits.Reverse8(blackholePeerTypeTx)
s.blackholePeerMap[port] = val
}
}

func (s *server) BlackholePeerRx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val |= blackholePeerTypeRx
s.blackholePeerMap[port] = val
} else {
s.blackholePeerMap[port] = blackholePeerTypeTx
}
}

func (s *server) UnblackholePeerRx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val &= bits.Reverse8(blackholePeerTypeRx)
s.blackholePeerMap[port] = val
}
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
Expand Down
12 changes: 2 additions & 10 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
forwardProxy := partitionedMember.PeerForwardProxy()
reverseProxy := partitionedMember.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()
epc.BlackholePeer(partitionedMember)

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
// Wait for some time to restore the network
time.Sleep(1 * time.Second)
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()
reverseProxy.UnblackholeTx()
reverseProxy.UnblackholeRx()
epc.UnblackholePeer(partitionedMember)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
3 changes: 0 additions & 3 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,9 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
member := clus.Procs[0]
forwardProxy := member.PeerForwardProxy()
reverseProxy := member.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()
}

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
Expand Down
58 changes: 44 additions & 14 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,12 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
port := cfg.BasePort + 6*i
port := cfg.BasePort + 5*i
clientPort := port
peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
reverseProxyPort := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
forwardProxyPort := port + 5
clientHTTPPort := port + 3
forwardProxyPort := port + 4

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
Expand All @@ -500,20 +499,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var forwardProxyCfg *proxy.ServerConfig
var reverseProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}

// setup reverse proxy
peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", reverseProxyPort)
reverseProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: peerListenURL,
From: peerAdvertiseURL,
}

// setup forward proxy
forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
forwardProxyCfg = &proxy.ServerConfig{
Expand Down Expand Up @@ -647,7 +637,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
InitialToken: cfg.ServerConfig.InitialClusterToken,
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
ReverseProxy: reverseProxyCfg,
ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
Expand Down Expand Up @@ -898,6 +887,47 @@ func (epc *EtcdProcessCluster) Restart(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) })
}

/*
There are stopAll flag, and peerURL blocking map
stopAll flag takes precedent over the peeerURL blocking map
*/
func (epc *EtcdProcessCluster) BlackholePeer(blackholePeer EtcdProcess) error {
// for the peer itself, the proxy will stop all TX and RX by setting a flag
blackholePeer.PeerForwardProxy().BlackholeRx()
blackholePeer.PeerForwardProxy().BlackholeTx()

// for all the other peers, we black the TX and RX based on the dst and src
for _, peer := range epc.Procs {
if peer.Config().Name == blackholePeer.Config().Name {
continue
}

peer.PeerForwardProxy().BlackholePeerRx(blackholePeer.Config().PeerURL)
peer.PeerForwardProxy().BlackholePeerTx(blackholePeer.Config().PeerURL)
}

return nil
}

func (epc *EtcdProcessCluster) UnblackholePeer(blackholePeer EtcdProcess) error {
// for the peer itself, unset the flag
blackholePeer.PeerForwardProxy().UnblackholeRx()
blackholePeer.PeerForwardProxy().UnblackholeTx()

// for all the other peers, we black the TX and RX based on the dst and src
for _, peer := range epc.Procs {
if peer.Config().Name == blackholePeer.Config().Name {
continue
}

peer.PeerForwardProxy().UnblackholePeerRx(blackholePeer.Config().PeerURL)
peer.PeerForwardProxy().UnblackholePeerTx(blackholePeer.Config().PeerURL)
}

return nil
}

func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error {
readyC := make(chan error, len(epc.Procs))
for i := range epc.Procs {
Expand Down
Loading

0 comments on commit 0e061d2

Please sign in to comment.