Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e test reverse proxy unused functions cleanup #18640

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 0 additions & 218 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,6 @@ type Server interface {
// Close closes listener and transport.
Close() error

// PauseAccept stops accepting new connections.
PauseAccept()
// UnpauseAccept removes pause operation on accepting new connections.
UnpauseAccept()

// DelayAccept adds latency ± random variable to accepting
// new incoming connections.
DelayAccept(latency, rv time.Duration)
// UndelayAccept removes sending latencies.
UndelayAccept()
// LatencyAccept returns current latency on accepting
// new incoming connections.
LatencyAccept() time.Duration

// DelayTx adds latency ± random variable for "outgoing" traffic
// in "sending" layer.
DelayTx(latency, rv time.Duration)
Expand Down Expand Up @@ -115,16 +101,6 @@ type Server interface {
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()

// PauseTx stops "forwarding" packets; "outgoing" traffic blocks.
PauseTx()
// UnpauseTx removes "forwarding" pause operation.
UnpauseTx()

// PauseRx stops "receiving" packets; "incoming" traffic blocks.
PauseRx()
// UnpauseRx removes "receiving" pause operation.
UnpauseRx()

// ResetListener closes and restarts listener.
ResetListener() error
}
Expand Down Expand Up @@ -164,24 +140,12 @@ type server struct {
listenerMu sync.RWMutex
listener net.Listener

pauseAcceptMu sync.Mutex
pauseAcceptc chan struct{}

latencyAcceptMu sync.RWMutex
latencyAccept time.Duration

modifyTxMu sync.RWMutex
modifyTx func(data []byte) []byte

modifyRxMu sync.RWMutex
modifyRx func(data []byte) []byte

pauseTxMu sync.Mutex
pauseTxc chan struct{}

pauseRxMu sync.Mutex
pauseRxc chan struct{}

latencyTxMu sync.RWMutex
latencyTx time.Duration

Expand All @@ -207,10 +171,6 @@ func NewServer(cfg ServerConfig) Server {
readyc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 16),

pauseAcceptc: make(chan struct{}),
pauseTxc: make(chan struct{}),
pauseRxc: make(chan struct{}),
}

_, fromPort, err := net.SplitHostPort(cfg.From.Host)
Expand All @@ -233,10 +193,6 @@ func NewServer(cfg ServerConfig) Server {
s.retryInterval = defaultRetryInterval
}

close(s.pauseAcceptc)
close(s.pauseTxc)
close(s.pauseRxc)

if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
Expand Down Expand Up @@ -290,26 +246,6 @@ func (s *server) listenAndServe() {
close(s.readyc)

for {
s.pauseAcceptMu.Lock()
pausec := s.pauseAcceptc
s.pauseAcceptMu.Unlock()
select {
case <-pausec:
case <-s.donec:
return
}

s.latencyAcceptMu.RLock()
lat := s.latencyAccept
s.latencyAcceptMu.RUnlock()
if lat > 0 {
select {
case <-time.After(lat):
case <-s.donec:
return
}
}

s.listenerMu.RLock()
ln := s.listener
s.listenerMu.RUnlock()
Expand Down Expand Up @@ -495,27 +431,6 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
panic("unknown proxy type")
}

// pause before packet dropping, blocking, and forwarding
var pausec chan struct{}
switch ptype {
case proxyTx:
s.pauseTxMu.Lock()
pausec = s.pauseTxc
s.pauseTxMu.Unlock()
case proxyRx:
s.pauseRxMu.Lock()
pausec = s.pauseRxc
s.pauseRxMu.Unlock()
default:
panic("unknown proxy type")
}
select {
case <-pausec:
case <-s.donec:
return
}

// pause first, and then drop packets
if nr2 == 0 {
continue
}
Expand Down Expand Up @@ -645,77 +560,6 @@ func (s *server) Close() (err error) {
return err
}

func (s *server) PauseAccept() {
s.pauseAcceptMu.Lock()
s.pauseAcceptc = make(chan struct{})
s.pauseAcceptMu.Unlock()

s.lg.Info(
"paused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseAccept() {
s.pauseAcceptMu.Lock()
select {
case <-s.pauseAcceptc: // already unpaused
case <-s.donec:
s.pauseAcceptMu.Unlock()
return
default:
close(s.pauseAcceptc)
}
s.pauseAcceptMu.Unlock()

s.lg.Info(
"unpaused accept",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) DelayAccept(latency, rv time.Duration) {
if latency <= 0 {
return
}
d := computeLatency(latency, rv)
s.latencyAcceptMu.Lock()
s.latencyAccept = d
s.latencyAcceptMu.Unlock()

s.lg.Info(
"set accept latency",
zap.Duration("latency", d),
zap.Duration("given-latency", latency),
zap.Duration("given-latency-random-variable", rv),
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UndelayAccept() {
s.latencyAcceptMu.Lock()
d := s.latencyAccept
s.latencyAccept = 0
s.latencyAcceptMu.Unlock()

s.lg.Info(
"removed accept latency",
zap.Duration("latency", d),
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) LatencyAccept() time.Duration {
s.latencyAcceptMu.RLock()
d := s.latencyAccept
s.latencyAcceptMu.RUnlock()
return d
}

func (s *server) DelayTx(latency, rv time.Duration) {
if latency <= 0 {
return
Expand Down Expand Up @@ -897,68 +741,6 @@ func (s *server) UnblackholeRx() {
)
}

func (s *server) PauseTx() {
s.pauseTxMu.Lock()
s.pauseTxc = make(chan struct{})
s.pauseTxMu.Unlock()

s.lg.Info(
"paused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) UnpauseTx() {
s.pauseTxMu.Lock()
select {
case <-s.pauseTxc: // already unpaused
case <-s.donec:
s.pauseTxMu.Unlock()
return
default:
close(s.pauseTxc)
}
s.pauseTxMu.Unlock()

s.lg.Info(
"unpaused tx",
zap.String("from", s.From()),
zap.String("to", s.To()),
)
}

func (s *server) PauseRx() {
s.pauseRxMu.Lock()
s.pauseRxc = make(chan struct{})
s.pauseRxMu.Unlock()

s.lg.Info(
"paused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}

func (s *server) UnpauseRx() {
s.pauseRxMu.Lock()
select {
case <-s.pauseRxc: // already unpaused
case <-s.donec:
s.pauseRxMu.Unlock()
return
default:
close(s.pauseRxc)
}
s.pauseRxMu.Unlock()

s.lg.Info(
"unpaused rx",
zap.String("from", s.To()),
zap.String("to", s.From()),
)
}

func (s *server) ResetListener() error {
s.listenerMu.Lock()
defer s.listenerMu.Unlock()
Expand Down
Loading
Loading