From 078e235adeef3be5c8c7802ea5f4f3cc0fbc8c25 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Mon, 8 Jan 2024 15:52:45 -0500 Subject: [PATCH] fix: reconnecting workers should get new dispatcher id, ticker active bug (#91) --- internal/repository/prisma/dbsqlc/tickers.sql | 6 +++--- .../repository/prisma/dbsqlc/tickers.sql.go | 16 ++++++++-------- internal/repository/prisma/ticker.go | 2 +- internal/repository/prisma/worker.go | 6 ++++++ internal/repository/worker.go | 3 +++ internal/services/dispatcher/server.go | 19 +++++++++++++++++++ 6 files changed, 40 insertions(+), 12 deletions(-) diff --git a/internal/repository/prisma/dbsqlc/tickers.sql b/internal/repository/prisma/dbsqlc/tickers.sql index 4c422d790..fee9b5004 100644 --- a/internal/repository/prisma/dbsqlc/tickers.sql +++ b/internal/repository/prisma/dbsqlc/tickers.sql @@ -1,12 +1,12 @@ --- name: ListStaleTickers :many +-- name: ListNewlyStaleTickers :many SELECT sqlc.embed(tickers) FROM "Ticker" as tickers WHERE -- last heartbeat older than 15 seconds "lastHeartbeatAt" < NOW () - INTERVAL '15 seconds' - -- not active - AND "isActive" = false; + -- active + AND "isActive" = true; -- name: ListActiveTickers :many SELECT diff --git a/internal/repository/prisma/dbsqlc/tickers.sql.go b/internal/repository/prisma/dbsqlc/tickers.sql.go index 2cfaea5c8..7c690313a 100644 --- a/internal/repository/prisma/dbsqlc/tickers.sql.go +++ b/internal/repository/prisma/dbsqlc/tickers.sql.go @@ -52,30 +52,30 @@ func (q *Queries) ListActiveTickers(ctx context.Context, db DBTX) ([]*ListActive return items, nil } -const listStaleTickers = `-- name: ListStaleTickers :many +const listNewlyStaleTickers = `-- name: ListNewlyStaleTickers :many SELECT tickers.id, tickers."createdAt", tickers."updatedAt", tickers."lastHeartbeatAt", tickers."isActive" FROM "Ticker" as tickers WHERE -- last heartbeat older than 15 seconds "lastHeartbeatAt" < NOW () - INTERVAL '15 seconds' - -- not active - AND "isActive" = false + -- active + AND "isActive" = true ` -type ListStaleTickersRow struct { +type ListNewlyStaleTickersRow struct { Ticker Ticker `json:"ticker"` } -func (q *Queries) ListStaleTickers(ctx context.Context, db DBTX) ([]*ListStaleTickersRow, error) { - rows, err := db.Query(ctx, listStaleTickers) +func (q *Queries) ListNewlyStaleTickers(ctx context.Context, db DBTX) ([]*ListNewlyStaleTickersRow, error) { + rows, err := db.Query(ctx, listNewlyStaleTickers) if err != nil { return nil, err } defer rows.Close() - var items []*ListStaleTickersRow + var items []*ListNewlyStaleTickersRow for rows.Next() { - var i ListStaleTickersRow + var i ListNewlyStaleTickersRow if err := rows.Scan( &i.Ticker.ID, &i.Ticker.CreatedAt, diff --git a/internal/repository/prisma/ticker.go b/internal/repository/prisma/ticker.go index baba31e08..1ac0ccb5d 100644 --- a/internal/repository/prisma/ticker.go +++ b/internal/repository/prisma/ticker.go @@ -174,7 +174,7 @@ func (t *tickerRepository) UpdateStaleTickers(onStale func(tickerId string, getV defer tx.Rollback(context.Background()) - staleTickers, err := t.queries.ListStaleTickers(context.Background(), tx) + staleTickers, err := t.queries.ListNewlyStaleTickers(context.Background(), tx) if err != nil { return err diff --git a/internal/repository/prisma/worker.go b/internal/repository/prisma/worker.go index 3f15fb79d..7d4e157b5 100644 --- a/internal/repository/prisma/worker.go +++ b/internal/repository/prisma/worker.go @@ -255,6 +255,12 @@ func (w *workerRepository) UpdateWorker(tenantId, workerId string, opts *reposit optionals = append(optionals, db.Worker.LastHeartbeatAt.Set(*opts.LastHeartbeatAt)) } + if opts.DispatcherId != nil { + optionals = append(optionals, db.Worker.Dispatcher.Link( + db.Dispatcher.ID.Equals(*opts.DispatcherId), + )) + } + if len(opts.Actions) > 0 { for _, action := range opts.Actions { txs = append(txs, w.client.Action.UpsertOne( diff --git a/internal/repository/worker.go b/internal/repository/worker.go index d3fdd2f19..24c65aff2 100644 --- a/internal/repository/worker.go +++ b/internal/repository/worker.go @@ -21,6 +21,9 @@ type CreateWorkerOpts struct { } type UpdateWorkerOpts struct { + // The id of the dispatcher + DispatcherId *string `validate:"omitempty,uuid"` + // The status of the worker Status *db.WorkerStatus diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 7a9ebd3e7..bd6b8cfe3 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -118,6 +118,25 @@ func (s *DispatcherImpl) Register(ctx context.Context, request *contracts.Worker func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream contracts.Dispatcher_ListenServer) error { s.l.Debug().Msgf("Received subscribe request from ID: %s", request.WorkerId) + worker, err := s.repo.Worker().GetWorkerById(request.WorkerId) + + if err != nil { + s.l.Error().Err(err).Msgf("could not get worker %s", request.WorkerId) + return err + } + + // check the worker's dispatcher against the current dispatcher. if they don't match, then update the worker + if worker.DispatcherID != s.dispatcherId { + _, err = s.repo.Worker().UpdateWorker(request.TenantId, request.WorkerId, &repository.UpdateWorkerOpts{ + DispatcherId: &s.dispatcherId, + }) + + if err != nil { + s.l.Error().Err(err).Msgf("could not update worker %s dispatcher", request.WorkerId) + return err + } + } + fin := make(chan bool) s.workers.Store(request.WorkerId, subscribedWorker{stream: stream, finished: fin})