From bb0c95108b731de49ed3c7380ecf641ed685c3e4 Mon Sep 17 00:00:00 2001 From: Radoslav Dimitrov Date: Fri, 25 Oct 2024 11:57:58 +0300 Subject: [PATCH] Move part of events from internal to pkg Signed-off-by: Radoslav Dimitrov --- .../controlplane/handlers_entities_test.go | 2 +- internal/controlplane/handlers_oauth_test.go | 8 +++--- internal/controlplane/handlers_user_test.go | 4 +-- internal/controlplane/server.go | 6 ++--- internal/controlplane/server_test.go | 4 +-- internal/eea/eea.go | 7 ++--- internal/eea/eea_test.go | 11 ++++---- internal/email/awsses/awsses.go | 4 +-- internal/email/noop/noop.go | 4 +-- internal/engine/entities/entity_event.go | 3 ++- internal/engine/handler.go | 7 ++--- internal/engine/handler_test.go | 3 ++- internal/entities/handlers/handler.go | 25 +++++++++--------- internal/entities/handlers/handler_test.go | 25 +++++++++--------- internal/events/{eventer.go => events.go} | 26 +++++++++++-------- .../{eventer_test.go => events_test.go} | 22 +++++++++------- internal/events/stubs/eventer.go | 8 +++--- internal/invites/mock/service.go | 6 ++--- internal/invites/service.go | 10 +++---- internal/invites/service_test.go | 2 +- .../logger/telemetry_store_watermill_test.go | 4 +-- .../github/installations/installations.go | 4 +-- internal/providers/github/manager/manager.go | 6 ++--- internal/providers/github/webhook/app.go | 3 ++- .../webhook/handlers_githubwebhooks_test.go | 15 ++++++----- internal/providers/github/webhook/hook.go | 3 ++- internal/providers/gitlab/manager/manager.go | 6 ++--- internal/reconcilers/entity_delete_test.go | 4 +-- internal/reconcilers/reconcilers.go | 7 ++--- .../reminderprocessor/reminder_processor.go | 7 ++--- internal/repositories/service.go | 5 ++-- internal/repositories/service_test.go | 2 +- internal/service/service.go | 7 ++--- pkg/eventer/events.go | 18 +++++++++++++ .../eventer/interfaces}/interfaces.go | 3 ++- .../eventer/interfaces}/mock/interfaces.go | 18 ++++++------- pkg/profiles/service.go | 5 ++-- 37 files changed, 172 insertions(+), 132 deletions(-) rename internal/events/{eventer.go => events.go} (90%) rename internal/events/{eventer_test.go => events_test.go} (92%) create mode 100644 pkg/eventer/events.go rename {internal/events => pkg/eventer/interfaces}/interfaces.go (96%) rename {internal/events => pkg/eventer/interfaces}/mock/interfaces.go (94%) diff --git a/internal/controlplane/handlers_entities_test.go b/internal/controlplane/handlers_entities_test.go index a6533f22f8..b000c42d58 100644 --- a/internal/controlplane/handlers_entities_test.go +++ b/internal/controlplane/handlers_entities_test.go @@ -13,12 +13,12 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgh "github.com/mindersec/minder/internal/providers/github/mock" "github.com/mindersec/minder/internal/providers/manager" mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" ) func TestServer_ReconcileEntityRegistration(t *testing.T) { diff --git a/internal/controlplane/handlers_oauth_test.go b/internal/controlplane/handlers_oauth_test.go index 4bb2d04054..95f05406ec 100644 --- a/internal/controlplane/handlers_oauth_test.go +++ b/internal/controlplane/handlers_oauth_test.go @@ -38,7 +38,6 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" mockprops "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/dockerhub" mockclients "github.com/mindersec/minder/internal/providers/github/clients/mock" @@ -51,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/providers/session" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -378,7 +378,7 @@ func TestGetAuthorizationURL(t *testing.T) { store := mockdb.NewMockStore(ctrl) tc.buildStubs(store) - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -890,7 +890,7 @@ func TestHandleGitHubAppCallback(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -1046,7 +1046,7 @@ func TestVerifyProviderCredential(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/handlers_user_test.go b/internal/controlplane/handlers_user_test.go index 84fda12104..17d5d770e4 100644 --- a/internal/controlplane/handlers_user_test.go +++ b/internal/controlplane/handlers_user_test.go @@ -29,7 +29,6 @@ import ( "github.com/mindersec/minder/internal/authz/mock" mockcrypto "github.com/mindersec/minder/internal/crypto/mock" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/marketplaces" "github.com/mindersec/minder/internal/projects" @@ -37,6 +36,7 @@ import ( mockprov "github.com/mindersec/minder/internal/providers/github/service/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) const ( @@ -439,7 +439,7 @@ func TestDeleteUser_gRPC(t *testing.T) { })) defer testServer.Close() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index a9fceea316..1c53847574 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -47,7 +47,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propSvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" "github.com/mindersec/minder/internal/logger" @@ -63,6 +62,7 @@ import ( "github.com/mindersec/minder/internal/util" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" "github.com/mindersec/minder/pkg/profiles" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -81,7 +81,7 @@ var ( type Server struct { store db.Store cfg *serverconfig.Config - evt events.Publisher + evt interfaces.Publisher mt metrics.Metrics grpcServer *grpc.Server jwt jwt.Validator @@ -126,7 +126,7 @@ type Server struct { // NewServer creates a new server instance func NewServer( store db.Store, - evt events.Publisher, + evt interfaces.Publisher, cfg *serverconfig.Config, serverMetrics metrics.Metrics, jwtValidator jwt.Validator, diff --git a/internal/controlplane/server_test.go b/internal/controlplane/server_test.go index bd8879fb9a..6c50379357 100644 --- a/internal/controlplane/server_test.go +++ b/internal/controlplane/server_test.go @@ -33,13 +33,13 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/crypto" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" ghclient "github.com/mindersec/minder/internal/providers/github/clients" ghService "github.com/mindersec/minder/internal/providers/github/service" mock_reposvc "github.com/mindersec/minder/internal/repositories/mock" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) const bufSize = 1024 * 1024 @@ -78,7 +78,7 @@ func newDefaultServer( ) *Server { t.Helper() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/eea/eea.go b/internal/eea/eea.go index a88cce4167..b25beb7d79 100644 --- a/internal/eea/eea.go +++ b/internal/eea/eea.go @@ -24,12 +24,13 @@ import ( "github.com/mindersec/minder/internal/providers/manager" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // EEA is the Event Execution Aggregator type EEA struct { querier db.Store - evt events.Publisher + evt interfaces.Publisher cfg *serverconfig.AggregatorConfig entityFetcher service.PropertiesService @@ -37,7 +38,7 @@ type EEA struct { } // NewEEA creates a new EEA -func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig, +func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.AggregatorConfig, ef service.PropertiesService, provMan manager.ProviderManager) *EEA { return &EEA{ querier: querier, @@ -49,7 +50,7 @@ func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.Aggregator } // Register implements the Consumer interface. -func (e *EEA) Register(r events.Registrar) { +func (e *EEA) Register(r interfaces.Registrar) { r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler) } diff --git a/internal/eea/eea_test.go b/internal/eea/eea_test.go index dd1c19bd80..bdf63dfa90 100644 --- a/internal/eea/eea_test.go +++ b/internal/eea/eea_test.go @@ -31,6 +31,7 @@ import ( mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) const ( @@ -55,7 +56,7 @@ func TestAggregator(t *testing.T) { projectID, repoID := createNeededEntities(ctx, t, testQueries) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BufferSize: concurrentEvents, @@ -345,7 +346,7 @@ func TestFlushAll(t *testing.T) { propsvc := propsvcmock.NewMockPropertiesService(ctrl) provman := mockmanager.NewMockProviderManager(ctrl) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -394,7 +395,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) { require.NoError(t, err, "expected no error when creating embedded store") t.Cleanup(td) - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -433,7 +434,7 @@ func TestFlushAllListFlushFails(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -483,7 +484,7 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) { flushedMessages := newTestPubSub() - evt, err := events.Setup(ctx, &serverconfig.EventConfig{ + evt, err := eventer.New(ctx, &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/email/awsses/awsses.go b/internal/email/awsses/awsses.go index a17c178719..0abfb2e097 100644 --- a/internal/email/awsses/awsses.go +++ b/internal/email/awsses/awsses.go @@ -17,7 +17,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -47,7 +47,7 @@ func New(ctx context.Context, sender, region string) (*awsSES, error) { } // Register implements the Consumer interface. -func (a *awsSES) Register(reg events.Registrar) { +func (a *awsSES) Register(reg interfaces.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/email/noop/noop.go b/internal/email/noop/noop.go index 1f3474f78d..649fae8002 100644 --- a/internal/email/noop/noop.go +++ b/internal/email/noop/noop.go @@ -12,7 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) type noop struct { @@ -24,7 +24,7 @@ func New() *noop { } // Register implements the Consumer interface. -func (_ *noop) Register(reg events.Registrar) { +func (_ *noop) Register(reg interfaces.Registrar) { reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error { var e email.MailEventPayload diff --git a/internal/engine/entities/entity_event.go b/internal/engine/entities/entity_event.go index 0ef6b4ebc8..0a00e81b11 100644 --- a/internal/engine/entities/entity_event.go +++ b/internal/engine/entities/entity_event.go @@ -14,6 +14,7 @@ import ( "github.com/mindersec/minder/internal/events" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // EntityInfoWrapper is a helper struct to gather information @@ -180,7 +181,7 @@ func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) { } // Publish builds a message.Message and publishes it to the event bus -func (eiw *EntityInfoWrapper) Publish(evt events.Publisher) error { +func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error { msg, err := eiw.BuildMessage() if err != nil { return err diff --git a/internal/engine/handler.go b/internal/engine/handler.go index 65d87612bf..a7a4a7b261 100644 --- a/internal/engine/handler.go +++ b/internal/engine/handler.go @@ -18,6 +18,7 @@ import ( "github.com/mindersec/minder/internal/events" minderlogger "github.com/mindersec/minder/internal/logger" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -32,7 +33,7 @@ const ( // ExecutorEventHandler is responsible for consuming entity events, passing // entities to the executor, and then publishing the results. type ExecutorEventHandler struct { - evt events.Publisher + evt interfaces.Publisher handlerMiddleware []message.HandlerMiddleware wgEntityEventExecution *sync.WaitGroup executor Executor @@ -46,7 +47,7 @@ type ExecutorEventHandler struct { // NewExecutorEventHandler creates the event handler for the executor func NewExecutorEventHandler( ctx context.Context, - evt events.Publisher, + evt interfaces.Publisher, handlerMiddleware []message.HandlerMiddleware, executor Executor, ) *ExecutorEventHandler { @@ -70,7 +71,7 @@ func NewExecutorEventHandler( } // Register implements the Consumer interface. -func (e *ExecutorEventHandler) Register(r events.Registrar) { +func (e *ExecutorEventHandler) Register(r interfaces.Registrar) { r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) } diff --git a/internal/engine/handler_test.go b/internal/engine/handler_test.go index 2b3a8f822f..b9e7aa12c1 100644 --- a/internal/engine/handler_test.go +++ b/internal/engine/handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { @@ -38,7 +39,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { // -- end expectations - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go index 6913c6e491..1b21b3e9f8 100644 --- a/internal/entities/handlers/handler.go +++ b/internal/entities/handlers/handler.go @@ -23,6 +23,7 @@ import ( "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) var ( @@ -32,7 +33,7 @@ var ( ) type handleEntityAndDoBase struct { - evt events.Publisher + evt interfaces.Publisher store db.Store refreshEntity strategies.GetEntityStrategy @@ -45,7 +46,7 @@ type handleEntityAndDoBase struct { } // Register satisfies the events.Consumer interface. -func (b *handleEntityAndDoBase) Register(r events.Registrar) { +func (b *handleEntityAndDoBase) Register(r interfaces.Registrar) { r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) } @@ -192,12 +193,12 @@ func (b *handleEntityAndDoBase) repoPrivateOrArchivedCheck( // NewRefreshByIDAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshByIDAndEvaluateHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -214,12 +215,12 @@ func NewRefreshByIDAndEvaluateHandler( // NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. func NewRefreshEntityAndEvaluateHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -236,11 +237,11 @@ func NewRefreshEntityAndEvaluateHandler( // NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. func NewGetEntityAndDeleteHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -257,12 +258,12 @@ func NewGetEntityAndDeleteHandler( // NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. func NewAddOriginatingEntityHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, store: store, @@ -279,12 +280,12 @@ func NewAddOriginatingEntityHandler( // NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. func NewRemoveOriginatingEntityHandler( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc propertyService.PropertiesService, provMgr manager.ProviderManager, handlerMiddleware ...watermill.HandlerMiddleware, -) events.Consumer { +) interfaces.Consumer { return &handleEntityAndDoBase{ evt: evt, diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go index ef27c22ea4..640f436f5c 100644 --- a/internal/entities/handlers/handler_test.go +++ b/internal/entities/handlers/handler_test.go @@ -33,6 +33,7 @@ import ( provManFixtures "github.com/mindersec/minder/internal/providers/manager/mock/fixtures" "github.com/mindersec/minder/internal/reconcilers/messages" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -186,54 +187,54 @@ func checkPullRequestMessage(t *testing.T, msg *watermill.Message) { } type handlerBuilder func( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer +) interfaces.Consumer func refreshEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, provMgr) } func refreshByIDHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRefreshByIDAndEvaluateHandler(evt, store, propSvc, provMgr) } func addOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewAddOriginatingEntityHandler(evt, store, propSvc, provMgr) } func removeOriginatingEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, provMgr manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewRemoveOriginatingEntityHandler(evt, store, propSvc, provMgr) } func getAndDeleteEntityHandlerBuilder( - evt events.Publisher, + evt interfaces.Publisher, store db.Store, propSvc service.PropertiesService, _ manager.ProviderManager, -) events.Consumer { +) interfaces.Consumer { return NewGetEntityAndDeleteHandler(evt, store, propSvc) } diff --git a/internal/events/eventer.go b/internal/events/events.go similarity index 90% rename from internal/events/eventer.go rename to internal/events/events.go index 7469ec11d8..ebb23b81ce 100644 --- a/internal/events/eventer.go +++ b/internal/events/events.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright 2023 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -// Package events provides the eventer object which is responsible for setting up the watermill router +// Package events provide the eventer object which is responsible for setting up the watermill router // and handling the incoming events package events @@ -24,12 +24,19 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/mindersec/minder/internal/events/common" - gochannel "github.com/mindersec/minder/internal/events/gochannel" + "github.com/mindersec/minder/internal/events/gochannel" "github.com/mindersec/minder/internal/events/nats" eventersql "github.com/mindersec/minder/internal/events/sql" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) +// Ensure that the eventer implements the interfaces +var _ interfaces.Publisher = (*eventer)(nil) +var _ interfaces.Service = (*eventer)(nil) +var _ interfaces.Registrar = (*eventer)(nil) +var _ message.Publisher = (*eventer)(nil) + // eventer is a wrapper over the relevant eventing objects in such // a way that they can be easily accessible and configurable. type eventer struct { @@ -44,19 +51,16 @@ type eventer struct { closer common.DriverCloser } -var _ Publisher = (*eventer)(nil) -var _ Service = (*eventer)(nil) - type messageInstruments struct { // message processing time duration histogram messageProcessingTimeHistogram metric.Int64Histogram } -var _ Registrar = (*eventer)(nil) -var _ message.Publisher = (*eventer)(nil) - -// Setup creates an eventer object which isolates the watermill setup code -func Setup(ctx context.Context, cfg *serverconfig.EventConfig) (Interface, error) { +// NewEventer creates an eventer object which isolates the watermill setup code +func NewEventer(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces.Interface, error) { + if cfg == nil { + return nil, errors.New("event config is nil") + } if cfg == nil { return nil, errors.New("event config is nil") } @@ -234,7 +238,7 @@ func (e *eventer) Register( } // ConsumeEvents allows registration of multiple consumers easily -func (e *eventer) ConsumeEvents(consumers ...Consumer) { +func (e *eventer) ConsumeEvents(consumers ...interfaces.Consumer) { for _, c := range consumers { c.Register(e) } diff --git a/internal/events/eventer_test.go b/internal/events/events_test.go similarity index 92% rename from internal/events/eventer_test.go rename to internal/events/events_test.go index f7f7ac3c09..a26d6700f3 100644 --- a/internal/events/eventer_test.go +++ b/internal/events/events_test.go @@ -17,11 +17,13 @@ import ( "github.com/mindersec/minder/internal/events" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) type fakeConsumer struct { topics []string - makeHandler func(string, chan eventPair) events.Handler + makeHandler func(string, chan eventPair) interfaces.Handler shouldFailHandler bool // Filled in by test later out chan eventPair @@ -39,13 +41,13 @@ func driverConfig() *serverconfig.EventConfig { } } -func (f *fakeConsumer) Register(r events.Registrar) { +func (f *fakeConsumer) Register(r interfaces.Registrar) { for _, t := range f.topics { r.Register(t, f.makeHandler(t, f.out)) } } -func fakeHandler(id string, out chan eventPair) events.Handler { +func fakeHandler(id string, out chan eventPair) interfaces.Handler { return func(msg *message.Message) error { ctx := msg.Context() select { @@ -56,7 +58,7 @@ func fakeHandler(id string, out chan eventPair) events.Handler { } } -func countFailuresHandler(counter *int) events.Handler { +func countFailuresHandler(counter *int) interfaces.Handler { return func(_ *message.Message) error { *counter++ return errors.New("handler always fails") @@ -116,7 +118,7 @@ func TestEventer(t *testing.T) { // This looks silly, but we need to generate a unique name for // the second handler on topic "a". In real usage, each Consumer // will register a different function. - makeHandler: func(_ string, out chan eventPair) events.Handler { + makeHandler: func(_ string, out chan eventPair) interfaces.Handler { return func(msg *message.Message) error { out <- eventPair{"other", msg.Copy()} return nil @@ -227,15 +229,15 @@ var setupMu sync.Mutex // We currently use the global meter provider, so reset it for each test. // Since this is global, we use a global mutex to ensure we don't enter setup // concurrently. -func setupEventerWithMetricReader(ctx context.Context) (events.Interface, *metric.ManualReader, error) { +func setupEventerWithMetricReader(ctx context.Context) (interfaces.Interface, *metric.ManualReader, error) { setupMu.Lock() defer setupMu.Unlock() oldMeter := otel.GetMeterProvider() defer otel.SetMeterProvider(oldMeter) metricReader := metric.NewManualReader() otel.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(metricReader))) - eventer, err := events.Setup(ctx, driverConfig()) - return eventer, metricReader, err + ev, err := eventer.New(ctx, driverConfig()) + return ev, metricReader, err } func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounter *int) { @@ -249,8 +251,8 @@ func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounter *int) { } } -func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) events.Handler { - return func(_ string, _ chan eventPair) events.Handler { +func makeFailingHandler(counter *int) func(_ string, _ chan eventPair) interfaces.Handler { + return func(_ string, _ chan eventPair) interfaces.Handler { return countFailuresHandler(counter) } } diff --git a/internal/events/stubs/eventer.go b/internal/events/stubs/eventer.go index 85b62c9161..ebc15d7e96 100644 --- a/internal/events/stubs/eventer.go +++ b/internal/events/stubs/eventer.go @@ -10,12 +10,12 @@ import ( "github.com/ThreeDotsLabs/watermill/message" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // StubEventer is a stub implementation of events.Interface and the events.Publisher interface -var _ events.Interface = (*StubEventer)(nil) -var _ events.Publisher = (*StubEventer)(nil) +var _ interfaces.Interface = (*StubEventer)(nil) +var _ interfaces.Publisher = (*StubEventer)(nil) // StubEventer is an eventer that's useful for testing. type StubEventer struct { @@ -29,7 +29,7 @@ func (*StubEventer) Close() error { } // ConsumeEvents implements events.Interface. -func (*StubEventer) ConsumeEvents(...events.Consumer) { +func (*StubEventer) ConsumeEvents(...interfaces.Consumer) { panic("unimplemented") } diff --git a/internal/invites/mock/service.go b/internal/invites/mock/service.go index f8218ad58c..420d8c1e6d 100644 --- a/internal/invites/mock/service.go +++ b/internal/invites/mock/service.go @@ -17,9 +17,9 @@ import ( auth "github.com/mindersec/minder/internal/auth" authz "github.com/mindersec/minder/internal/authz" db "github.com/mindersec/minder/internal/db" - events "github.com/mindersec/minder/internal/events" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" server "github.com/mindersec/minder/pkg/config/server" + interfaces "github.com/mindersec/minder/pkg/eventer/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -48,7 +48,7 @@ func (m *MockInviteService) EXPECT() *MockInviteServiceMockRecorder { } // CreateInvite mocks base method. -func (m *MockInviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { +func (m *MockInviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateInvite", ctx, qtx, idClient, eventsPub, emailConfig, targetProject, authzRole, inviteeEmail) ret0, _ := ret[0].(*v1.Invitation) @@ -78,7 +78,7 @@ func (mr *MockInviteServiceMockRecorder) RemoveInvite(ctx, qtx, idClient, target } // UpdateInvite mocks base method. -func (m *MockInviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { +func (m *MockInviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig server.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string) (*v1.Invitation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateInvite", ctx, qtx, idClient, eventsPub, emailConfig, targetProject, authzRole, inviteeEmail) ret0, _ := ret[0].(*v1.Invitation) diff --git a/internal/invites/service.go b/internal/invites/service.go index 56f1089b70..dd7bb2609c 100644 --- a/internal/invites/service.go +++ b/internal/invites/service.go @@ -20,11 +20,11 @@ import ( "github.com/mindersec/minder/internal/authz" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/email" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/projects" "github.com/mindersec/minder/internal/util" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -32,12 +32,12 @@ import ( // InviteService encapsulates the methods to manage user invites to a project type InviteService interface { // CreateInvite creates a new user invite - CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) // UpdateInvite updates the invite status - UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, + UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) @@ -55,7 +55,7 @@ func NewInviteService() InviteService { return &inviteService{} } -func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) UpdateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { var userInvite db.UserInvite @@ -229,7 +229,7 @@ func (_ *inviteService) RemoveInvite(ctx context.Context, qtx db.Querier, idClie }, nil } -func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub events.Publisher, +func (_ *inviteService) CreateInvite(ctx context.Context, qtx db.Querier, idClient auth.Resolver, eventsPub interfaces.Publisher, emailConfig serverconfig.EmailConfig, targetProject uuid.UUID, authzRole authz.Role, inviteeEmail string, ) (*minder.Invitation, error) { diff --git a/internal/invites/service_test.go b/internal/invites/service_test.go index 84701e003f..4edf0252b9 100644 --- a/internal/invites/service_test.go +++ b/internal/invites/service_test.go @@ -23,10 +23,10 @@ import ( "github.com/mindersec/minder/internal/db" dbf "github.com/mindersec/minder/internal/db/fixtures" "github.com/mindersec/minder/internal/email" - mockevents "github.com/mindersec/minder/internal/events/mock" "github.com/mindersec/minder/internal/projects" minder "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/config/server" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" ) func TestCreateInvite(t *testing.T) { diff --git a/internal/logger/telemetry_store_watermill_test.go b/internal/logger/telemetry_store_watermill_test.go index 71ae0105ee..c60f731aa2 100644 --- a/internal/logger/telemetry_store_watermill_test.go +++ b/internal/logger/telemetry_store_watermill_test.go @@ -14,11 +14,11 @@ import ( "github.com/stretchr/testify/require" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { @@ -33,7 +33,7 @@ func TestTelemetryStoreWMMiddlewareLogsRepositoryInfo(t *testing.T) { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{ BlockPublishUntilSubscriberAck: true, diff --git a/internal/providers/github/installations/installations.go b/internal/providers/github/installations/installations.go index 65f9220b75..cd4a03f4c9 100644 --- a/internal/providers/github/installations/installations.go +++ b/internal/providers/github/installations/installations.go @@ -16,8 +16,8 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/service" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -55,7 +55,7 @@ func NewInstallationManager( } // Register implements the Consumer interface. -func (im *InstallationManager) Register(reg events.Registrar) { +func (im *InstallationManager) Register(reg interfaces.Registrar) { reg.Register(ProviderInstallationTopic, im.handleProviderInstallationEvent) } diff --git a/internal/providers/github/manager/manager.go b/internal/providers/github/manager/manager.go index 442f05a65e..8177ae252e 100644 --- a/internal/providers/github/manager/manager.go +++ b/internal/providers/github/manager/manager.go @@ -22,7 +22,6 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" propssvc "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/github/clients" @@ -31,6 +30,7 @@ import ( m "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/providers/ratecache" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -47,7 +47,7 @@ func NewGitHubProviderClassManager( ghService service.GitHubProviderService, propSvc propssvc.PropertiesService, mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, ) m.ProviderClassManager { return &githubProviderManager{ restClientCache: restClientCache, @@ -75,7 +75,7 @@ type githubProviderManager struct { store db.Store ghService service.GitHubProviderService mt metrics.Metrics - publisher events.Publisher + publisher interfaces.Publisher } var ( diff --git a/internal/providers/github/webhook/app.go b/internal/providers/github/webhook/app.go index 45df6cf61b..5a3640e6fa 100644 --- a/internal/providers/github/webhook/app.go +++ b/internal/providers/github/webhook/app.go @@ -26,6 +26,7 @@ import ( "github.com/mindersec/minder/internal/providers/github/service" "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // installationEvent are events related the GitHub App. Minder uses @@ -104,7 +105,7 @@ func HandleGitHubAppWebhook( store db.Store, ghService service.GitHubProviderService, mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/internal/providers/github/webhook/handlers_githubwebhooks_test.go b/internal/providers/github/webhook/handlers_githubwebhooks_test.go index ee712454cb..a2bb333725 100644 --- a/internal/providers/github/webhook/handlers_githubwebhooks_test.go +++ b/internal/providers/github/webhook/handlers_githubwebhooks_test.go @@ -50,6 +50,7 @@ import ( "github.com/mindersec/minder/internal/util/testqueue" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) //go:embed test-payloads/installation-deleted.json @@ -112,7 +113,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -170,7 +171,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -228,7 +229,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -331,7 +332,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -390,7 +391,7 @@ func (s *UnitTestSuite) TestNoopWebhookHandler() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -2654,7 +2655,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ctrl := gomock.NewController(t) defer ctrl.Finish() - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) @@ -3250,7 +3251,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { mockStore = mockdb.NewMockStore(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/providers/github/webhook/hook.go b/internal/providers/github/webhook/hook.go index 304d97fac4..03cc6ec638 100644 --- a/internal/providers/github/webhook/hook.go +++ b/internal/providers/github/webhook/hook.go @@ -26,6 +26,7 @@ import ( "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) const ( @@ -66,7 +67,7 @@ type processingResult struct { // HandleWebhookEvent is the main entry point for processing github webhook events func HandleWebhookEvent( mt metrics.Metrics, - publisher events.Publisher, + publisher interfaces.Publisher, whconfig *server.WebhookConfig, ) http.HandlerFunc { // the function handles incoming GitHub webhooks diff --git a/internal/providers/gitlab/manager/manager.go b/internal/providers/gitlab/manager/manager.go index eb03fb3a18..c30fdda351 100644 --- a/internal/providers/gitlab/manager/manager.go +++ b/internal/providers/gitlab/manager/manager.go @@ -20,10 +20,10 @@ import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/credentials" "github.com/mindersec/minder/internal/providers/gitlab" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" v1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -38,7 +38,7 @@ type providerClassManager struct { glpcfg *server.GitLabConfig webhookURL string parentContext context.Context - pub events.Publisher + pub interfaces.Publisher // secrets for the webhook. These are stored in the // structure to allow efficient fetching. Rotation @@ -49,7 +49,7 @@ type providerClassManager struct { // NewGitLabProviderClassManager creates a new provider class manager for the dockerhub provider func NewGitLabProviderClassManager( - ctx context.Context, crypteng crypto.Engine, store db.Store, pub events.Publisher, + ctx context.Context, crypteng crypto.Engine, store db.Store, pub interfaces.Publisher, cfg *server.GitLabConfig, wgCfg server.WebhookConfig, ) (*providerClassManager, error) { webhookURLBase := wgCfg.ExternalWebhookURL diff --git a/internal/reconcilers/entity_delete_test.go b/internal/reconcilers/entity_delete_test.go index e148dff875..f5cc608f71 100644 --- a/internal/reconcilers/entity_delete_test.go +++ b/internal/reconcilers/entity_delete_test.go @@ -16,12 +16,12 @@ import ( mockdb "github.com/mindersec/minder/database/mock" df "github.com/mindersec/minder/database/mock/fixtures" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/reconcilers/messages" mockrepo "github.com/mindersec/minder/internal/repositories/mock" rf "github.com/mindersec/minder/internal/repositories/mock/fixtures" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer" ) var ( @@ -160,7 +160,7 @@ func setUp(t *testing.T, tt testCase, ctrl *gomock.Controller) *Reconciler { repoService = tt.mockReposFunc(ctrl) } - evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{ + evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{ Driver: "go-channel", GoChannel: serverconfig.GoChannelEventConfig{}, }) diff --git a/internal/reconcilers/reconcilers.go b/internal/reconcilers/reconcilers.go index 9bdfab616e..67ddc02235 100644 --- a/internal/reconcilers/reconcilers.go +++ b/internal/reconcilers/reconcilers.go @@ -11,12 +11,13 @@ import ( "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/repositories" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // Reconciler is a helper that reconciles entities type Reconciler struct { store db.Store - evt events.Publisher + evt interfaces.Publisher crypteng crypto.Engine providerManager manager.ProviderManager repos repositories.RepositoryService @@ -25,7 +26,7 @@ type Reconciler struct { // NewReconciler creates a new reconciler object func NewReconciler( store db.Store, - evt events.Publisher, + evt interfaces.Publisher, cryptoEngine crypto.Engine, providerManager manager.ProviderManager, repositoryService repositories.RepositoryService, @@ -40,7 +41,7 @@ func NewReconciler( } // Register implements the Consumer interface. -func (r *Reconciler) Register(reg events.Registrar) { +func (r *Reconciler) Register(reg interfaces.Registrar) { reg.Register(events.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) reg.Register(events.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) reg.Register(events.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) diff --git a/internal/reminderprocessor/reminder_processor.go b/internal/reminderprocessor/reminder_processor.go index c9e3b6fab1..2365fcfe81 100644 --- a/internal/reminderprocessor/reminder_processor.go +++ b/internal/reminderprocessor/reminder_processor.go @@ -13,20 +13,21 @@ import ( "github.com/mindersec/minder/internal/events" reconcilermessages "github.com/mindersec/minder/internal/reconcilers/messages" remindermessages "github.com/mindersec/minder/internal/reminder/messages" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) // ReminderProcessor processes the incoming reminders type ReminderProcessor struct { - evt events.Interface + evt interfaces.Interface } // NewReminderProcessor creates a new ReminderProcessor -func NewReminderProcessor(evt events.Interface) *ReminderProcessor { +func NewReminderProcessor(evt interfaces.Interface) *ReminderProcessor { return &ReminderProcessor{evt: evt} } // Register implements the Consumer interface. -func (rp *ReminderProcessor) Register(r events.Registrar) { +func (rp *ReminderProcessor) Register(r interfaces.Registrar) { r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler) } diff --git a/internal/repositories/service.go b/internal/repositories/service.go index 72f47a4ccf..ec484d0712 100644 --- a/internal/repositories/service.go +++ b/internal/repositories/service.go @@ -25,6 +25,7 @@ import ( reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -95,7 +96,7 @@ var ( type repositoryService struct { store db.Store - eventProducer events.Publisher + eventProducer interfaces.Publisher providerManager manager.ProviderManager propSvc service.PropertiesService } @@ -104,7 +105,7 @@ type repositoryService struct { func NewRepositoryService( store db.Store, propSvc service.PropertiesService, - eventProducer events.Publisher, + eventProducer interfaces.Publisher, providerManager manager.ProviderManager, ) RepositoryService { return &repositoryService{ diff --git a/internal/repositories/service_test.go b/internal/repositories/service_test.go index a152db53ba..24eaf8b5b5 100644 --- a/internal/repositories/service_test.go +++ b/internal/repositories/service_test.go @@ -21,7 +21,6 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - mockevents "github.com/mindersec/minder/internal/events/mock" mockgithub "github.com/mindersec/minder/internal/providers/github/mock" ghprop "github.com/mindersec/minder/internal/providers/github/properties" "github.com/mindersec/minder/internal/providers/manager" @@ -29,6 +28,7 @@ import ( "github.com/mindersec/minder/internal/repositories" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock" provinfv1 "github.com/mindersec/minder/pkg/providers/v1" ) diff --git a/internal/service/service.go b/internal/service/service.go index 62b0da47a4..e331ff4676 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -25,7 +25,6 @@ import ( "github.com/mindersec/minder/internal/engine" "github.com/mindersec/minder/internal/entities/handlers" propService "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/flags" "github.com/mindersec/minder/internal/history" "github.com/mindersec/minder/internal/invites" @@ -50,6 +49,8 @@ import ( "github.com/mindersec/minder/internal/roles" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/engine/selectors" + "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/interfaces" "github.com/mindersec/minder/pkg/profiles" "github.com/mindersec/minder/pkg/ruletypes" ) @@ -73,7 +74,7 @@ func AllInOneServerService( ) error { errg, ctx := errgroup.WithContext(ctx) - evt, err := events.Setup(ctx, &cfg.Events) + evt, err := eventer.New(ctx, &cfg.Events) if err != nil { return fmt.Errorf("unable to setup eventer: %w", err) } @@ -270,7 +271,7 @@ func AllInOneServerService( evt.ConsumeEvents(getAndDeleteEntity) // Register the email manager to handle email invitations - var mailClient events.Consumer + var mailClient interfaces.Consumer if cfg.Email.AWSSES.Region != "" && cfg.Email.AWSSES.Sender != "" { // If AWS SES is configured, use it to send emails mailClient, err = awsses.New(ctx, cfg.Email.AWSSES.Sender, cfg.Email.AWSSES.Region) diff --git a/pkg/eventer/events.go b/pkg/eventer/events.go new file mode 100644 index 0000000000..7c0e106e72 --- /dev/null +++ b/pkg/eventer/events.go @@ -0,0 +1,18 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package eventer provides the constructor for the eventer +package eventer + +import ( + "context" + + "github.com/mindersec/minder/internal/events" + serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/interfaces" +) + +// New creates a new eventer object implementing the Interface interface +func New(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces.Interface, error) { + return events.NewEventer(ctx, cfg) +} diff --git a/internal/events/interfaces.go b/pkg/eventer/interfaces/interfaces.go similarity index 96% rename from internal/events/interfaces.go rename to pkg/eventer/interfaces/interfaces.go index 822f35c26f..7c2017d78f 100644 --- a/internal/events/interfaces.go +++ b/pkg/eventer/interfaces/interfaces.go @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: Copyright 2024 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -package events +// Package interfaces provides the interfaces for the eventer object +package interfaces import ( "context" diff --git a/internal/events/mock/interfaces.go b/pkg/eventer/interfaces/mock/interfaces.go similarity index 94% rename from internal/events/mock/interfaces.go rename to pkg/eventer/interfaces/mock/interfaces.go index ffaa0c0b49..b758e8c79c 100644 --- a/internal/events/mock/interfaces.go +++ b/pkg/eventer/interfaces/mock/interfaces.go @@ -3,18 +3,18 @@ // // Generated by this command: // -// mockgen -package mock_events -destination=./mock/interfaces.go -source=./interfaces.go +// mockgen -package mock_interfaces -destination=./mock/interfaces.go -source=./interfaces.go // -// Package mock_events is a generated GoMock package. -package mock_events +// Package mock_interfaces is a generated GoMock package. +package mock_interfaces import ( context "context" reflect "reflect" message "github.com/ThreeDotsLabs/watermill/message" - events "github.com/mindersec/minder/internal/events" + interfaces "github.com/mindersec/minder/pkg/eventer/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -43,7 +43,7 @@ func (m *MockRegistrar) EXPECT() *MockRegistrarMockRecorder { } // Register mocks base method. -func (m *MockRegistrar) Register(topic string, handler events.Handler, mdw ...message.HandlerMiddleware) { +func (m *MockRegistrar) Register(topic string, handler interfaces.Handler, mdw ...message.HandlerMiddleware) { m.ctrl.T.Helper() varargs := []any{topic, handler} for _, a := range mdw { @@ -84,7 +84,7 @@ func (m *MockConsumer) EXPECT() *MockConsumerMockRecorder { } // Register mocks base method. -func (m *MockConsumer) Register(arg0 events.Registrar) { +func (m *MockConsumer) Register(arg0 interfaces.Registrar) { m.ctrl.T.Helper() m.ctrl.Call(m, "Register", arg0) } @@ -215,7 +215,7 @@ func (mr *MockServiceMockRecorder) Close() *gomock.Call { } // ConsumeEvents mocks base method. -func (m *MockService) ConsumeEvents(consumers ...events.Consumer) { +func (m *MockService) ConsumeEvents(consumers ...interfaces.Consumer) { m.ctrl.T.Helper() varargs := []any{} for _, a := range consumers { @@ -297,7 +297,7 @@ func (mr *MockInterfaceMockRecorder) Close() *gomock.Call { } // ConsumeEvents mocks base method. -func (m *MockInterface) ConsumeEvents(consumers ...events.Consumer) { +func (m *MockInterface) ConsumeEvents(consumers ...interfaces.Consumer) { m.ctrl.T.Helper() varargs := []any{} for _, a := range consumers { @@ -332,7 +332,7 @@ func (mr *MockInterfaceMockRecorder) Publish(topic any, messages ...any) *gomock } // Register mocks base method. -func (m *MockInterface) Register(topic string, handler events.Handler, mdw ...message.HandlerMiddleware) { +func (m *MockInterface) Register(topic string, handler interfaces.Handler, mdw ...message.HandlerMiddleware) { m.ctrl.T.Helper() varargs := []any{topic, handler} for _, a := range mdw { diff --git a/pkg/profiles/service.go b/pkg/profiles/service.go index e2770fa011..487bec6bdb 100644 --- a/pkg/profiles/service.go +++ b/pkg/profiles/service.go @@ -30,6 +30,7 @@ import ( "github.com/mindersec/minder/internal/util/ptr" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/engine/selectors" + "github.com/mindersec/minder/pkg/eventer/interfaces" ) //go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE @@ -72,13 +73,13 @@ type ProfileService interface { } type profileService struct { - publisher events.Publisher + publisher interfaces.Publisher validator *Validator } // NewProfileService creates an instance of ProfileService func NewProfileService( - publisher events.Publisher, + publisher interfaces.Publisher, selChecker selectors.SelectionChecker, ) ProfileService { return &profileService{