From ca5a3f8d19204d043a2bbb46b0fdcbe2bbc34da5 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Thu, 20 Jun 2024 17:03:36 -0700 Subject: [PATCH] Drain the WebSocket Queue of Messages --- pkg/client/live/client.go | 238 +++++++++++++++++++++----------------- 1 file changed, 130 insertions(+), 108 deletions(-) diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index 5f540a03..b5fc831d 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -152,7 +152,7 @@ func (c *Client) internalConnect() *websocket.Conn { //nolint:funlen // this is a complex function. keep as is func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn { - klog.V(7).Infof("live.Connect() ENTER\n") + klog.V(7).Infof("live.internalConnectWithCancel() ENTER\n") // set the context c.ctx = ctx @@ -162,7 +162,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex // we explicitly stopped and should not attempt to reconnect if !c.retry { klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.") - klog.V(7).Infof("live.Connect() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return nil } @@ -178,14 +178,22 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex select { case <-c.ctx.Done(): klog.V(1).Infof("Connection is not valid\n") - klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return nil default: - klog.V(7).Infof("Connection is good. Return object.") - klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return c.wsconn } + } else { + select { + case <-c.ctx.Done(): + klog.V(1).Infof("Context is not valid. Has been canceled.\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") + return nil + default: + klog.V(3).Infof("Context is still valid. Retry...\n") + } } dialer := websocket.Dialer{ @@ -235,7 +243,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex url, err := version.GetLiveAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path, c.tOptions) if err != nil { klog.V(1).Infof("version.GetLiveAPI failed. Err: %v\n", err) - klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return nil // no point in retrying because this is going to fail on every retry } klog.V(5).Infof("Connecting to %s\n", url) @@ -274,14 +282,14 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex } klog.V(3).Infof("WebSocket Connection Successful!") - klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return c.wsconn } // if we get here, we failed to connect klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host) - klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") + klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n") return nil } @@ -290,7 +298,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex func (c *Client) listen() { klog.V(6).Infof("live.listen() ENTER\n") - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() for { select { @@ -300,119 +308,133 @@ func (c *Client) listen() { klog.V(6).Infof("live.listen() LEAVE\n") return case <-ticker.C: - ws := c.internalConnect() - if ws == nil { - klog.V(3).Infof("listen: Connection is not valid\n") - klog.V(6).Infof("live.listen() LEAVE\n") - return - } - - msgType, byMsg, err := ws.ReadMessage() - if err != nil { - errStr := err.Error() - switch { - case strings.Contains(errStr, SuccessfulSocketErr): - klog.V(3).Infof("Graceful websocket close\n") - - // graceful close - c.closeWs(false) - - klog.V(6).Infof("live.listen() LEAVE\n") - return - case strings.Contains(errStr, UseOfClosedSocket): - klog.V(3).Infof("Probable graceful websocket close: %v\n", err) - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) - } - - // fatal close - c.closeWs(false) - - klog.V(6).Infof("live.listen() LEAVE\n") - return - case strings.Contains(errStr, FatalReadSocketErr): - klog.V(1).Infof("Fatal socket error: %v\n", err) - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) - } - - // fatal close - c.closeWs(true) + for { + // doing a read, need to lock + c.muConn.Lock() - klog.V(6).Infof("live.listen() LEAVE\n") - return - case strings.Contains(errStr, "Deepgram"): - klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err) - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr) - } - - // close the connection - c.closeWs(false) + // get the connection + ws := c.internalConnect() + if ws == nil { + // release + c.muConn.Unlock() + klog.V(3).Infof("listen: Connection is not valid\n") klog.V(6).Infof("live.listen() LEAVE\n") return - case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: - klog.V(3).Infof("stream object EOF\n") - - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) - } + } - // close the connection - c.closeWs(true) + // release the lock + c.muConn.Unlock() - klog.V(6).Infof("live.listen() LEAVE\n") - return - default: - klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err) + // read the message + msgType, byMsg, err := ws.ReadMessage() - // send error on callback - sendErr := c.sendError(err) - if sendErr != nil { - klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) + if err != nil { + errStr := err.Error() + switch { + case strings.Contains(errStr, SuccessfulSocketErr): + klog.V(3).Infof("Graceful websocket close\n") + + // graceful close + c.closeWs(false) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + case strings.Contains(errStr, UseOfClosedSocket): + klog.V(3).Infof("Probable graceful websocket close: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(false) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + case strings.Contains(errStr, FatalReadSocketErr): + klog.V(1).Infof("Fatal socket error: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(true) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + case strings.Contains(errStr, "Deepgram"): + klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(false) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: + klog.V(3).Infof("stream object EOF\n") + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("live.listen() LEAVE\n") + return + default: + klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("live.listen() LEAVE\n") + return } - - // close the connection - c.closeWs(true) - - klog.V(6).Infof("live.listen() LEAVE\n") - return } - } - if len(byMsg) == 0 { - klog.V(7).Infof("listen: message empty") - continue - } + if len(byMsg) == 0 { + klog.V(7).Infof("listen: message empty") + continue + } - // inspect the message - if c.cOptions.InspectMessage() { - err := c.inspect(byMsg) - if err != nil { - klog.V(1).Infof("listen: inspect failed. Err: %v\n", err) + // inspect the message + if c.cOptions.InspectMessage() { + err := c.inspect(byMsg) + if err != nil { + klog.V(1).Infof("listen: inspect failed. Err: %v\n", err) + } } - } - // callback! - if c.callback != nil { - err := c.router.Message(byMsg) - if err != nil { - klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err) + // callback! + if c.callback != nil { + err := c.router.Message(byMsg) + if err != nil { + klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err) + } + } else { + klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg)) } - } else { - klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg)) } } }