Skip to content

Commit

Permalink
Merge pull request #237 from dvonthenen/auto-flush
Browse files Browse the repository at this point in the history
Auto Finalize When Not Sending Audio
  • Loading branch information
dvonthenen authored Jun 11, 2024
2 parents a48acc8 + 1f722d2 commit 5868b91
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 36 deletions.
8 changes: 7 additions & 1 deletion examples/streaming/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func main() {
// Go context
ctx := context.Background()

// client options
cOptions := &interfaces.ClientOptions{
EnableKeepAlive: true,
AutoFlushReplyDelta: 2000,
}

// set the Transcription options
tOptions := &interfaces.LiveTranscriptionOptions{
Model: "nova-2",
Expand All @@ -49,7 +55,7 @@ func main() {
}

// create a Deepgram client
dgClient, err := client.NewForDemo(ctx, tOptions)
dgClient, err := client.New(ctx, "", cOptions, tOptions, nil)
if err != nil {
fmt.Println("ERROR creating LiveTranscription connection:", err)
return
Expand Down
16 changes: 16 additions & 0 deletions pkg/api/live/v1/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
)

/***********************************/
// MessageType is the header used to bootstrap your way into unmarshalling
// other messages. Only the top-level "type" field is decoded; callers read
// Type first and then unmarshal the same bytes into the concrete message
// struct that matches it.
/***********************************/
/*
Example:

	{
		"type": "message",
		"message": {
			...
		}
	}
*/
type MessageType struct {
// Type holds the value of the message's top-level "type" field.
Type string `json:"type"`
}

/***********************************/
// shared/common structs
/***********************************/
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/live/v1/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (r *MessageRouter) Message(byMsg []byte) error {
klog.V(5).Infof("Raw Message:\n%s\n", string(byMsg))
}

var mt MessageType
var mt interfaces.MessageType
if err := json.Unmarshal(byMsg, &mt); err != nil {
klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err)
klog.V(6).Infof("router.Message LEAVE\n")
Expand Down
19 changes: 0 additions & 19 deletions pkg/api/live/v1/types.go

This file was deleted.

15 changes: 15 additions & 0 deletions pkg/client/interfaces/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package interfaces

import (
"os"
"strconv"
"strings"

klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -61,9 +62,23 @@ func (o *ClientOptions) Parse() error {
o.EnableKeepAlive = strings.EqualFold(strings.ToLower(v), "true")
}

// these require inspecting messages, therefore you must update the InspectMessage() method
if v := os.Getenv("DEEPGRAM_WEBSOCKET_AUTO_FLUSH"); v != "" {
klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH found")
i, err := strconv.ParseInt(v, 10, 64)
if err == nil {
klog.V(3).Infof("DEEPGRAM_WEBSOCKET_AUTO_FLUSH set to %d", i)
o.AutoFlushReplyDelta = i
}
}

return nil
}

func (c *ClientOptions) InspectMessage() bool {

Check failure on line 78 in pkg/client/interfaces/options.go

View workflow job for this annotation

GitHub Actions / Change to Main/Release Branch

ST1016: methods on the same type should have the same receiver name (seen 1x "c", 1x "o") (stylecheck)
return c.AutoFlushReplyDelta != 0
}

func (o *PreRecordedTranscriptionOptions) Check() error {
// checks
// currently no op
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/interfaces/types-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ type ClientOptions struct {
// shared client options
SkipServerAuth bool // keeps the client from authenticating with the server

// prerecorded client options

// live client options
RedirectService bool // allows HTTP redirects to be followed
EnableKeepAlive bool // enables the keep alive feature

// these require inspecting messages, therefore you must update the InspectMessage() method
AutoFlushReplyDelta int64 // enables the auto flush feature
}
173 changes: 159 additions & 14 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
klog.V(5).Infof("Connecting to %s\n", url)

// a single connection attempt
c.mu.Lock()
// Note: not using defer here because we aren't leaving the scope of the function
c.muConn.Lock()

// perform the websocket connection
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
Expand All @@ -240,7 +241,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
if err != nil {
klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
klog.V(1).Infof("Dialer failed. Err: %v\n", err)
c.mu.Unlock()
c.muConn.Unlock()
continue
}

Expand All @@ -249,13 +250,16 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
c.retry = true

// unlock the connection
c.mu.Unlock()
c.muConn.Unlock()

// kick off threads to listen for messages and ping/keepalive
go c.listen()
if c.cOptions.EnableKeepAlive {
go c.ping()
}
if c.cOptions.AutoFlushReplyDelta != 0 {
go c.flush()
}

// fire off open connection
err = c.router.OpenHelper(&msginterfaces.OpenResponse{
Expand Down Expand Up @@ -374,6 +378,15 @@ func (c *Client) listen() {
continue
}

// inspect the message
if c.cOptions.InspectMessage() {
err := c.inspect(byMsg)
if err != nil {
klog.V(1).Infof("listen: inspect failed. Err: %v\n", err)
}
}

// callback!
if c.callback != nil {
err := c.router.Message(byMsg)
if err != nil {
Expand Down Expand Up @@ -453,8 +466,8 @@ func (c *Client) WriteBinary(byData []byte) error {
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()
c.muConn.Lock()
defer c.muConn.Unlock()

if err := ws.WriteMessage(
websocket.BinaryMessage,
Expand Down Expand Up @@ -497,8 +510,8 @@ func (c *Client) WriteJSON(payload interface{}) error {
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()
c.muConn.Lock()
defer c.muConn.Unlock()

if err := ws.WriteMessage(
websocket.TextMessage,
Expand Down Expand Up @@ -549,8 +562,8 @@ func (c *Client) Finalize() error {
}

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()
c.muConn.Lock()
defer c.muConn.Unlock()

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }"))

Expand All @@ -574,8 +587,8 @@ func (c *Client) closeWs(fatal bool) {
klog.V(6).Infof("live.closeWs() closing channels...\n")

// doing a write, need to lock
c.mu.Lock()
defer c.mu.Unlock()
c.muConn.Lock()
defer c.muConn.Unlock()

if c.wsconn != nil && !fatal {
// deepgram requires a close message to be sent
Expand Down Expand Up @@ -644,8 +657,9 @@ func (c *Client) ping() {
return
}

// doing a write, need to lock
c.mu.Lock()
// doing a write, need to lock.
// Note: not using defer here because we aren't leaving the scope of the function
c.muConn.Lock()

// deepgram keepalive message
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")
Expand All @@ -657,7 +671,66 @@ func (c *Client) ping() {
}

// release
c.mu.Unlock()
c.muConn.Unlock()
}
}
}

// flush is the auto-flush loop. On every flushPeriod tick it checks whether
// the timestamp stored in c.lastDatagram is older than AutoFlushReplyDelta
// milliseconds and, if so, clears the timestamp and sends a Finalize message.
//
// The loop ends when c.ctx is cancelled (closing the websocket on the way
// out) or when internalConnect() returns nil.
func (c *Client) flush() {
	klog.V(6).Infof("live.flush() ENTER\n")

	tick := time.NewTicker(flushPeriod)
	defer tick.Stop()

	for {
		select {
		case <-c.ctx.Done():
			klog.V(3).Infof("live.flush() Exiting\n")

			// shut the connection down gracefully
			c.closeWs(false)

			klog.V(6).Infof("live.flush() LEAVE\n")
			return

		case <-tick.C:
			if conn := c.internalConnect(); conn == nil {
				klog.V(1).Infof("flush Connection is not valid\n")
				klog.V(6).Infof("live.flush() LEAVE\n")
				return
			}

			// Decide under muFinal whether a Finalize is due. The timestamp is
			// consumed (reset to nil) in the same critical section so a single
			// stale timestamp triggers at most one Finalize.
			due := func() bool {
				c.muFinal.Lock()
				defer c.muFinal.Unlock()

				// nothing recorded since the last reset
				if c.lastDatagram == nil {
					klog.V(7).Infof("No datagram received. Skipping...\n")
					return false
				}

				// something was recorded; has the configured delta elapsed?
				deadline := c.lastDatagram.Add(time.Millisecond * time.Duration(c.cOptions.AutoFlushReplyDelta))
				current := time.Now()
				klog.V(7).Infof("Time (Last): %s\n", deadline.String())
				klog.V(7).Infof("Time (Now ): %s\n", current.String())
				if !deadline.Before(current) {
					return false
				}

				c.lastDatagram = nil
				return true
			}()
			if !due {
				continue
			}

			// Finalize takes its own lock, so it is called after muFinal is released.
			klog.V(5).Infof("Sending Finalize message...\n")
			if err := c.Finalize(); err != nil {
				klog.V(1).Infof("Failed to send Finalize. Err: %v\n", err)
			} else {
				klog.V(5).Infof("Finalize sent!")
			}
		}
	}
}
Expand Down Expand Up @@ -700,3 +773,75 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {
}
return response
}

// inspect decodes just the "type" header of a raw message and, when the type
// is TypeMessageResponse, decodes the full MessageResponse and passes it to
// inspectMessage. Every other message type is only logged.
//
// It returns the JSON decoding error or the error from inspectMessage, if
// any; otherwise nil.
func (c *Client) inspect(byMsg []byte) error {
	klog.V(7).Infof("client.inspect() ENTER\n")

	// first pass: read only the message type
	var header msginterfaces.MessageType
	if err := json.Unmarshal(byMsg, &header); err != nil {
		klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err)
		klog.V(7).Infof("client.inspect() LEAVE\n")
		return err
	}

	if header.Type != msginterfaces.TypeMessageResponse {
		klog.V(7).Infof("MessageType: %s\n", header.Type)
	} else {
		klog.V(7).Infof("TypeMessageResponse\n")

		// second pass: decode the same bytes as a full MessageResponse
		var result msginterfaces.MessageResponse
		if err := json.Unmarshal(byMsg, &result); err != nil {
			klog.V(1).Infof("json.Unmarshal(MessageResponse) failed. Err: %v\n", err)
			klog.V(7).Infof("client.inspect() LEAVE\n")
			return err
		}

		// let the per-message inspection look at the decoded result
		if err := c.inspectMessage(&result); err != nil {
			klog.V(1).Infof("inspectMessage() failed. Err: %v\n", err)
			klog.V(7).Infof("client.inspect() LEAVE\n")
			return err
		}
	}

	klog.V(7).Info("inspect() succeeded\n")
	klog.V(7).Infof("client.inspect() LEAVE\n")
	return nil
}

func (c *Client) inspectMessage(mr *msginterfaces.MessageResponse) error {
klog.V(7).Infof("client.inspectMessage() ENTER\n")

sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)
if len(mr.Channel.Alternatives) == 0 || len(sentence) == 0 {

Check failure on line 821 in pkg/client/live/client.go

View workflow job for this annotation

GitHub Actions / Change to Main/Release Branch

emptyStringTest: replace `len(sentence) == 0` with `sentence == ""` (gocritic)
klog.V(7).Info("inspectMessage is empty\n")
klog.V(7).Infof("client.inspectMessage() LEAVE\n")
return nil
}

if mr.IsFinal {
klog.V(7).Infof("IsFinal received: %s\n", time.Now().String())

// doing a write, need to lock
c.muFinal.Lock()
c.lastDatagram = nil
c.muFinal.Unlock()
} else {
klog.V(7).Infof("Interim received: %s\n", time.Now().String())

// last datagram received
c.muFinal.Lock()
now := time.Now()
c.lastDatagram = &now
c.muFinal.Unlock()
}

klog.V(7).Info("inspectMessage() succeeded\n")
klog.V(7).Infof("client.inspectMessage() LEAVE\n")
return nil
}
3 changes: 3 additions & 0 deletions pkg/client/live/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (

// internal constants for retry, waits, back-off, etc.
const (
flushPeriod = 500 * time.Millisecond
flashInitialDelay = 3 * time.Second

Check failure on line 16 in pkg/client/live/constants.go

View workflow job for this annotation

GitHub Actions / Change to Main/Release Branch

`flashInitialDelay` is unused (deadcode)

pingPeriod = 5 * time.Second

defaultDelayBetweenRetry int64 = 2
Expand Down
Loading

0 comments on commit 5868b91

Please sign in to comment.