diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index 08717c28..de3e66d5 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := s.getBackends(client, s.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue } diff --git a/etcd_client.go b/etcd_client.go index 8da453d9..815a20f3 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) { delete(c.listeners, listener) } -func (c *EtcdClient) WaitForConnection() { - waitDelay := initialWaitDelay +func (c *EtcdClient) WaitForConnection(ctx context.Context) error { + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + return err + } + for { + if err := ctx.Err(); err != nil { + return err + } + if err := c.syncClient(); err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay) + log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err) + log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(ctx) continue } log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints()) - return + return nil } } diff --git a/etcd_client_test.go b/etcd_client_test.go index dd85ddb6..d40bf28f 100644 --- a/etcd_client_test.go +++ b/etcd_client_test.go @@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { go func() { defer close(l.initial) - client.WaitForConnection() + if err := client.WaitForConnection(l.ctx); err != nil { + l.t.Errorf("error waiting for connection: %s", err) + return + } ctx, cancel := context.WithTimeout(l.ctx, time.Second) defer cancel() diff --git a/grpc_client.go b/grpc_client.go index 8d502262..0461a239 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) for { diff --git a/hub.go b/hub.go index d8349c4a..301ac08c 100644 --- a/hub.go +++ b/hub.go @@ -417,19 +417,20 @@ func (h *Hub) updateGeoDatabase() { } defer h.geoipUpdating.Store(false) - delay := time.Second + backoff, err := NewExponentialBackoff(time.Second, 5*time.Minute) + if err != nil { + log.Printf("Could not create exponential backoff: %s", err) + return + } + for !h.closer.IsClosed() { err := h.geoip.Update() if err == nil { break } - log.Printf("Could not update GeoIP database, will retry later (%s)", err) - time.Sleep(delay) - delay = delay * 2 - if delay > 5*time.Minute { - delay = 5 * time.Minute - } + log.Printf("Could not update GeoIP database, will retry in %s (%s)", backoff.NextWait(), err) + backoff.Wait(context.Background()) } } diff --git a/natsclient.go b/natsclient.go index 5cc17ba2..6a9feb7e 100644 --- a/natsclient.go +++ b/natsclient.go @@ -22,6 +22,7 @@ package signaling import ( + "context" "encoding/base64" "fmt" "log" @@ -74,33 +75,27 @@ func NewNatsClient(url string) (NatsClient, error) { return NewLoopbackNatsClient() } + backoff, err := NewExponentialBackoff(initialConnectInterval, maxConnectInterval) + if err != nil { + return nil, err + } + client := &natsClient{} - var err error client.nc, err = nats.Connect(url, nats.ClosedHandler(client.onClosed), nats.DisconnectHandler(client.onDisconnected), nats.ReconnectHandler(client.onReconnected)) - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - defer signal.Stop(interrupt) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() - delay := initialConnectInterval - timer := time.NewTimer(delay) // The initial connect must succeed, so we retry in the case of an error. for err != nil { - log.Printf("Could not create connection (%s), will retry in %s", err, delay) - timer.Reset(delay) - select { - case <-interrupt: + log.Printf("Could not create connection (%s), will retry in %s", err, backoff.NextWait()) + backoff.Wait(ctx) + if ctx.Err() != nil { return nil, fmt.Errorf("interrupted") - case <-timer.C: - // Retry connection - delay = delay * 2 - if delay > maxConnectInterval { - delay = maxConnectInterval - } } client.nc, err = nats.Connect(url) diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index decd89e2..6978c0e9 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -233,14 +233,15 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { mcuType = signaling.McuTypeDefault } - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - defer signal.Stop(interrupt) + backoff, err := signaling.NewExponentialBackoff(initialMcuRetry, maxMcuRetry) + if err != nil { + return err + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() - var err error var mcu signaling.Mcu - mcuRetry := initialMcuRetry - mcuRetryTimer := time.NewTimer(mcuRetry) for { switch mcuType { case signaling.McuTypeJanus: @@ -263,17 +264,10 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { break } - log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry) - mcuRetryTimer.Reset(mcuRetry) - select { - case <-interrupt: + log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, backoff.NextWait()) + backoff.Wait(ctx) + if ctx.Err() != nil { return fmt.Errorf("Cancelled") - case <-mcuRetryTimer.C: - // Retry connection - mcuRetry = mcuRetry * 2 - if mcuRetry > maxMcuRetry { - mcuRetry = maxMcuRetry - } } } diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index d9ce438c..b03ee0b5 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := p.getProxyUrls(client, p.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue }