Skip to content

Commit

Permalink
Process entity events in exclusively and in parallel (#1654)
Browse files Browse the repository at this point in the history
* feat: aggregate events before executing them

This adds yet another component to mediator called the EEA, which stands for the
Event Execution Aggregator. As the name suggests, it aggregates events so we
should process less of them in the end. We'll process an initial triggering event,
with successive events being batched into one until the executor is ready.

This was done by using a pessimistic soft-locking mechanism based on time which locks
execution for a single entity. If an entity execution is locked, we add the event to
a queue which effectively aggregates them into one. These events will be triggered
once a flush happens.

* go executor go!

This limits the executor event handling code to just parse the message. It
subsequently spawns a goroutine to do the actual evaluation.

The intent is to not block the message handler when we're receiving events
so we'd have faster executions.

Add mechanism for executor to indicate when it's done.

Add graceful termination to executor

This makes sure that the executor cancels any profile runs based on
a per execution timeout or the server itself shutting down.

* logging

* Skip flushing if entity no longer exists
  • Loading branch information
JAORMX authored Nov 16, 2023
1 parent 9a391fd commit 3d4d115
Show file tree
Hide file tree
Showing 26 changed files with 1,666 additions and 78 deletions.
16 changes: 15 additions & 1 deletion cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stacklok/minder/internal/config"
"github.com/stacklok/minder/internal/controlplane"
"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/eea"
"github.com/stacklok/minder/internal/engine"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/logger"
Expand Down Expand Up @@ -109,7 +110,13 @@ var serveCmd = &cobra.Command{
return fmt.Errorf("unable to create server: %w", err)
}

exec, err := engine.NewExecutor(store, &cfg.Auth, engine.WithProviderMetrics(providerMetrics))
aggr := eea.NewEEA(store, evt, &cfg.Events.Aggregator)

s.ConsumeEvents(aggr)

exec, err := engine.NewExecutor(ctx, store, &cfg.Auth, evt,
engine.WithProviderMetrics(providerMetrics),
engine.WithAggregatorMiddleware(aggr))
if err != nil {
return fmt.Errorf("unable to create executor: %w", err)
}
Expand All @@ -134,6 +141,13 @@ var serveCmd = &cobra.Command{

errg.Go(s.HandleEvents(ctx))

// Wait for event handlers to start running
<-evt.Running()

if err := aggr.FlushAll(ctx); err != nil {
return fmt.Errorf("error flushing cache: %w", err)
}

return errg.Wait()
},
}
Expand Down
19 changes: 19 additions & 0 deletions database/migrations/000010_entity_execution_lock.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Copyright 2023 Stacklok, Inc
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

DROP INDEX IF EXISTS flush_cache_idx;
DROP INDEX IF EXISTS entity_execution_lock_idx;

DROP TABLE IF EXISTS flush_cache;
DROP TABLE IF EXISTS entity_execution_lock;
51 changes: 51 additions & 0 deletions database/migrations/000010_entity_execution_lock.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-- Copyright 2023 Stacklok, Inc
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.


--- This implements two tables:
--- * The entity execution lock table, which is used to prevent multiple
--- instances of the same entity from running at the same time.
--- * The flush cache table, which is used to cache entities to be executed
--- once the lock is released.

CREATE TABLE IF NOT EXISTS entity_execution_lock (
id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
entity entities NOT NULL,
locked_by UUID NOT NULL,
last_lock_time TIMESTAMP NOT NULL,
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
artifact_id UUID REFERENCES artifacts(id) ON DELETE CASCADE,
pull_request_id UUID REFERENCES pull_requests(id) ON DELETE CASCADE
);

CREATE UNIQUE INDEX IF NOT EXISTS entity_execution_lock_idx ON entity_execution_lock(
entity,
repository_id,
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID),
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID));

CREATE TABLE IF NOT EXISTS flush_cache (
id UUID NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
entity entities NOT NULL,
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
artifact_id UUID REFERENCES artifacts(id) ON DELETE CASCADE,
pull_request_id UUID REFERENCES pull_requests(id) ON DELETE CASCADE,
queued_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX IF NOT EXISTS flush_cache_idx ON flush_cache(
entity,
repository_id,
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID),
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID));
103 changes: 103 additions & 0 deletions database/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions database/query/entity_execution_lock.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-- LockIfThresholdNotExceeded is used to lock an entity for execution. It will
-- attempt to insert or update the entity_execution_lock table only if the
-- last_lock_time is older than the threshold. If the lock is successful, it
-- will return the lock record. If the lock is unsuccessful, it will return
-- NULL.

-- name: LockIfThresholdNotExceeded :one
INSERT INTO entity_execution_lock(
entity,
locked_by,
last_lock_time,
repository_id,
artifact_id,
pull_request_id
) VALUES(
sqlc.arg(entity)::entities,
gen_random_uuid(),
NOW(),
sqlc.arg(repository_id)::UUID,
sqlc.narg(artifact_id)::UUID,
sqlc.narg(pull_request_id)::UUID
) ON CONFLICT(entity, repository_id, COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID))
DO UPDATE SET
locked_by = gen_random_uuid(),
last_lock_time = NOW()
WHERE entity_execution_lock.last_lock_time < (NOW() - (@interval::TEXT || ' seconds')::interval)
RETURNING *;

-- ReleaseLock is used to release a lock on an entity. It will delete the
-- entity_execution_lock record if the lock is held by the given locked_by
-- value.

-- name: ReleaseLock :exec
DELETE FROM entity_execution_lock
WHERE entity = sqlc.arg(entity)::entities AND repository_id = sqlc.arg(repository_id)::UUID AND
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND
locked_by = sqlc.arg(locked_by)::UUID;

-- name: UpdateLease :exec
UPDATE entity_execution_lock SET last_lock_time = NOW()
WHERE entity = $1 AND repository_id = $2 AND
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND
locked_by = sqlc.arg(locked_by)::UUID;

-- name: EnqueueFlush :one
INSERT INTO flush_cache(
entity,
repository_id,
artifact_id,
pull_request_id
) VALUES(
sqlc.arg(entity)::entities,
sqlc.arg(repository_id)::UUID,
sqlc.narg(artifact_id)::UUID,
sqlc.narg(pull_request_id)::UUID
) ON CONFLICT(entity, repository_id, COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID), COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID))
DO NOTHING
RETURNING *;

-- name: FlushCache :one
DELETE FROM flush_cache
WHERE entity = $1 AND repository_id = $2 AND
COALESCE(artifact_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(artifact_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID) AND
COALESCE(pull_request_id, '00000000-0000-0000-0000-000000000000'::UUID) = COALESCE(sqlc.narg(pull_request_id)::UUID, '00000000-0000-0000-0000-000000000000'::UUID)
RETURNING *;

-- name: ListFlushCache :many
SELECT * FROM flush_cache;
4 changes: 4 additions & 0 deletions database/query/pull_requests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ RETURNING *;
SELECT * FROM pull_requests
WHERE repository_id = $1 AND pr_number = $2;

-- name: GetPullRequestByID :one
SELECT * FROM pull_requests
WHERE id = $1;

-- name: DeletePullRequest :exec
DELETE FROM pull_requests
WHERE repository_id = $1 AND pr_number = $2;
12 changes: 12 additions & 0 deletions internal/config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type EventConfig struct {
RouterCloseTimeout int64 `mapstructure:"router_close_timeout" default:"10"`
// GoChannel is the configuration for the go channel event driver
GoChannel GoChannelEventConfig `mapstructure:"go-channel" default:"{}"`
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator" default:"{}"`
}

// GoChannelEventConfig is the configuration for the go channel event driver
Expand All @@ -32,4 +34,14 @@ type GoChannelEventConfig struct {
BufferSize int64 `mapstructure:"buffer_size" default:"0"`
// PersistEvents is whether or not to persist events to the channel
PersistEvents bool `mapstructure:"persist_events" default:"false"`
// BlockPublishUntilSubscriberAck is whether or not to block publishing until
// the subscriber acks the message. This is useful for testing.
BlockPublishUntilSubscriberAck bool `mapstructure:"block_publish_until_subscriber_ack" default:"false"`
}

// AggregatorConfig is the configuration for the event aggregator middleware
type AggregatorConfig struct {
// LockInterval is the interval for locking events in seconds.
// This is the threshold between rule evaluations + actions.
LockInterval int64 `mapstructure:"lock_interval" default:"30"`
}
2 changes: 1 addition & 1 deletion internal/controlplane/handlers_githubwebhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (s *Server) HandleGitHubWebHook() http.HandlerFunc {

wes.accepted = true

if err := s.evt.Publish(engine.InternalEntityEventTopic, m); err != nil {
if err := s.evt.Publish(engine.ExecuteEntityEventTopic, m); err != nil {
wes.error = true
log.Printf("Error publishing message: %v", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
Loading

0 comments on commit 3d4d115

Please sign in to comment.