Skip to content

Commit

Permalink
add a serial write for step run events (#990)
Browse files Browse the repository at this point in the history
* add a serial write for step run events

* update other problematic queries

* tmp: don't upsert queue

* add SerialBuffer to the config

* revert the change to config

* fix: add back queue upsert

* add statement timeout to upsert queue

---------

Co-authored-by: Sean Reilly <[email protected]>
Co-authored-by: Alexander Belanger <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2024
1 parent 509542b commit 9f4b638
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 92 deletions.
1 change: 1 addition & 0 deletions pkg/config/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func BindAllEnv(v *viper.Viper) {

_ = v.BindEnv("runtime.eventBuffer.flushPeriodMilliseconds", "SERVER_EVENTBUFFER_FLUSH_PERIOD_MILLISECONDS")
_ = v.BindEnv("runtime.eventBuffer.flushItemsThreshold", "SERVER_EVENTBUFFER_FLUSH_ITEMS_THRESHOLD")
_ = v.BindEnv("runtime.eventBuffer.serialBuffer", "SERVER_EVENTBUFFER_SERIAL_BUFFER")

_ = v.BindEnv("runtime.releaseSemaphoreBuffer.flushPeriodMilliseconds", "SERVER_RELEASESEMAPHOREBUFFER_FLUSH_PERIOD_MILLISECONDS")
_ = v.BindEnv("runtime.releaseSemaphoreBuffer.flushItemsThreshold", "SERVER_RELEASESEMAPHOREBUFFER_FLUSH_ITEMS_THRESHOLD")
Expand Down
62 changes: 56 additions & 6 deletions pkg/repository/buffer/bulk_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@ func NewBulkEventWriter(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Lo
}

eventBufOpts := TenantBufManagerOpts[*repository.CreateStepRunEventOpts, int]{
Name: "step_run_event_buffer",
OutputFunc: w.BulkWriteStepRunEvents,
SizeFunc: sizeOfEventData,
L: w.l,
V: w.v,
Config: conf,
Name: "step_run_event_buffer",
SizeFunc: sizeOfEventData,
L: w.l,
V: w.v,
Config: conf,
}

if conf.SerialBuffer {
l.Warn().Msg("using serial buffer for step run events")
eventBufOpts.OutputFunc = w.SerialWriteStepRunEvent
} else {
eventBufOpts.OutputFunc = w.BulkWriteStepRunEvents
}

manager, err := NewTenantBufManager(eventBufOpts)
Expand Down Expand Up @@ -80,6 +86,50 @@ func sortByStepRunId(opts []*repository.CreateStepRunEventOpts) []*repository.Cr
return opts
}

func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error) {
res := make([]int, 0, len(opts))

for i, item := range opts {

res = append(res, i)
var eventData []byte
var err error

if item.EventData != nil {
eventData, err = json.Marshal(item.EventData)

if err != nil {
return nil, fmt.Errorf("could not marshal step run event data: %w", err)
}
}

tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, w.pool, w.l, 1000)
if err != nil {
return nil, fmt.Errorf("could not prepare transaction: %w", err)
}
defer rollback()
err = w.queries.CreateStepRunEvent(ctx, tx, dbsqlc.CreateStepRunEventParams{
Steprunid: sqlchelpers.UUIDFromStr(item.StepRunId),
Reason: *item.EventReason,
Severity: *item.EventSeverity,
Message: *item.EventMessage,
Data: eventData,
})

if err != nil {
return nil, fmt.Errorf("could not create step run event: %w", err)
}
err = commit(ctx)
if err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}

}

return res, nil

}

func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error) {
res := make([]int, 0, len(opts))
eventTimeSeen := make([]pgtype.Timestamp, 0, len(opts))
Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/buffer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ type ConfigFileBuffer struct {

// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

// SerialBuffer is a flag to determine if the buffer should be serial or bulk
SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`
}
141 changes: 105 additions & 36 deletions pkg/repository/prisma/dbsqlc/queue.sql
Original file line number Diff line number Diff line change
@@ -1,40 +1,52 @@
-- name: UpsertQueue :exec
WITH queue_exists AS (
SELECT
1
FROM
"Queue"
WHERE
"tenantId" = @tenantId::uuid
AND "name" = @name::text
), queue_to_update AS (
SELECT
*
FROM
"Queue"
WHERE
EXISTS (
SELECT
1
FROM
queue_exists
)
AND "tenantId" = @tenantId::uuid
AND "name" = @name::text
FOR UPDATE SKIP LOCKED
), update_queue AS (
UPDATE
"Queue"
SET
"lastActive" = NOW()
FROM
queue_to_update
WHERE
"Queue"."tenantId" = queue_to_update."tenantId"
AND "Queue"."name" = queue_to_update."name"
)
INSERT INTO
"Queue" (
"tenantId",
"name",
"lastActive"
)
VALUES
(
@tenantId::uuid,
@name::text,
NOW()
)
ON CONFLICT ("tenantId", "name") DO UPDATE
SET
"lastActive" = NOW();

-- name: UpsertQueues :exec
WITH input_data AS (
SELECT
UNNEST(@tenantIds::uuid[]) AS tenantId,
UNNEST(@names::text[]) AS name
)
INSERT INTO "Queue" (
"tenantId",
"name",
"lastActive"
)
SELECT
input_data.tenantId,
input_data.name,
@tenantId::uuid,
@name::text,
NOW()
FROM
input_data
ON CONFLICT ("tenantId", "name") DO UPDATE
SET
"lastActive" = NOW();
WHERE NOT EXISTS (
SELECT 1 FROM queue_exists
)
ON CONFLICT ("tenantId", "name") DO NOTHING;

-- name: ListQueues :many
SELECT
Expand Down Expand Up @@ -118,16 +130,73 @@ GROUP BY
qi."queue";

-- name: GetMinUnprocessedQueueItemId :one
WITH priority_1 AS (
SELECT
"id"
FROM
"QueueItem"
WHERE
"isQueued" = 't'
AND "tenantId" = @tenantId::uuid
AND "queue" = @queue::text
AND "priority" = 1
ORDER BY
"id" ASC
LIMIT 1
),
priority_2 AS (
SELECT
"id"
FROM
"QueueItem"
WHERE
"isQueued" = 't'
AND "tenantId" = @tenantId::uuid
AND "queue" = @queue::text
AND "priority" = 2
ORDER BY
"id" ASC
LIMIT 1
),
priority_3 AS (
SELECT
"id"
FROM
"QueueItem"
WHERE
"isQueued" = 't'
AND "tenantId" = @tenantId::uuid
AND "queue" = @queue::text
AND "priority" = 3
ORDER BY
"id" ASC
LIMIT 1
),
priority_4 AS (
SELECT
"id"
FROM
"QueueItem"
WHERE
"isQueued" = 't'
AND "tenantId" = @tenantId::uuid
AND "queue" = @queue::text
AND "priority" = 4
ORDER BY
"id" ASC
LIMIT 1
)
SELECT
COALESCE(MIN("id"), 0)::bigint AS "minId"
FROM
"QueueItem"
WHERE
"isQueued" = 't'
AND "tenantId" = @tenantId::uuid
AND "queue" = @queue::text
-- Added to ensure that the index is used
AND "priority" >= 1 AND "priority" <= 4;
FROM (
SELECT "id" FROM priority_1
UNION ALL
SELECT "id" FROM priority_2
UNION ALL
SELECT "id" FROM priority_3
UNION ALL
SELECT "id" FROM priority_4
) AS combined_priorities;

-- name: GetMinMaxProcessedQueueItems :one
SELECT
Expand Down
Loading

0 comments on commit 9f4b638

Please sign in to comment.