Skip to content

Commit

Permalink
Remove reverse proxy and keep only forward proxy for all peers
Browse files Browse the repository at this point in the history
There is no need of reverse proxy after the introduction of forward
proxy, as the forward proxy holds the information of the destination,
we can filter connection traffic from there, and this can reduce 1 hop
when the proxy is turned on.

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed May 18, 2024
1 parent ff5824f commit 0df1446
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 86 deletions.
144 changes: 137 additions & 7 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"math/bits"
mrand "math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -116,6 +117,16 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// ??
BlackholePeerTx(peer url.URL)
// ??
UnblackholePeerTx(peer url.URL)

// ??
BlackholePeerRx(peer url.URL)
// ??
UnblackholePeerRx(peer url.URL)

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
Expand All @@ -142,6 +153,12 @@ type ServerConfig struct {
IsForwardProxy bool
}

const (
blackholePeerTypeNone uint8 = iota
blackholePeerTypeTx
blackholePeerTypeRx
)

type server struct {
lg *zap.Logger

Expand Down Expand Up @@ -191,6 +208,9 @@ type server struct {

latencyRxMu sync.RWMutex
latencyRx time.Duration

blackholePeerMap map[int]uint8 // port number, blackhole type
blackholePeerMapMu sync.RWMutex
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand All @@ -217,6 +237,8 @@ func NewServer(cfg ServerConfig) Server {
pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),

blackholePeerMap: make(map[int]uint8),
}

if _, fromPort, err := net.SplitHostPort(cfg.From.Host); err == nil {
Expand Down Expand Up @@ -455,30 +477,60 @@ func (s *server) listenAndServe() {
continue
}

var dstPort int
dstPort, err = getPort(out.RemoteAddr())
if err != nil {
select {
case s.errc <- err:
select {
case <-s.donec:
return
default:
}
case <-s.donec:
return
}
s.lg.Debug("failed to parse port in transmit", zap.Error(err))
return
}

s.closeWg.Add(2)
go func() {
defer s.closeWg.Done()
// read incoming bytes from listener, dispatch to outgoing connection
s.transmit(out, in)
s.transmit(out, in, dstPort)
out.Close()
in.Close()
}()
go func() {
defer s.closeWg.Done()
// read response from outgoing connection, write back to listener
s.receive(in, out)
s.receive(in, out, dstPort)
in.Close()
out.Close()
}()
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
s.ioCopy(dst, src, proxyTx)
func getPort(addr net.Addr) (int, error) {
switch addr := addr.(type) {
case *net.TCPAddr:
return addr.Port, nil
case *net.UDPAddr:
return addr.Port, nil
case *net.UnixAddr:
return -1, nil
default:
return 0, fmt.Errorf("unsupported address type: %T", addr)
}
}

func (s *server) transmit(dst, src net.Conn, port int) {
s.ioCopy(dst, src, proxyTx, port)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
s.ioCopy(dst, src, proxyRx)
func (s *server) receive(dst, src net.Conn, port int) {
s.ioCopy(dst, src, proxyRx, port)
}

type proxyType uint8
Expand All @@ -488,7 +540,7 @@ const (
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType, peerPort int) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -529,12 +581,30 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
data = s.modifyTx(data)
}
s.modifyTxMu.RUnlock()

s.blackholePeerMapMu.RLock()
// Tx from other peers is Rx for the target peer
if val, exist := s.blackholePeerMap[peerPort]; exist {
if (val & blackholePeerTypeRx) > 0 {
data = nil
}
}
s.blackholePeerMapMu.RUnlock()
case proxyRx:
s.modifyRxMu.RLock()
if s.modifyRx != nil {
data = s.modifyRx(data)
}
s.modifyRxMu.RUnlock()

s.blackholePeerMapMu.RLock()
// Rx from other peers is Tx for the target peer
if val, exist := s.blackholePeerMap[peerPort]; exist {
if (val & blackholePeerTypeTx) > 0 {
data = nil
}
}
s.blackholePeerMapMu.RUnlock()
default:
panic("unknown proxy type")
}
Expand Down Expand Up @@ -962,6 +1032,66 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) BlackholePeerTx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val |= blackholePeerTypeTx
s.blackholePeerMap[port] = val
} else {
s.blackholePeerMap[port] = blackholePeerTypeTx
}
}

func (s *server) UnblackholePeerTx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val &= bits.Reverse8(blackholePeerTypeTx)
s.blackholePeerMap[port] = val
}
}

func (s *server) BlackholePeerRx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val |= blackholePeerTypeRx
s.blackholePeerMap[port] = val
} else {
s.blackholePeerMap[port] = blackholePeerTypeTx
}
}

func (s *server) UnblackholePeerRx(peer url.URL) {
s.blackholePeerMapMu.Lock()
defer s.blackholePeerMapMu.Unlock()

port, err := strconv.Atoi(peer.Port())
if err != nil {
panic("port parsing failed")
}
if val, exist := s.blackholePeerMap[port]; exist {
val &= bits.Reverse8(blackholePeerTypeRx)
s.blackholePeerMap[port] = val
}
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
Expand Down
12 changes: 2 additions & 10 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
forwardProxy := partitionedMember.PeerForwardProxy()
reverseProxy := partitionedMember.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()
epc.BlackholePeer(partitionedMember)

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -82,10 +77,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
// Wait for some time to restore the network
time.Sleep(1 * time.Second)
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
forwardProxy.UnblackholeTx()
forwardProxy.UnblackholeRx()
reverseProxy.UnblackholeTx()
reverseProxy.UnblackholeRx()
epc.UnblackholePeer(partitionedMember)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
3 changes: 0 additions & 3 deletions tests/e2e/http_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,9 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
member := clus.Procs[0]
forwardProxy := member.PeerForwardProxy()
reverseProxy := member.PeerReverseProxy()
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
forwardProxy.BlackholeTx()
forwardProxy.BlackholeRx()
reverseProxy.BlackholeTx()
reverseProxy.BlackholeRx()
}

func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
Expand Down
49 changes: 35 additions & 14 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,12 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
port := cfg.BasePort + 6*i
port := cfg.BasePort + 5*i
clientPort := port
peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
reverseProxyPort := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
forwardProxyPort := port + 5
clientHTTPPort := port + 3
forwardProxyPort := port + 4

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
Expand All @@ -500,20 +499,11 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var forwardProxyCfg *proxy.ServerConfig
var reverseProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
}

// setup reverse proxy
peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", reverseProxyPort)
reverseProxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: peerListenURL,
From: peerAdvertiseURL,
}

// setup forward proxy
forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
forwardProxyCfg = &proxy.ServerConfig{
Expand Down Expand Up @@ -647,7 +637,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
InitialToken: cfg.ServerConfig.InitialClusterToken,
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
ReverseProxy: reverseProxyCfg,
ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
Expand Down Expand Up @@ -898,6 +887,38 @@ func (epc *EtcdProcessCluster) Restart(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Restart(ctx) })
}

func (epc *EtcdProcessCluster) BlackholePeer(blackholePeer EtcdProcess) error {
blackholePeer.PeerForwardProxy().BlackholeRx()
blackholePeer.PeerForwardProxy().BlackholeTx()

for _, peer := range epc.Procs {
if peer.Config().Name == blackholePeer.Config().Name {
continue
}

peer.PeerForwardProxy().BlackholePeerRx(blackholePeer.Config().PeerURL)
peer.PeerForwardProxy().BlackholePeerTx(blackholePeer.Config().PeerURL)
}

return nil
}

func (epc *EtcdProcessCluster) UnblackholePeer(blackholePeer EtcdProcess) error {
blackholePeer.PeerForwardProxy().UnblackholeRx()
blackholePeer.PeerForwardProxy().UnblackholeTx()

for _, peer := range epc.Procs {
if peer.Config().Name == blackholePeer.Config().Name {
continue
}

peer.PeerForwardProxy().UnblackholePeerRx(blackholePeer.Config().PeerURL)
peer.PeerForwardProxy().UnblackholePeerTx(blackholePeer.Config().PeerURL)
}

return nil
}

func (epc *EtcdProcessCluster) start(f func(ep EtcdProcess) error) error {
readyC := make(chan error, len(epc.Procs))
for i := range epc.Procs {
Expand Down
Loading

0 comments on commit 0df1446

Please sign in to comment.