Skip to content

Commit

Permalink
Merge pull request #245 from dvonthenen/drain-queue-when-signaled
Browse files Browse the repository at this point in the history
Drain the WebSocket Queue of Messages
  • Loading branch information
dvonthenen authored Jun 21, 2024
2 parents 3992263 + ca5a3f8 commit 6cb2efd
Showing 1 changed file with 130 additions and 108 deletions.
238 changes: 130 additions & 108 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (c *Client) internalConnect() *websocket.Conn {

//nolint:funlen // this is a complex function. keep as is
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")
klog.V(7).Infof("live.internalConnectWithCancel() ENTER\n")

// set the context
c.ctx = ctx
Expand All @@ -162,7 +162,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
klog.V(7).Infof("live.Connect() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil
}

Expand All @@ -178,14 +178,22 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
select {
case <-c.ctx.Done():
klog.V(1).Infof("Connection is not valid\n")
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil
default:

klog.V(7).Infof("Connection is good. Return object.")
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return c.wsconn
}
} else {
select {
case <-c.ctx.Done():
klog.V(1).Infof("Context is not valid. Has been canceled.\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil
default:
klog.V(3).Infof("Context is still valid. Retry...\n")
}
}

dialer := websocket.Dialer{
Expand Down Expand Up @@ -235,7 +243,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
url, err := version.GetLiveAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path, c.tOptions)
if err != nil {
klog.V(1).Infof("version.GetLiveAPI failed. Err: %v\n", err)
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil // no point in retrying because this is going to fail on every retry
}
klog.V(5).Infof("Connecting to %s\n", url)
Expand Down Expand Up @@ -274,14 +282,14 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
}

klog.V(3).Infof("WebSocket Connection Successful!")
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")

return c.wsconn
}

// if we get here, we failed to connect
klog.V(1).Infof("Failed to connect to websocket: %s\n", c.cOptions.Host)
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")

return nil
}
Expand All @@ -290,7 +298,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
func (c *Client) listen() {
klog.V(6).Infof("live.listen() ENTER\n")

ticker := time.NewTicker(500 * time.Millisecond)
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
Expand All @@ -300,119 +308,133 @@ func (c *Client) listen() {
klog.V(6).Infof("live.listen() LEAVE\n")
return
case <-ticker.C:
ws := c.internalConnect()
if ws == nil {
klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
}

msgType, byMsg, err := ws.ReadMessage()
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")

// graceful close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(true)
for {
// doing a read, need to lock
c.muConn.Lock()

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(false)
// get the connection
ws := c.internalConnect()
if ws == nil {
// release
c.muConn.Unlock()

klog.V(3).Infof("listen: Connection is not valid\n")
klog.V(6).Infof("live.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}
}

// close the connection
c.closeWs(true)
// release the lock
c.muConn.Unlock()

klog.V(6).Infof("live.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)
// read the message
msgType, byMsg, err := ws.ReadMessage()

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
if err != nil {
errStr := err.Error()
switch {
case strings.Contains(errStr, SuccessfulSocketErr):
klog.V(3).Infof("Graceful websocket close\n")

// graceful close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
klog.V(1).Infof("Fatal socket error: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, "Deepgram"):
klog.V(1).Infof("listen: Deepgram error. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Deepgram ErrorMsg. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
klog.V(3).Infof("stream object EOF\n")

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
default:
klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr)
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
}

// close the connection
c.closeWs(true)

klog.V(6).Infof("live.listen() LEAVE\n")
return
}
}

if len(byMsg) == 0 {
klog.V(7).Infof("listen: message empty")
continue
}
if len(byMsg) == 0 {
klog.V(7).Infof("listen: message empty")
continue
}

// inspect the message
if c.cOptions.InspectMessage() {
err := c.inspect(byMsg)
if err != nil {
klog.V(1).Infof("listen: inspect failed. Err: %v\n", err)
// inspect the message
if c.cOptions.InspectMessage() {
err := c.inspect(byMsg)
if err != nil {
klog.V(1).Infof("listen: inspect failed. Err: %v\n", err)
}
}
}

// callback!
if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err)
// callback!
if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
klog.V(1).Infof("listen: router.Message failed. Err: %v\n", err)
}
} else {
klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg))
}
} else {
klog.V(7).Infof("listen: msg recv (type %d): %s\n", msgType, string(byMsg))
}
}
}
Expand Down

0 comments on commit 6cb2efd

Please sign in to comment.