From f6fe960534003f9ad1ef4f38cd969b2ee0a67347 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 27 Feb 2024 16:12:04 +0100 Subject: [PATCH 1/4] Reuse backoff waiting code in NATS client. --- natsclient.go | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/natsclient.go b/natsclient.go index 5cc17ba2..6a9feb7e 100644 --- a/natsclient.go +++ b/natsclient.go @@ -22,6 +22,7 @@ package signaling import ( + "context" "encoding/base64" "fmt" "log" @@ -74,33 +75,27 @@ func NewNatsClient(url string) (NatsClient, error) { return NewLoopbackNatsClient() } + backoff, err := NewExponentialBackoff(initialConnectInterval, maxConnectInterval) + if err != nil { + return nil, err + } + client := &natsClient{} - var err error client.nc, err = nats.Connect(url, nats.ClosedHandler(client.onClosed), nats.DisconnectHandler(client.onDisconnected), nats.ReconnectHandler(client.onReconnected)) - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - defer signal.Stop(interrupt) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() - delay := initialConnectInterval - timer := time.NewTimer(delay) // The initial connect must succeed, so we retry in the case of an error. for err != nil { - log.Printf("Could not create connection (%s), will retry in %s", err, delay) - timer.Reset(delay) - select { - case <-interrupt: + log.Printf("Could not create connection (%s), will retry in %s", err, backoff.NextWait()) + backoff.Wait(ctx) + if ctx.Err() != nil { return nil, fmt.Errorf("interrupted") - case <-timer.C: - // Retry connection - delay = delay * 2 - if delay > maxConnectInterval { - delay = maxConnectInterval - } } client.nc, err = nats.Connect(url) From a68454ceeccb4137f26efc0f82c35e4098fea51e Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 27 Feb 2024 16:15:42 +0100 Subject: [PATCH 2/4] Reuse backoff waiting code for errors during GeoIP update. --- hub.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/hub.go b/hub.go index d8349c4a..301ac08c 100644 --- a/hub.go +++ b/hub.go @@ -417,19 +417,20 @@ func (h *Hub) updateGeoDatabase() { } defer h.geoipUpdating.Store(false) - delay := time.Second + backoff, err := NewExponentialBackoff(time.Second, 5*time.Minute) + if err != nil { + log.Printf("Could not create exponential backoff: %s", err) + return + } + for !h.closer.IsClosed() { err := h.geoip.Update() if err == nil { break } - log.Printf("Could not update GeoIP database, will retry later (%s)", err) - time.Sleep(delay) - delay = delay * 2 - if delay > 5*time.Minute { - delay = 5 * time.Minute - } + log.Printf("Could not update GeoIP database, will retry in %s (%s)", backoff.NextWait(), err) + backoff.Wait(context.Background()) } } From bde0b08eb19c6b6e248d2f4c956a348d80f7add1 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 27 Feb 2024 16:27:17 +0100 Subject: [PATCH 3/4] Reuse backoff waiting code in etcd clients. --- backend_storage_etcd.go | 19 ++++++++++--------- etcd_client.go | 24 ++++++++++++++---------- etcd_client_test.go | 5 ++++- grpc_client.go | 4 +++- proxy_config_etcd.go | 19 ++++++++++--------- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/backend_storage_etcd.go b/backend_storage_etcd.go index 08717c28..de3e66d5 100644 --- a/backend_storage_etcd.go +++ b/backend_storage_etcd.go @@ -103,23 +103,24 @@ func (s *backendStorageEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := s.getBackends(client, s.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of backends, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of backends, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of backends, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of backends, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue } diff --git a/etcd_client.go b/etcd_client.go index 8da453d9..815a20f3 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -212,26 +212,30 @@ func (c *EtcdClient) RemoveListener(listener EtcdClientListener) { delete(c.listeners, listener) } -func (c *EtcdClient) WaitForConnection() { - waitDelay := initialWaitDelay +func (c *EtcdClient) WaitForConnection(ctx context.Context) error { + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + return err + } + for { + if err := ctx.Err(); err != nil { + return err + } + if err := c.syncClient(); err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", waitDelay) + log.Printf("Timeout waiting for etcd client to connect to the cluster, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", waitDelay, err) + log.Printf("Could not sync etcd client with the cluster, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(ctx) continue } log.Printf("Client synced, using endpoints %+v", c.getEtcdClient().Endpoints()) - return + return nil } } diff --git a/etcd_client_test.go b/etcd_client_test.go index dd85ddb6..d40bf28f 100644 --- a/etcd_client_test.go +++ b/etcd_client_test.go @@ -236,7 +236,10 @@ func (l *EtcdClientTestListener) EtcdClientCreated(client *EtcdClient) { go func() { defer close(l.initial) - client.WaitForConnection() + if err := client.WaitForConnection(l.ctx); err != nil { + l.t.Errorf("error waiting for connection: %s", err) + return + } ctx, cancel := context.WithTimeout(l.ctx, time.Second) defer cancel() diff --git a/grpc_client.go b/grpc_client.go index 8d502262..0461a239 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -594,7 +594,9 @@ func (c *GrpcClients) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } backoff, _ := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) for { diff --git a/proxy_config_etcd.go b/proxy_config_etcd.go index d9ce438c..b03ee0b5 100644 --- a/proxy_config_etcd.go +++ b/proxy_config_etcd.go @@ -93,23 +93,24 @@ func (p *proxyConfigEtcd) EtcdClientCreated(client *EtcdClient) { }() go func() { - client.WaitForConnection() + if err := client.WaitForConnection(context.Background()); err != nil { + panic(err) + } - waitDelay := initialWaitDelay + backoff, err := NewExponentialBackoff(initialWaitDelay, maxWaitDelay) + if err != nil { + panic(err) + } for { response, err := p.getProxyUrls(client, p.keyPrefix) if err != nil { if err == context.DeadlineExceeded { - log.Printf("Timeout getting initial list of proxy URLs, retry in %s", waitDelay) + log.Printf("Timeout getting initial list of proxy URLs, retry in %s", backoff.NextWait()) } else { - log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", waitDelay, err) + log.Printf("Could not get initial list of proxy URLs, retry in %s: %s", backoff.NextWait(), err) } - time.Sleep(waitDelay) - waitDelay = waitDelay * 2 - if waitDelay > maxWaitDelay { - waitDelay = maxWaitDelay - } + backoff.Wait(context.Background()) continue } From 1a8444ca71da4cc7993cbebacca8d8f856529d98 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Tue, 27 Feb 2024 16:41:57 +0100 Subject: [PATCH 4/4] Reuse backoff waiting code for initial proxy MCU connection. --- proxy/proxy_server.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/proxy/proxy_server.go b/proxy/proxy_server.go index decd89e2..6978c0e9 100644 --- a/proxy/proxy_server.go +++ b/proxy/proxy_server.go @@ -233,14 +233,15 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { mcuType = signaling.McuTypeDefault } - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - defer signal.Stop(interrupt) + backoff, err := signaling.NewExponentialBackoff(initialMcuRetry, maxMcuRetry) + if err != nil { + return err + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() - var err error var mcu signaling.Mcu - mcuRetry := initialMcuRetry - mcuRetryTimer := time.NewTimer(mcuRetry) for { switch mcuType { case signaling.McuTypeJanus: @@ -263,17 +264,10 @@ func (s *ProxyServer) Start(config *goconf.ConfigFile) error { break } - log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, mcuRetry) - mcuRetryTimer.Reset(mcuRetry) - select { - case <-interrupt: + log.Printf("Could not initialize %s MCU at %s (%s) will retry in %s", mcuType, s.url, err, backoff.NextWait()) + backoff.Wait(ctx) + if ctx.Err() != nil { return fmt.Errorf("Cancelled") - case <-mcuRetryTimer.C: - // Retry connection - mcuRetry = mcuRetry * 2 - if mcuRetry > maxMcuRetry { - mcuRetry = maxMcuRetry - } } }