Skip to content

Commit

Permalink
Merge pull request #673 from strukturag/reuse-backoff
Browse files Browse the repository at this point in the history
Reuse backoff waiting code where possible
  • Loading branch information
fancycode authored Feb 27, 2024
2 parents fe53c32 + 1a8444c commit df477a7
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 69 deletions.
19 changes: 10 additions & 9 deletions backend_storage_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) {
}()

go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}

waitDelay := initialWaitDelay
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
panic(err)
}
for {
response, err := s.getBackends(client, s.keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay)
log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err)
log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err)
}

time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(context.Background())
continue
}

Expand Down
24 changes: 14 additions & 10 deletions etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) {
delete(c.listeners, listener)
}

func (c *EtcdClient) WaitForConnection() {
waitDelay := initialWaitDelay
func (c *EtcdClient) WaitForConnection(ctx context.Context) error {
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
return err
}

for {
if err := ctx.Err(); err != nil {
return err
}

if err := c.syncClient(); err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay)
log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err)
log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err)
}

time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(ctx)
continue
}

log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints())
return
return nil
}
}

Expand Down
5 changes: 4 additions & 1 deletion etcd_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) {

go func() {
defer close(l.initial)
client.WaitForConnection()
if err := client.WaitForConnection(l.ctx); err != nil {
l.t.Errorf("error waiting for connection: %s", err)
return
}

ctx, cancel := context.WithTimeout(l.ctx, time.Second)
defer cancel()
Expand Down
4 changes: 3 additions & 1 deletion grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) {
}()

go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}

backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
for {
Expand Down
15 changes: 8 additions & 7 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,20 @@ func (h *Hub) updateGeoDatabase() {
}

defer h.geoipUpdating.Store(false)
delay := time.Second
backoff, err := NewExponentialBackoff(time.Second, 5*time.Minute)
if err != nil {
log.Printf("Could not create exponential backoff: %s", err)
return
}

for !h.closer.IsClosed() {
err := h.geoip.Update()
if err == nil {
break
}

log.Printf("Could not update GeoIP database, will retry later (%s)", err)
time.Sleep(delay)
delay = delay * 2
if delay > 5*time.Minute {
delay = 5 * time.Minute
}
log.Printf("Could not update GeoIP database, will retry in %s (%s)", backoff.NextWait(), err)
backoff.Wait(context.Background())
}
}

Expand Down
27 changes: 11 additions & 16 deletions natsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package signaling

import (
"context"
"encoding/base64"
"fmt"
"log"
Expand Down Expand Up @@ -74,33 +75,27 @@ func NewNatsClient(url string) (NatsClient, error) {
return NewLoopbackNatsClient()
}

backoff, err := NewExponentialBackoff(initialConnectInterval, maxConnectInterval)
if err != nil {
return nil, err
}

client := &natsClient{}

var err error
client.nc, err = nats.Connect(url,
nats.ClosedHandler(client.onClosed),
nats.DisconnectHandler(client.onDisconnected),
nats.ReconnectHandler(client.onReconnected))

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
defer signal.Stop(interrupt)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

delay := initialConnectInterval
timer := time.NewTimer(delay)
// The initial connect must succeed, so we retry in the case of an error.
for err != nil {
log.Printf("Could not create connection (%s), will retry in %s", err, delay)
timer.Reset(delay)
select {
case <-interrupt:
log.Printf("Could not create connection (%s), will retry in %s", err, backoff.NextWait())
backoff.Wait(ctx)
if ctx.Err() != nil {
return nil, fmt.Errorf("interrupted")
case <-timer.C:
// Retry connection
delay = delay * 2
if delay > maxConnectInterval {
delay = maxConnectInterval
}
}

client.nc, err = nats.Connect(url)
Expand Down
26 changes: 10 additions & 16 deletions proxy/proxy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,15 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
mcuType = signaling.McuTypeDefault
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
defer signal.Stop(interrupt)
backoff, err := signaling.NewExponentialBackoff(initialMcuRetry, maxMcuRetry)
if err != nil {
return err
}

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

var err error
var mcu signaling.Mcu
mcuRetry := initialMcuRetry
mcuRetryTimer := time.NewTimer(mcuRetry)
for {
switch mcuType {
case signaling.McuTypeJanus:
Expand All @@ -263,17 +264,10 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error {
break
}

log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry)
mcuRetryTimer.Reset(mcuRetry)
select {
case <-interrupt:
log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, backoff.NextWait())
backoff.Wait(ctx)
if ctx.Err() != nil {
return fmt.Errorf("Cancelled")
case <-mcuRetryTimer.C:
// Retry connection
mcuRetry = mcuRetry * 2
if mcuRetry > maxMcuRetry {
mcuRetry = maxMcuRetry
}
}
}

Expand Down
19 changes: 10 additions & 9 deletions proxy_config_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) {
}()

go func() {
client.WaitForConnection()
if err := client.WaitForConnection(context.Background()); err != nil {
panic(err)
}

waitDelay := initialWaitDelay
backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay)
if err != nil {
panic(err)
}
for {
response, err := p.getProxyUrls(client, p.keyPrefix)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay)
log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait())
} else {
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err)
log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err)
}

time.Sleep(waitDelay)
waitDelay = waitDelay * 2
if waitDelay > maxWaitDelay {
waitDelay = maxWaitDelay
}
backoff.Wait(context.Background())
continue
}

Expand Down

0 comments on commit df477a7

Please sign in to comment.