diff --git a/internal/controlplane/handlers_entities_test.go b/internal/controlplane/handlers_entities_test.go index a6533f22f8..63d0ffcaf6 100644 --- a/internal/controlplane/handlers_entities_test.go +++ b/internal/controlplane/handlers_entities_test.go @@ -13,12 +13,12 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgh "github.com/mindersec/minder/internal/providers/github/mock" "github.com/mindersec/minder/internal/providers/manager" mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/events/mock" ) func TestServer_ReconcileEntityRegistration(t *testing.T) { diff --git a/internal/controlplane/handlers_oauth_test.go b/internal/controlplane/handlers_oauth_test.go index 4bb2d04054..b02d3b3a24 100644 --- a/internal/controlplane/handlers_oauth_test.go +++ b/internal/controlplane/handlers_oauth_test.go @@ -38,7 +38,6 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" mockprops "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/dockerhub" mockclients "github.com/mindersec/minder/internal/providers/github/clients/mock" @@ -51,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/providers/session" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/events" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -378,7 +378,7 @@ func TestGetAuthorizationURL(t *testing.T) { store := mockdb.NewMockStore(ctrl) tc.buildStubs(store) - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := events.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -890,7 +890,7 @@ func TestHandleGitHubAppCallback(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := events.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -1046,7 +1046,7 @@ func TestVerifyProviderCredential(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := events.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/handlers_user_test.go b/internal/controlplane/handlers_user_test.go index 84fda12104..ecf2ff1b29 100644 --- a/internal/controlplane/handlers_user_test.go +++ b/internal/controlplane/handlers_user_test.go @@ -29,7 +29,6 @@ import ( "github.com/mindersec/minder/internal/authz/mock" mockcrypto "github.com/mindersec/minder/internal/crypto/mock" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/marketplaces" "github.com/mindersec/minder/internal/projects" @@ -37,6 +36,7 @@ import ( mockprov "github.com/mindersec/minder/internal/providers/github/service/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -439,7 +439,7 @@ func TestDeleteUser_gRPC(t *testing.T) { })) defer testServer.Close() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index e74e78c607..ad94455d78 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -47,7 +47,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propSvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" "github.com/mindersec/minder/internal/logger" @@ -64,6 +63,7 @@ import ( "github.com/mindersec/minder/internal/util" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -81,7 +81,7 @@ var ( type Server struct { store db.Store cfg *serverconfig.Config - evt events.Publisher + evt pkgevents.Publisher mt metrics.Metrics grpcServer *grpc.Server jwt jwt.Validator @@ -126,7 +126,7 @@ type Server struct { // NewServer creates a new server instance func NewServer( store db.Store, - evt events.Publisher, + evt pkgevents.Publisher, cfg *serverconfig.Config, serverMetrics metrics.Metrics, jwtValidator jwt.Validator, diff --git a/internal/controlplane/server_test.go b/internal/controlplane/server_test.go index bd8879fb9a..11e5bb8cd3 100644 --- a/internal/controlplane/server_test.go +++ b/internal/controlplane/server_test.go @@ -33,13 +33,13 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/crypto" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" ghclient "github.com/mindersec/minder/internal/providers/github/clients" ghService "github.com/mindersec/minder/internal/providers/github/service" mock_reposvc "github.com/mindersec/minder/internal/repositories/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) const bufSize = 1024 * 1024 @@ -78,7 +78,7 @@ func newDefaultServer( ) *Server { t.Helper() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/eea/eea.go b/internal/eea/eea.go index a88cce4167..283369c391 100644 --- a/internal/eea/eea.go +++ b/internal/eea/eea.go @@ -24,12 +24,13 @@ import ( "github.com/mindersec/minder/internal/providers/manager" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) // EEA is the Event Execution Aggregator type EEA struct { querier db.Store - evt events.Publisher + evt pkgevents.Publisher cfg *serverconfig.AggregatorConfig entityFetcher service.PropertiesService @@ -37,7 +38,7 @@ type EEA struct { } // NewEEA creates a new EEA -func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig, +func NewEEA(querier db.Store, evt pkgevents.Publisher, cfg *serverconfig.AggregatorConfig, ef service.PropertiesService, provMan manager.ProviderManager) *EEA { return &EEA{ querier: querier, @@ -49,7 +50,7 @@ func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.Aggregator } // Register implements the Consumer interface. -func (e *EEA) Register(r events.Registrar) { +func (e *EEA) Register(r pkgevents.Registrar) { r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler) } diff --git a/internal/eea/eea_test.go b/internal/eea/eea_test.go index dd1c19bd80..77696163b8 100644 --- a/internal/eea/eea_test.go +++ b/internal/eea/eea_test.go @@ -31,6 +31,7 @@ import ( mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -55,7 +56,7 @@ func TestAggregator(t *testing.T) { projectID, repoID := createNeededEntities(ctx, t, testQueries) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := pkgevents.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BufferSize: concurrentEvents, @@ -345,7 +346,7 @@ func TestFlushAll(t *testing.T) { propsvc := propsvcmock.NewMockPropertiesService(ctrl) provman := mockmanager.NewMockProviderManager(ctrl) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := pkgevents.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -394,7 +395,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) { require.NoError(t, err, "expected no error when creating embedded store") t.Cleanup(td) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := pkgevents.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -433,7 +434,7 @@ func TestFlushAllListFlushFails(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := pkgevents.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -483,7 +484,7 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := pkgevents.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/email/awsses/awsses.go b/internal/email/awsses/awsses.go index a17c178719..b686490a2b 100644 --- a/internal/email/awsses/awsses.go +++ b/internal/email/awsses/awsses.go @@ -17,7 +17,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -47,7 +47,7 @@ func New(ctx context.Context, sender, region string) (*awsSES, error) { } // Register implements the Consumer interface. -func (a *awsSES) Register(reg events.Registrar) { +func (a *awsSES) Register(reg pkgevents.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/email/noop/noop.go b/internal/email/noop/noop.go index 1f3474f78d..6f3eff1f95 100644 --- a/internal/email/noop/noop.go +++ b/internal/email/noop/noop.go @@ -12,7 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + pkgevents "github.com/mindersec/minder/pkg/events" ) type noop struct { @@ -24,7 +24,7 @@ func New() *noop { } // Register implements the Consumer interface. -func (_ *noop) Register(reg events.Registrar) { +func (_ *noop) Register(reg pkgevents.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/engine/entities/entity_event.go b/internal/engine/entities/entity_event.go index 0ef6b4ebc8..b5085c1d81 100644 --- a/internal/engine/entities/entity_event.go +++ b/internal/engine/entities/entity_event.go @@ -14,6 +14,7 @@ import ( "github.com/mindersec/minder/internal/events" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" ) // EntityInfoWrapper is a helper struct to gather information @@ -180,7 +181,7 @@ func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) { } // Publish builds a message.Message and publishes it to the event bus -func (eiw *EntityInfoWrapper) Publish(evt events.Publisher) error { +func (eiw *EntityInfoWrapper) Publish(evt pkgevents.Publisher) error { msg, err := eiw.BuildMessage() if err != nil { return err diff --git a/internal/engine/handler.go b/internal/engine/handler.go index 65d87612bf..ab74c1c6ad 100644 --- a/internal/engine/handler.go +++ b/internal/engine/handler.go @@ -18,6 +18,7 @@ import ( "github.com/mindersec/minder/internal/events" minderlogger "github.com/mindersec/minder/internal/logger" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -32,7 +33,7 @@ const ( // ExecutorEventHandler is responsible for consuming entity events, passing // entities to the executor, and then publishing the results. type ExecutorEventHandler struct { - evt events.Publisher + evt pkgevents.Publisher handlerMiddleware []message.HandlerMiddleware wgEntityEventExecution *sync.WaitGroup executor Executor @@ -46,7 +47,7 @@ type ExecutorEventHandler struct { // NewExecutorEventHandler creates the event handler for the executor func NewExecutorEventHandler( ctx context.Context, - evt events.Publisher, + evt pkgevents.Publisher, handlerMiddleware []message.HandlerMiddleware, executor Executor, ) *ExecutorEventHandler { @@ -70,7 +71,7 @@ func NewExecutorEventHandler( } // Register implements the Consumer interface. -func (e *ExecutorEventHandler) Register(r events.Registrar) { +func (e *ExecutorEventHandler) Register(r pkgevents.Registrar) { r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) } diff --git a/internal/engine/handler_test.go b/internal/engine/handler_test.go index 2b3a8f822f..6eb4b253d8 100644 --- a/internal/engine/handler_test.go +++ b/internal/engine/handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { @@ -38,7 +39,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { // -- end expectations - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go index 6913c6e491..c658e51c2b 100644 --- a/internal/entities/handlers/handler.go +++ b/internal/entities/handlers/handler.go @@ -23,6 +23,7 @@ import ( "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" ) var ( @@ -32,7 +33,7 @@ var ( ) type handleEntityAndDoBase struct { - evt events.Publisher + evt pkgevents.Publisher store db.Store refreshEntity strategies.GetEntityStrategy @@ -45,7 +46,7 @@ type handleEntityAndDoBase struct { } // Register satisfies the events.Consumer interface. -func (b *handleEntityAndDoBase) Register(r events.Registrar) { +func (b *handleEntityAndDoBase) Register(r pkgevents.Registrar) { r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) } @@ -192,12 +193,12 @@ func (b *handleEntityAndDoBase) repoPrivateOrArchivedCheck( // NewRefreshByIDAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshByIDAndEvaluateHandler( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) pkgevents.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -214,12 +215,12 @@ func NewRefreshByIDAndEvaluateHandler( // NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshEntityAndEvaluateHandler( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) pkgevents.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -236,11 +237,11 @@ func NewRefreshEntityAndEvaluateHandler( // NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. func NewGetEntityAndDeleteHandler( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc propertyService.PropertiesService, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) pkgevents.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -257,12 +258,12 @@ func NewGetEntityAndDeleteHandler( // NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. func NewAddOriginatingEntityHandler( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) pkgevents.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -279,12 +280,12 @@ func NewAddOriginatingEntityHandler( // NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. func NewRemoveOriginatingEntityHandler( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) pkgevents.Consumer { return &handleEntityAndDoBase{ evt: evt, diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go index ef27c22ea4..0484c1874d 100644 --- a/internal/entities/handlers/handler_test.go +++ b/internal/entities/handlers/handler_test.go @@ -33,6 +33,7 @@ import ( provManFixtures "github.com/mindersec/minder/internal/providers/manager/mock/fixtures" "github.com/mindersec/minder/internal/reconcilers/messages" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -186,54 +187,54 @@ func checkPullRequestMessage(t *testing.T, msg *watermill.Message) { } type handlerBuilder func( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer +) pkgevents.Consumer func refreshEntityHandlerBuilder( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) pkgevents.Consumer { return NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, provMgr) } func refreshByIDHandlerBuilder( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) pkgevents.Consumer { return NewRefreshByIDAndEvaluateHandler(evt, store, propSvc, provMgr) } func addOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) pkgevents.Consumer { return NewAddOriginatingEntityHandler(evt, store, propSvc, provMgr) } func removeOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) pkgevents.Consumer { return NewRemoveOriginatingEntityHandler(evt, store, propSvc, provMgr) } func getAndDeleteEntityHandlerBuilder( - evt events.Publisher, + evt pkgevents.Publisher, store db.Store, propSvc service.PropertiesService, _ manager.ProviderManager, -) events.Consumer { +) pkgevents.Consumer { return NewGetEntityAndDeleteHandler(evt, store, propSvc) } diff --git a/internal/events/constants.go b/internal/events/constants.go index b2b73a1087..431f562b23 100644 --- a/internal/events/constants.go +++ b/internal/events/constants.go @@ -19,8 +19,10 @@ const ( ) const ( - metricsNamespace = "minder" - metricsSubsystem = "eventer" + // MetricsNamespace is the namespace for the metrics + MetricsNamespace = "minder" + // MetricsSubsystem is the subsystem for the metrics + MetricsSubsystem = "eventer" ) const ( diff --git a/internal/events/eventer.go b/internal/events/eventer.go index 7469ec11d8..e31bff698b 100644 --- a/internal/events/eventer.go +++ b/internal/events/eventer.go @@ -7,133 +7,20 @@ package events import ( "context" - "errors" "fmt" - "reflect" - "runtime" - "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/components/metrics" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/alexdrl/zerowater" - promgo "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" "github.com/mindersec/minder/internal/events/common" - gochannel "github.com/mindersec/minder/internal/events/gochannel" + "github.com/mindersec/minder/internal/events/gochannel" "github.com/mindersec/minder/internal/events/nats" eventersql "github.com/mindersec/minder/internal/events/sql" serverconfig "github.com/mindersec/minder/pkg/config/server" ) -// eventer is a wrapper over the relevant eventing objects in such -// a way that they can be easily accessible and configurable. -type eventer struct { - router *message.Router - // webhookPublisher will gather events coming into the webhook and publish them - webhookPublisher message.Publisher - // webhookSubscriber will subscribe to the webhook topic and handle incoming events - webhookSubscriber message.Subscriber - // TODO: We'll have a Final publisher that will publish to the final topic - msgInstruments *messageInstruments - - closer common.DriverCloser -} - -var _ Publisher = (*eventer)(nil) -var _ Service = (*eventer)(nil) - -type messageInstruments struct { - // message processing time duration histogram - messageProcessingTimeHistogram metric.Int64Histogram -} - -var _ Registrar = (*eventer)(nil) -var _ message.Publisher = (*eventer)(nil) - -// Setup creates an eventer object which isolates the watermill setup code -func Setup(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error) { - if cfg == nil { - return nil, errors.New("event config is nil") - } - - l := zerowater.NewZerologLoggerAdapter( - zerolog.Ctx(ctx).With().Str("component", "watermill").Logger()) - - router, err := message.NewRouter(message.RouterConfig{ - CloseTimeout: time.Duration(cfg.RouterCloseTimeout) * time.Second, - }, l) - if err != nil { - return nil, err - } - - metricsBuilder := metrics.NewPrometheusMetricsBuilder( - promgo.DefaultRegisterer, - metricsNamespace, - metricsSubsystem) - metricsBuilder.AddPrometheusRouterMetrics(router) - zerolog.Ctx(ctx).Info().Msg("Router Metrics registered") - - meter := otel.Meter("eventer") - metricInstruments, err := initMetricsInstruments(meter) - if err != nil { - return nil, err - } - zerolog.Ctx(ctx).Info().Msg("Metrics Instruments registered") - - pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg) - if err != nil { - return nil, fmt.Errorf("failed instantiating driver: %w", err) - } - - poisonQueueMiddleware, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic) - if err != nil { - return nil, fmt.Errorf("failed instantiating poison queue: %w", err) - } - // Router level middleware are executed for every message sent to the router - router.AddMiddleware( - recordMetrics(metricInstruments), - poisonQueueMiddleware, - middleware.Retry{ - MaxRetries: 3, - InitialInterval: time.Millisecond * 100, - Logger: l, - }.Middleware, - // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages - middleware.CorrelationID, - ) - - pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub) - if err != nil { - return nil, fmt.Errorf("failed to decorate publisher: %w", err) - } - - subWithMetrics, err := metricsBuilder.DecorateSubscriber(sub) - if err != nil { - return nil, fmt.Errorf("failed to decorate subscriber: %w", err) - } - - return &eventer{ - router: router, - webhookPublisher: pubWithMetrics, - webhookSubscriber: subWithMetrics, - closer: func() { - //nolint:gosec // It's fine if there's an error as long as we close the router - pubWithMetrics.Close() - //nolint:gosec // It's fine if there's an error as long as we close the router - subWithMetrics.Close() - // driver close - cl() - }, - msgInstruments: metricInstruments, - }, nil -} - -func instantiateDriver( +// InstantiateDriver creates a new driver based on the driver string +func InstantiateDriver( ctx context.Context, driver string, cfg *serverconfig.EventConfig, @@ -153,89 +40,3 @@ func instantiateDriver( return nil, nil, nil, fmt.Errorf("unknown driver %s", driver) } } - -// Close closes the router -func (e *eventer) Close() error { - e.closer() - return e.router.Close() -} - -// Run runs the router, blocks until the router is closed -func (e *eventer) Run(ctx context.Context) error { - return e.router.Run(ctx) -} - -// Running returns a channel which allows you to wait until the -// event router has started. -func (e *eventer) Running() chan struct{} { - return e.router.Running() -} - -// Publish implements message.Publisher -func (e *eventer) Publish(topic string, messages ...*message.Message) error { - pc, _, _, ok := runtime.Caller(1) - details := runtime.FuncForPC(pc) - - if ok && details != nil { - for idx := range messages { - msg := messages[idx] - e.router.Logger().Debug("Publishing message", watermill.LogFields{ - "message_uuid": msg.UUID, - "topic": topic, - "handler": details.Name(), - "component": "eventer", - "function": "Publish", - }) - msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339)) - } - } - - return e.webhookPublisher.Publish(topic, messages...) -} - -// Register subscribes to a topic and handles incoming messages -func (e *eventer) Register( - topic string, - handler message.NoPublishHandlerFunc, - mdw ...message.HandlerMiddleware, -) { - // From https://stackoverflow.com/questions/7052693/how-to-get-the-name-of-a-function-in-go - funcName := fmt.Sprintf("%s-%s", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name(), topic) - hand := e.router.AddNoPublisherHandler( - funcName, - topic, - e.webhookSubscriber, - func(msg *message.Message) error { - if err := handler(msg); err != nil { - e.router.Logger().Error("Found error handling message", err, watermill.LogFields{ - "message_uuid": msg.UUID, - "topic": topic, - "handler": funcName, - "component": "eventer", - }) - - return err - } - - e.router.Logger().Info("Handled message", watermill.LogFields{ - "message_uuid": msg.UUID, - "topic": topic, - "handler": funcName, - "component": "eventer", - }) - - return nil - }, - ) - - for _, m := range mdw { - hand.AddMiddleware(m) - } -} - -// ConsumeEvents allows registration of multiple consumers easily -func (e *eventer) ConsumeEvents(consumers ...Consumer) { - for _, c := range consumers { - c.Register(e) - } -} diff --git a/internal/events/metrics.go b/internal/events/metrics.go index 5c573a5b58..7e700c45aa 100644 --- a/internal/events/metrics.go +++ b/internal/events/metrics.go @@ -13,7 +13,14 @@ import ( "go.opentelemetry.io/otel/metric" ) -func recordMetrics(instruments *messageInstruments) func(h message.HandlerFunc) message.HandlerFunc { +// MessageInstruments contains the metrics instruments for the events package +type MessageInstruments struct { + // message processing time duration histogram + messageProcessingTimeHistogram metric.Int64Histogram +} + +// RecordMetrics is a middleware that records metrics for message processing time +func RecordMetrics(instruments *MessageInstruments) func(h message.HandlerFunc) message.HandlerFunc { metricsFunc := func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { var processingTime time.Duration @@ -41,13 +48,14 @@ func recordMetrics(instruments *messageInstruments) func(h message.HandlerFunc) return metricsFunc } -func initMetricsInstruments(meter metric.Meter) (*messageInstruments, error) { +// InitMetricsInstruments initializes the metrics instruments for the events package +func InitMetricsInstruments(meter metric.Meter) (*MessageInstruments, error) { histogram, err := createProcessingLatencyHistogram(meter) if err != nil { return nil, err } - return &messageInstruments{ + return &MessageInstruments{ messageProcessingTimeHistogram: histogram, }, nil } diff --git a/internal/events/stubs/eventer.go b/internal/events/stubs/eventer.go index 85b62c9161..91f7040083 100644 --- a/internal/events/stubs/eventer.go +++ b/internal/events/stubs/eventer.go @@ -10,7 +10,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/events" ) // StubEventer is a stub implementation of events.Interface and the events.Publisher interface diff --git a/internal/invites/mock/service.go b/internal/invites/mock/service.go index f8218ad58c..f8e86c54a5 100644 --- a/internal/invites/mock/service.go +++ b/internal/invites/mock/service.go @@ -17,9 +17,9 @@ import ( auth "github.com/mindersec/minder/internal/auth" authz "github.com/mindersec/minder/internal/authz" db "github.com/mindersec/minder/internal/db" - events "github.com/mindersec/minder/internal/events" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" server "github.com/mindersec/minder/pkg/config/server" + events "github.com/mindersec/minder/pkg/events" gomock "go.uber.org/mock/gomock" ) diff --git a/internal/invites/service.go b/internal/invites/service.go index 56f1089b70..97b2b9a150 100644 --- a/internal/invites/service.go +++ b/internal/invites/service.go @@ -20,11 +20,11 @@ import ( "github.com/mindersec/minder/internal/authz" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/projects" "github.com/mindersec/minder/internal/util" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -32,12 +32,12 @@ import ( // InviteService encapsulates the methods to manage user invites to a project type InviteService interface { // CreateInvite creates a new user invite - CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub pkgevents.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) // UpdateInvite updates the invite status - UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub pkgevents.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) @@ -55,7 +55,7 @@ func NewInviteService() InviteService { return &inviteService{} } -func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub pkgevents.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { var userInvite db.UserInvite @@ -229,7 +229,7 @@ func (_ *inviteService) RemoveInvite(ctx context.Context, qtx db.Querier, idClie }, nil } -func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub pkgevents.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { diff --git a/internal/invites/service_test.go b/internal/invites/service_test.go index 84701e003f..7c2b03cd2e 100644 --- a/internal/invites/service_test.go +++ b/internal/invites/service_test.go @@ -23,10 +23,10 @@ import ( "github.com/mindersec/minder/internal/db" dbf "github.com/mindersec/minder/internal/db/fixtures" "github.com/mindersec/minder/internal/email" - mockevents "github.com/mindersec/minder/internal/events/mock" "github.com/mindersec/minder/internal/projects" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/config/server" + mockevents "github.com/mindersec/minder/pkg/events/mock" ) func TestCreateInvite(t *testing.T) { diff --git a/internal/logger/telemetry_store_watermill_test.go b/internal/logger/telemetry_store_watermill_test.go index 71ae0105ee..cb2735256b 100644 --- a/internal/logger/telemetry_store_watermill_test.go +++ b/internal/logger/telemetry_store_watermill_test.go @@ -14,11 +14,11 @@ import ( "github.com/stretchr/testify/require" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { @@ -33,7 +33,7 @@ func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, diff --git a/internal/profiles/service.go b/internal/profiles/service.go index e2770fa011..bbad28d569 100644 --- a/internal/profiles/service.go +++ b/internal/profiles/service.go @@ -30,6 +30,7 @@ import ( "github.com/mindersec/minder/internal/util/ptr" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/engine/selectors" + pkgevents "github.com/mindersec/minder/pkg/events" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -72,13 +73,13 @@ type ProfileService interface { } type profileService struct { - publisher events.Publisher + publisher pkgevents.Publisher validator *Validator } // NewProfileService creates an instance of ProfileService func NewProfileService( - publisher events.Publisher, + publisher pkgevents.Publisher, selChecker selectors.SelectionChecker, ) ProfileService { return &profileService{ diff --git a/internal/providers/github/installations/installations.go b/internal/providers/github/installations/installations.go index 65f9220b75..0ea6d28f94 100644 --- a/internal/providers/github/installations/installations.go +++ b/internal/providers/github/installations/installations.go @@ -16,8 +16,8 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/service" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -55,7 +55,7 @@ func NewInstallationManager( } // Register implements the Consumer interface. -func (im *InstallationManager) Register(reg events.Registrar) { +func (im *InstallationManager) Register(reg pkgevents.Registrar) { reg.Register(ProviderInstallationTopic, im.handleProviderInstallationEvent) } diff --git a/internal/providers/github/manager/manager.go b/internal/providers/github/manager/manager.go index 442f05a65e..6ca9ec6771 100644 --- a/internal/providers/github/manager/manager.go +++ b/internal/providers/github/manager/manager.go @@ -22,7 +22,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propssvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/github/clients" @@ -31,6 +30,7 @@ import ( m "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/providers/ratecache" "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -47,7 +47,7 @@ func NewGitHubProviderClassManager( ghService service.GitHubProviderService, propSvc propssvc.PropertiesService, mt metrics.Metrics, - publisher events.Publisher, + publisher pkgevents.Publisher, ) m.ProviderClassManager { return &githubProviderManager{ restClientCache: restClientCache, @@ -75,7 +75,7 @@ type githubProviderManager struct { store db.Store ghService service.GitHubProviderService mt metrics.Metrics - publisher events.Publisher + publisher pkgevents.Publisher } var ( diff --git a/internal/providers/github/webhook/app.go b/internal/providers/github/webhook/app.go index 45df6cf61b..f52091ad79 100644 --- a/internal/providers/github/webhook/app.go +++ b/internal/providers/github/webhook/app.go @@ -26,6 +26,7 @@ import ( "github.com/mindersec/minder/internal/providers/github/service" "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" ) // installationEvent are events related the GitHub App. Minder uses @@ -104,7 +105,7 @@ func HandleGitHubAppWebhook( store db.Store, ghService service.GitHubProviderService, mt metrics.Metrics, - publisher events.Publisher, + publisher pkgevents.Publisher, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/internal/providers/github/webhook/handlers_githubwebhooks_test.go b/internal/providers/github/webhook/handlers_githubwebhooks_test.go index ee712454cb..cc9c2bcd2b 100644 --- a/internal/providers/github/webhook/handlers_githubwebhooks_test.go +++ b/internal/providers/github/webhook/handlers_githubwebhooks_test.go @@ -50,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/util/testqueue" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) //go:embed test-payloads/installation-deleted.json @@ -112,7 +113,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -170,7 +171,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -228,7 +229,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -331,7 +332,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -390,7 +391,7 @@ func (s *UnitTestSuite) TestNoopWebhookHandler() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -2654,7 +2655,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -3250,7 +3251,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { mockStore = mockdb.NewMockStore(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := pkgevents.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/providers/github/webhook/hook.go b/internal/providers/github/webhook/hook.go index 304d97fac4..a267c1d07c 100644 --- a/internal/providers/github/webhook/hook.go +++ b/internal/providers/github/webhook/hook.go @@ -26,6 +26,7 @@ import ( "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) const ( @@ -66,7 +67,7 @@ type processingResult struct { // HandleWebhookEvent is the main entry point for processing github webhook events func HandleWebhookEvent( mt metrics.Metrics, - publisher events.Publisher, + publisher pkgevents.Publisher, whconfig *server.WebhookConfig, ) http.HandlerFunc { // the function handles incoming GitHub webhooks diff --git a/internal/providers/gitlab/manager/manager.go b/internal/providers/gitlab/manager/manager.go index eb03fb3a18..4b31b34f78 100644 --- a/internal/providers/gitlab/manager/manager.go +++ b/internal/providers/gitlab/manager/manager.go @@ -20,10 +20,10 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/gitlab" "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -38,7 +38,7 @@ type providerClassManager struct { glpcfg *server.GitLabConfig webhookURL string parentContext context.Context - pub events.Publisher + pub pkgevents.Publisher // secrets for the webhook. These are stored in the // structure to allow efficient fetching. Rotation @@ -49,7 +49,7 @@ type providerClassManager struct { // NewGitLabProviderClassManager creates a new provider class manager for the dockerhub provider func NewGitLabProviderClassManager( - ctx context.Context, crypteng crypto.Engine, store db.Store, pub events.Publisher, + ctx context.Context, crypteng crypto.Engine, store db.Store, pub pkgevents.Publisher, cfg *server.GitLabConfig, wgCfg server.WebhookConfig, ) (*providerClassManager, error) { webhookURLBase := wgCfg.ExternalWebhookURL diff --git a/internal/reconcilers/entity_delete_test.go b/internal/reconcilers/entity_delete_test.go index e148dff875..7b80ba109d 100644 --- a/internal/reconcilers/entity_delete_test.go +++ b/internal/reconcilers/entity_delete_test.go @@ -16,12 +16,12 @@ import ( mockdb "github.com/mindersec/minder/database/mock" df "github.com/mindersec/minder/database/mock/fixtures" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/reconcilers/messages" mockrepo "github.com/mindersec/minder/internal/repositories/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/events" ) var ( @@ -160,7 +160,7 @@ func setUp(t *testing.T, tt testCase, ctrl *gomock.Controller) *Reconciler { repoService = tt.mockReposFunc(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := events.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/reconcilers/reconcilers.go b/internal/reconcilers/reconcilers.go index 9bdfab616e..ffcbe45d43 100644 --- a/internal/reconcilers/reconcilers.go +++ b/internal/reconcilers/reconcilers.go @@ -11,12 +11,13 @@ import ( "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/repositories" + pkgevents "github.com/mindersec/minder/pkg/events" ) // Reconciler is a helper that reconciles entities type Reconciler struct { store db.Store - evt events.Publisher + evt pkgevents.Publisher crypteng crypto.Engine providerManager manager.ProviderManager repos repositories.RepositoryService @@ -25,7 +26,7 @@ type Reconciler struct { // NewReconciler creates a new reconciler object func NewReconciler( store db.Store, - evt events.Publisher, + evt pkgevents.Publisher, cryptoEngine crypto.Engine, providerManager manager.ProviderManager, repositoryService repositories.RepositoryService, @@ -40,7 +41,7 @@ func NewReconciler( } // Register implements the Consumer interface. -func (r *Reconciler) Register(reg events.Registrar) { +func (r *Reconciler) Register(reg pkgevents.Registrar) { reg.Register(events.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) reg.Register(events.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) reg.Register(events.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) diff --git a/internal/reminderprocessor/reminder_processor.go b/internal/reminderprocessor/reminder_processor.go index c9e3b6fab1..a105f2ea68 100644 --- a/internal/reminderprocessor/reminder_processor.go +++ b/internal/reminderprocessor/reminder_processor.go @@ -13,20 +13,21 @@ import ( "github.com/mindersec/minder/internal/events" reconcilermessages "github.com/mindersec/minder/internal/reconcilers/messages" remindermessages "github.com/mindersec/minder/internal/reminder/messages" + pkgevents "github.com/mindersec/minder/pkg/events" ) // ReminderProcessor processes the incoming reminders type ReminderProcessor struct { - evt events.Interface + evt pkgevents.Interface } // NewReminderProcessor creates a new ReminderProcessor -func NewReminderProcessor(evt events.Interface) *ReminderProcessor { +func NewReminderProcessor(evt pkgevents.Interface) *ReminderProcessor { return &ReminderProcessor{evt: evt} } // Register implements the Consumer interface. -func (rp *ReminderProcessor) Register(r events.Registrar) { +func (rp *ReminderProcessor) Register(r pkgevents.Registrar) { r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler) } diff --git a/internal/repositories/service.go b/internal/repositories/service.go index 72f47a4ccf..dc17f8742a 100644 --- a/internal/repositories/service.go +++ b/internal/repositories/service.go @@ -25,6 +25,7 @@ import ( reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + pkgevents "github.com/mindersec/minder/pkg/events" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -95,7 +96,7 @@ var ( type repositoryService struct { store db.Store - eventProducer events.Publisher + eventProducer pkgevents.Publisher providerManager manager.ProviderManager propSvc service.PropertiesService } @@ -104,7 +105,7 @@ type repositoryService struct { func NewRepositoryService( store db.Store, propSvc service.PropertiesService, - eventProducer events.Publisher, + eventProducer pkgevents.Publisher, providerManager manager.ProviderManager, ) RepositoryService { return &repositoryService{ diff --git a/internal/repositories/service_test.go b/internal/repositories/service_test.go index a152db53ba..5671676239 100644 --- a/internal/repositories/service_test.go +++ b/internal/repositories/service_test.go @@ -21,7 +21,6 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgithub "github.com/mindersec/minder/internal/providers/github/mock" ghprop "github.com/mindersec/minder/internal/providers/github/properties" "github.com/mindersec/minder/internal/providers/manager" @@ -29,6 +28,7 @@ import ( "github.com/mindersec/minder/internal/repositories" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/events/mock" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) diff --git a/internal/service/service.go b/internal/service/service.go index 2cd62d3379..82643b2127 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -25,7 +25,6 @@ import ( "github.com/mindersec/minder/internal/engine" "github.com/mindersec/minder/internal/entities/handlers" propService "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" @@ -51,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/roles" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/engine/selectors" + pkgevents "github.com/mindersec/minder/pkg/events" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -73,7 +73,7 @@ func AllInOneServerService( ) error { errg, ctx := errgroup.WithContext(ctx) - evt, err := events.Setup(ctx, &cfg.Events) + evt, err := pkgevents.New(ctx, &cfg.Events) if err != nil { return fmt.Errorf("unable to setup eventer: %w", err) } @@ -270,7 +270,7 @@ func AllInOneServerService( evt.ConsumeEvents(getAndDeleteEntity) // Register the email manager to handle email invitations - var mailClient events.Consumer + var mailClient pkgevents.Consumer if cfg.Email.AWSSES.Region != "" && cfg.Email.AWSSES.Sender != "" { // If AWS SES is configured, use it to send emails mailClient, err = awsses.New(ctx, cfg.Email.AWSSES.Sender, cfg.Email.AWSSES.Region) diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000..0b62d183a5 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,210 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package events provide a high-level interface for interacting with the eventing system +package events + +import ( + "context" + "fmt" + "reflect" + "runtime" + "time" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/metrics" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/alexdrl/zerowater" + promgo "github.com/prometheus/client_golang/prometheus" + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + + "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/internal/events/common" + serverconfig "github.com/mindersec/minder/pkg/config/server" +) + +// eventer is a wrapper over the relevant eventing objects in such +// a way that they can be easily accessible and configurable. +type eventer struct { + router *message.Router + // webhookPublisher will gather events coming into the webhook and publish them + webhookPublisher message.Publisher + // webhookSubscriber will subscribe to the webhook topic and handle incoming events + webhookSubscriber message.Subscriber + // TODO: We'll have a Final publisher that will publish to the final topic + msgInstruments *events.MessageInstruments + + closer common.DriverCloser +} + +var _ Publisher = (*eventer)(nil) +var _ Service = (*eventer)(nil) + +var _ Registrar = (*eventer)(nil) +var _ message.Publisher = (*eventer)(nil) + +// New creates an eventer object which isolates the watermill setup code +func New(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error) { + if cfg == nil { + return nil, fmt.Errorf("event config is nil") + } + + l := zerowater.NewZerologLoggerAdapter( + zerolog.Ctx(ctx).With().Str("component", "watermill").Logger()) + + router, err := message.NewRouter(message.RouterConfig{ + CloseTimeout: time.Duration(cfg.RouterCloseTimeout) * time.Second, + }, l) + if err != nil { + return nil, err + } + + metricsBuilder := metrics.NewPrometheusMetricsBuilder( + promgo.DefaultRegisterer, + events.MetricsNamespace, + events.MetricsSubsystem) + metricsBuilder.AddPrometheusRouterMetrics(router) + zerolog.Ctx(ctx).Info().Msg("Router Metrics registered") + + meter := otel.Meter("eventer") + metricInstruments, err := events.InitMetricsInstruments(meter) + if err != nil { + return nil, err + } + zerolog.Ctx(ctx).Info().Msg("Metrics Instruments registered") + + pub, sub, cl, err := events.InstantiateDriver(ctx, cfg.Driver, cfg) + if err != nil { + return nil, fmt.Errorf("failed instantiating driver: %w", err) + } + + poisonQueueMiddleware, err := middleware.PoisonQueue(pub, events.DeadLetterQueueTopic) + if err != nil { + return nil, fmt.Errorf("failed instantiating poison queue: %w", err) + } + // Router level middleware are executed for every message sent to the router + router.AddMiddleware( + events.RecordMetrics(metricInstruments), + poisonQueueMiddleware, + middleware.Retry{ + MaxRetries: 3, + InitialInterval: time.Millisecond * 100, + Logger: l, + }.Middleware, + // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages + middleware.CorrelationID, + ) + + pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub) + if err != nil { + return nil, fmt.Errorf("failed to decorate publisher: %w", err) + } + + subWithMetrics, err := metricsBuilder.DecorateSubscriber(sub) + if err != nil { + return nil, fmt.Errorf("failed to decorate subscriber: %w", err) + } + + return &eventer{ + router: router, + webhookPublisher: pubWithMetrics, + webhookSubscriber: subWithMetrics, + closer: func() { + //nolint:gosec // It's fine if there's an error as long as we close the router + pubWithMetrics.Close() + //nolint:gosec // It's fine if there's an error as long as we close the router + subWithMetrics.Close() + // driver close + cl() + }, + msgInstruments: metricInstruments, + }, nil +} + +// Close closes the router +func (e *eventer) Close() error { + e.closer() + return e.router.Close() +} + +// Run runs the router, blocks until the router is closed +func (e *eventer) Run(ctx context.Context) error { + return e.router.Run(ctx) +} + +// Running returns a channel which allows you to wait until the +// event router has started. +func (e *eventer) Running() chan struct{} { + return e.router.Running() +} + +// Publish implements the message.Publisher +func (e *eventer) Publish(topic string, messages ...*message.Message) error { + pc, _, _, ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + + if ok && details != nil { + for idx := range messages { + msg := messages[idx] + e.router.Logger().Debug("Publishing message", watermill.LogFields{ + "message_uuid": msg.UUID, + "topic": topic, + "handler": details.Name(), + "component": "eventer", + "function": "Publish", + }) + msg.Metadata.Set(events.PublishedKey, time.Now().Format(time.RFC3339)) + } + } + + return e.webhookPublisher.Publish(topic, messages...) +} + +// Register subscribes to a topic and handles incoming messages +func (e *eventer) Register( + topic string, + handler message.NoPublishHandlerFunc, + mdw ...message.HandlerMiddleware, +) { + // From https://stackoverflow.com/questions/7052693/how-to-get-the-name-of-a-function-in-go + funcName := fmt.Sprintf("%s-%s", runtime.FuncForPC(reflect.ValueOf(handler).Pointer()).Name(), topic) + hand := e.router.AddNoPublisherHandler( + funcName, + topic, + e.webhookSubscriber, + func(msg *message.Message) error { + if err := handler(msg); err != nil { + e.router.Logger().Error("Found error handling message", err, watermill.LogFields{ + "message_uuid": msg.UUID, + "topic": topic, + "handler": funcName, + "component": "eventer", + }) + + return err + } + + e.router.Logger().Info("Handled message", watermill.LogFields{ + "message_uuid": msg.UUID, + "topic": topic, + "handler": funcName, + "component": "eventer", + }) + + return nil + }, + ) + + for _, m := range mdw { + hand.AddMiddleware(m) + } +} + +// ConsumeEvents allows registration of multiple consumers easily +func (e *eventer) ConsumeEvents(consumers ...Consumer) { + for _, c := range consumers { + c.Register(e) + } +} diff --git a/internal/events/eventer_test.go b/pkg/events/events_test.go similarity index 92% rename from internal/events/eventer_test.go rename to pkg/events/events_test.go index f7f7ac3c09..b9994c0fa4 100644 --- a/internal/events/eventer_test.go +++ b/pkg/events/events_test.go @@ -17,11 +17,12 @@ import ( "github.com/mindersec/minder/internal/events" serverconfig "github.com/mindersec/minder/pkg/config/server" + pkgevents "github.com/mindersec/minder/pkg/events" ) type fakeConsumer struct { topics []string - makeHandler func(string, chan eventPair) events.Handler + makeHandler func(string, chan eventPair) pkgevents.Handler shouldFailHandler bool // Filled in by test later out chan eventPair @@ -39,13 +40,13 @@ func driverConfig() *serverconfig.EventConfig { } } -func (f *fakeConsumer) Register(r events.Registrar) { +func (f *fakeConsumer) Register(r pkgevents.Registrar) { for _, t := range f.topics { r.Register(t, f.makeHandler(t, f.out)) } } -func fakeHandler(id string, out chan eventPair) events.Handler { +func fakeHandler(id string, out chan eventPair) pkgevents.Handler { return func(msg *message.Message) error { ctx := msg.Context() select { @@ -56,7 +57,7 @@ func fakeHandler(id string, out chan eventPair) events.Handler { } } -func countFailuresHandler(counter *int) events.Handler { +func countFailuresHandler(counter *int) pkgevents.Handler { return func(_ *message.Message) error { *counter++ return errors.New("handler always fails") @@ -116,7 +117,7 @@ func TestEventer(t *testing.T) { // This looks silly, but we need to generate a unique name for // the second handler on topic "a". In real usage, each Consumer // will register a different function. - makeHandler: func(_ string, out chan eventPair) events.Handler { + makeHandler: func(_ string, out chan eventPair) pkgevents.Handler { return func(msg *message.Message) error { out <- eventPair{"other", msg.Copy()} return nil @@ -227,14 +228,14 @@ var setupMu sync.Mutex // We currently use the global meter provider, so reset it for each test. // Since this is global, we use a global mutex to ensure we don't enter setup // concurrently. -func setupEventerWithMetricReader(ctx context.Context) (events.Interface, *metric.ManualReader, error) { +func setupEventerWithMetricReader(ctx context.Context) (pkgevents.Interface, *metric.ManualReader, error) { setupMu.Lock() defer setupMu.Unlock() oldMeter := otel.GetMeterProvider() defer otel.SetMeterProvider(oldMeter) metricReader := metric.NewManualReader() otel.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(metricReader))) - eventer, err := events.Setup(ctx, driverConfig()) + eventer, err := pkgevents.New(ctx, driverConfig()) return eventer, metricReader, err } @@ -249,8 +250,8 @@ func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounter *int) { } } -func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) events.Handler { - return func(_ string, _ chan eventPair) events.Handler { +func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) pkgevents.Handler { + return func(_ string, _ chan eventPair) pkgevents.Handler { return countFailuresHandler(counter) } } diff --git a/internal/events/interfaces.go b/pkg/events/interfaces.go similarity index 100% rename from internal/events/interfaces.go rename to pkg/events/interfaces.go diff --git a/internal/events/mock/interfaces.go b/pkg/events/mock/interfaces.go similarity index 99% rename from internal/events/mock/interfaces.go rename to pkg/events/mock/interfaces.go index ffaa0c0b49..6462b668cf 100644 --- a/internal/events/mock/interfaces.go +++ b/pkg/events/mock/interfaces.go @@ -14,7 +14,7 @@ import ( reflect "reflect" message "github.com/ThreeDotsLabs/watermill/message" - events "github.com/mindersec/minder/internal/events" + events "github.com/mindersec/minder/pkg/events" gomock "go.uber.org/mock/gomock" )