From 1f722d2ee3ae27e3fd7dc4b4adaaeb0456e798b3 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Fri, 7 Jun 2024 16:11:28 -0700 Subject: [PATCH] Auto Finalize When Not Sending Audio --- examples/streaming/test/main.go | 8 +- pkg/api/live/v1/interfaces/types.go | 16 +++ pkg/api/live/v1/router.go | 2 +- pkg/api/live/v1/types.go | 19 --- pkg/client/interfaces/options.go | 15 +++ pkg/client/interfaces/types-client.go | 5 + pkg/client/live/client.go | 173 +++++++++++++++++++++++--- pkg/client/live/constants.go | 3 + pkg/client/live/types.go | 7 +- 9 files changed, 212 insertions(+), 36 deletions(-) delete mode 100644 pkg/api/live/v1/types.go diff --git a/examples/streaming/test/main.go b/examples/streaming/test/main.go index 6c0892c9..b847b6dd 100644 --- a/examples/streaming/test/main.go +++ b/examples/streaming/test/main.go @@ -32,6 +32,12 @@ func main() { // Go context ctx := context.Background() + // client options + cOptions := &interfaces.ClientOptions{ + EnableKeepAlive: true, + AutoFlushReplyDelta: 2000, + } + // set the Transcription options tOptions := &interfaces.LiveTranscriptionOptions{ Model: "nova-2", @@ -49,7 +55,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewForDemo(ctx, tOptions) + dgClient, err := client.New(ctx, "", cOptions, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/pkg/api/live/v1/interfaces/types.go b/pkg/api/live/v1/interfaces/types.go index 16ccdb17..f73a4ab7 100644 --- a/pkg/api/live/v1/interfaces/types.go +++ b/pkg/api/live/v1/interfaces/types.go @@ -8,6 +8,22 @@ import ( interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" ) +/***********************************/ +// MessageType is the header to bootstrap you way unmarshalling other messages +/***********************************/ +/* + Example: + { + "type": "message", + "message": { + ... + } + } +*/ +type MessageType struct { + Type string `json:"type"` +} + /***********************************/ // shared/common structs /***********************************/ diff --git a/pkg/api/live/v1/router.go b/pkg/api/live/v1/router.go index 2de29b47..fdf3bb24 100644 --- a/pkg/api/live/v1/router.go +++ b/pkg/api/live/v1/router.go @@ -139,7 +139,7 @@ func (r *MessageRouter) Message(byMsg []byte) error { klog.V(5).Infof("Raw Message:\n%s\n", string(byMsg)) } - var mt MessageType + var mt interfaces.MessageType if err := json.Unmarshal(byMsg, &mt); err != nil { klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err) klog.V(6).Infof("router.Message LEAVE\n") diff --git a/pkg/api/live/v1/types.go b/pkg/api/live/v1/types.go deleted file mode 100644 index 3e30f205..00000000 --- a/pkg/api/live/v1/types.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. -// Use of this source code is governed by a MIT license that can be found in the LICENSE file. -// SPDX-License-Identifier: MIT - -package live - -// MessageType is the header to bootstrap you way unmarshalling other messages -/* - Example: - { - "type": "message", - "message": { - ... - } - } -*/ -type MessageType struct { - Type string `json:"type"` -} diff --git a/pkg/client/interfaces/options.go b/pkg/client/interfaces/options.go index 04f3c595..9764e02d 100644 --- a/pkg/client/interfaces/options.go +++ b/pkg/client/interfaces/options.go @@ -6,6 +6,7 @@ package interfaces import ( "os" + "strconv" "strings" klog "k8s.io/klog/v2" @@ -61,9 +62,23 @@ func (o *ClientOptions) Parse() error { o.EnableKeepAlive = strings.EqualFold(strings.ToLower(v), "true") } + // these require inspecting messages, therefore you must update the InspectMessage() method + if v := os.Getenv("DEEPGRAM_WEBSOCKET_AUTO_FLUSH"); v != "" { + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH found") + i, err := strconv.ParseInt(v, 10, 64) + if err == nil { + klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH set to %d", i) + o.AutoFlushReplyDelta = i + } + } + return nil } +func (c *ClientOptions) InspectMessage() bool { + return c.AutoFlushReplyDelta != 0 +} + func (o *PreRecordedTranscriptionOptions) Check() error { // checks // currently no op diff --git a/pkg/client/interfaces/types-client.go b/pkg/client/interfaces/types-client.go index 0ca799eb..0c29a8cd 100644 --- a/pkg/client/interfaces/types-client.go +++ b/pkg/client/interfaces/types-client.go @@ -15,7 +15,12 @@ type ClientOptions struct { // shared client options SkipServerAuth bool // keeps the client from authenticating with the server + // prerecorded client options + // live client options RedirectService bool // allows HTTP redirects to be followed EnableKeepAlive bool // enables the keep alive feature + + // these require inspecting messages, therefore you must update the InspectMessage() method + AutoFlushReplyDelta int64 // enables the auto flush feature } diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index 366ba1cf..a4b2d774 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -229,7 +229,8 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex klog.V(5).Infof("Connecting to %s\n", url) // a single connection attempt - c.mu.Lock() + // Note: not using defer here because we arent leaving the scope of the function + c.muConn.Lock() // perform the websocket connection ws, res, err := dialer.DialContext(c.ctx, url, myHeader) @@ -240,7 +241,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex if err != nil { klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host) klog.V(1).Infof("Dialer failed. Err: %v\n", err) - c.mu.Unlock() + c.muConn.Unlock() continue } @@ -249,13 +250,16 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex c.retry = true // unlock the connection - c.mu.Unlock() + c.muConn.Unlock() // kick off threads to listen for messages and ping/keepalive go c.listen() if c.cOptions.EnableKeepAlive { go c.ping() } + if c.cOptions.AutoFlushReplyDelta != 0 { + go c.flush() + } // fire off open connection err = c.router.OpenHelper(&msginterfaces.OpenResponse{ @@ -374,6 +378,15 @@ func (c *Client) listen() { continue } + // inspect the message + if c.cOptions.InspectMessage() { + err := c.inspect(byMsg) + if err != nil { + klog.V(1).Infof("listen: inspect failed. Err: %v\n", err) + } + } + + // callback! if c.callback != nil { err := c.router.Message(byMsg) if err != nil { @@ -453,8 +466,8 @@ func (c *Client) WriteBinary(byData []byte) error { } // doing a write, need to lock - c.mu.Lock() - defer c.mu.Unlock() + c.muConn.Lock() + defer c.muConn.Unlock() if err := ws.WriteMessage( websocket.BinaryMessage, @@ -497,8 +510,8 @@ func (c *Client) WriteJSON(payload interface{}) error { } // doing a write, need to lock - c.mu.Lock() - defer c.mu.Unlock() + c.muConn.Lock() + defer c.muConn.Unlock() if err := ws.WriteMessage( websocket.TextMessage, @@ -549,8 +562,8 @@ func (c *Client) Finalize() error { } // doing a write, need to lock - c.mu.Lock() - defer c.mu.Unlock() + c.muConn.Lock() + defer c.muConn.Unlock() err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }")) @@ -574,8 +587,8 @@ func (c *Client) closeWs(fatal bool) { klog.V(6).Infof("live.closeWs() closing channels...\n") // doing a write, need to lock - c.mu.Lock() - defer c.mu.Unlock() + c.muConn.Lock() + defer c.muConn.Unlock() if c.wsconn != nil && !fatal { // deepgram requires a close message to be sent @@ -644,8 +657,9 @@ func (c *Client) ping() { return } - // doing a write, need to lock - c.mu.Lock() + // doing a write, need to lock. + // Note: not using defer here because we arent leaving the scope of the function + c.muConn.Lock() // deepgram keepalive message klog.V(5).Infof("Sending Deepgram KeepAlive message...\n") @@ -657,7 +671,66 @@ func (c *Client) ping() { } // release - c.mu.Unlock() + c.muConn.Unlock() + } + } +} + +func (c *Client) flush() { + klog.V(6).Infof("live.flush() ENTER\n") + + ticker := time.NewTicker(flushPeriod) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + klog.V(3).Infof("live.flush() Exiting\n") + + // exit gracefully + c.closeWs(false) + + klog.V(6).Infof("live.flush() LEAVE\n") + return + case <-ticker.C: + ws := c.internalConnect() + if ws == nil { + klog.V(1).Infof("flush Connection is not valid\n") + klog.V(6).Infof("live.flush() LEAVE\n") + return + } + + // doing a read, need to lock. + c.muFinal.Lock() + + // have we received anything? no, then skip + if c.lastDatagram == nil { + klog.V(7).Infof("No datagram received. Skipping...\n") + c.muFinal.Unlock() + continue + } + + // we have received something, but is it recent? + trigger := c.lastDatagram.Add(time.Millisecond * time.Duration(c.cOptions.AutoFlushReplyDelta)) + now := time.Now() + klog.V(7).Infof("Time (Last): %s\n", trigger.String()) + klog.V(7).Infof("Time (Now ): %s\n", now.String()) + bNeedFlush := trigger.Before(now) + if bNeedFlush { + c.lastDatagram = nil + } + + // release + c.muFinal.Unlock() + + if bNeedFlush { + klog.V(5).Infof("Sending Finalize message...\n") + err := c.Finalize() + if err == nil { + klog.V(5).Infof("Finalize sent!") + } else { + klog.V(1).Infof("Failed to send Finalize. Err: %v\n", err) + } + } } } } @@ -700,3 +773,75 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse { } return response } + +// inspectMessage inspects the message and determines the type to +// see if we should do anything with those types of messages +func (c *Client) inspect(byMsg []byte) error { + klog.V(7).Infof("client.inspect() ENTER\n") + + var mt msginterfaces.MessageType + if err := json.Unmarshal(byMsg, &mt); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err) + klog.V(7).Infof("client.inspect() LEAVE\n") + return err + } + + switch mt.Type { + case msginterfaces.TypeMessageResponse: + klog.V(7).Infof("TypeMessageResponse\n") + + // convert to MessageResponse + var mr msginterfaces.MessageResponse + if err := json.Unmarshal(byMsg, &mr); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageResponse) failed. Err: %v\n", err) + klog.V(7).Infof("client.inspect() LEAVE\n") + return err + } + + // inspect the message + err := c.inspectMessage(&mr) + if err != nil { + klog.V(1).Infof("inspectMessage() failed. Err: %v\n", err) + klog.V(7).Infof("client.inspect() LEAVE\n") + return err + } + default: + klog.V(7).Infof("MessageType: %s\n", mt.Type) + } + + klog.V(7).Info("inspect() succeeded\n") + klog.V(7).Infof("client.inspect() LEAVE\n") + return nil +} + +func (c *Client) inspectMessage(mr *msginterfaces.MessageResponse) error { + klog.V(7).Infof("client.inspectMessage() ENTER\n") + + sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript) + if len(mr.Channel.Alternatives) == 0 || len(sentence) == 0 { + klog.V(7).Info("inspectMessage is empty\n") + klog.V(7).Infof("client.inspectMessage() LEAVE\n") + return nil + } + + if mr.IsFinal { + klog.V(7).Infof("IsFinal received: %s\n", time.Now().String()) + + // doing a write, need to lock + c.muFinal.Lock() + c.lastDatagram = nil + c.muFinal.Unlock() + } else { + klog.V(7).Infof("Interim received: %s\n", time.Now().String()) + + // last datagram received + c.muFinal.Lock() + now := time.Now() + c.lastDatagram = &now + c.muFinal.Unlock() + } + + klog.V(7).Info("inspectMessage() succeeded\n") + klog.V(7).Infof("client.inspectMessage() LEAVE\n") + return nil +} diff --git a/pkg/client/live/constants.go b/pkg/client/live/constants.go index 978f98b7..fd9d1939 100644 --- a/pkg/client/live/constants.go +++ b/pkg/client/live/constants.go @@ -12,6 +12,9 @@ import ( // internal constants for retry, waits, back-off, etc. const ( + flushPeriod = 500 * time.Millisecond + flashInitialDelay = 3 * time.Second + pingPeriod = 5 * time.Second defaultDelayBetweenRetry int64 = 2 diff --git a/pkg/client/live/types.go b/pkg/client/live/types.go index 31cba15f..0f442f20 100644 --- a/pkg/client/live/types.go +++ b/pkg/client/live/types.go @@ -7,6 +7,7 @@ package live import ( "context" "sync" + "time" "github.com/dvonthenen/websocket" @@ -24,11 +25,15 @@ type Client struct { ctx context.Context ctxCancel context.CancelFunc - mu sync.RWMutex + muConn sync.RWMutex wsconn *websocket.Conn retry bool retryCnt int64 callback msginterface.LiveMessageCallback router *live.MessageRouter + + // internal constants for retry, waits, back-off, etc. + lastDatagram *time.Time + muFinal sync.RWMutex }