Skip to content

Commit

Permalink
fix: reconnecting workers should get new dispatcher id, ticker active…
Browse files Browse the repository at this point in the history
… bug (#91)
  • Loading branch information
abelanger5 authored Jan 8, 2024
1 parent fe76c72 commit 078e235
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
6 changes: 3 additions & 3 deletions internal/repository/prisma/dbsqlc/tickers.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- name: ListStaleTickers :many
-- name: ListNewlyStaleTickers :many
SELECT
sqlc.embed(tickers)
FROM "Ticker" as tickers
WHERE
-- last heartbeat older than 15 seconds
"lastHeartbeatAt" < NOW () - INTERVAL '15 seconds'
-- not active
AND "isActive" = false;
-- active
AND "isActive" = true;

-- name: ListActiveTickers :many
SELECT
Expand Down
16 changes: 8 additions & 8 deletions internal/repository/prisma/dbsqlc/tickers.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/repository/prisma/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *tickerRepository) UpdateStaleTickers(onStale func(tickerId string, getV

defer tx.Rollback(context.Background())

staleTickers, err := t.queries.ListStaleTickers(context.Background(), tx)
staleTickers, err := t.queries.ListNewlyStaleTickers(context.Background(), tx)

if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions internal/repository/prisma/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func (w *workerRepository) UpdateWorker(tenantId, workerId string, opts *reposit
optionals = append(optionals, db.Worker.LastHeartbeatAt.Set(*opts.LastHeartbeatAt))
}

if opts.DispatcherId != nil {
optionals = append(optionals, db.Worker.Dispatcher.Link(
db.Dispatcher.ID.Equals(*opts.DispatcherId),
))
}

if len(opts.Actions) > 0 {
for _, action := range opts.Actions {
txs = append(txs, w.client.Action.UpsertOne(
Expand Down
3 changes: 3 additions & 0 deletions internal/repository/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type CreateWorkerOpts struct {
}

type UpdateWorkerOpts struct {
// The id of the dispatcher
DispatcherId *string `validate:"omitempty,uuid"`

// The status of the worker
Status *db.WorkerStatus

Expand Down
19 changes: 19 additions & 0 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ func (s *DispatcherImpl) Register(ctx context.Context, request *contracts.Worker
func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream contracts.Dispatcher_ListenServer) error {
s.l.Debug().Msgf("Received subscribe request from ID: %s", request.WorkerId)

worker, err := s.repo.Worker().GetWorkerById(request.WorkerId)

if err != nil {
s.l.Error().Err(err).Msgf("could not get worker %s", request.WorkerId)
return err
}

// check the worker's dispatcher against the current dispatcher. if they don't match, then update the worker
if worker.DispatcherID != s.dispatcherId {
_, err = s.repo.Worker().UpdateWorker(request.TenantId, request.WorkerId, &repository.UpdateWorkerOpts{
DispatcherId: &s.dispatcherId,
})

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s dispatcher", request.WorkerId)
return err
}
}

fin := make(chan bool)

s.workers.Store(request.WorkerId, subscribedWorker{stream: stream, finished: fin})
Expand Down

0 comments on commit 078e235

Please sign in to comment.