From a9493258f24b8e3ae45534471ef71a4caec16ca2 Mon Sep 17 00:00:00 2001 From: Radoslav Dimitrov Date: Fri, 25 Oct 2024 12:11:52 +0300 Subject: [PATCH] Move event constants to pkg Signed-off-by: Radoslav Dimitrov --- internal/controlplane/handlers_entities.go | 4 +- .../handlers_reconciliationtasks.go | 4 +- internal/eea/eea.go | 4 +- internal/eea/eea_test.go | 14 +- internal/engine/entities/entity_event.go | 4 +- internal/engine/handler.go | 6 +- internal/engine/handler_test.go | 4 +- internal/entities/handlers/handler.go | 20 +-- internal/entities/handlers/handler_test.go | 16 +- internal/events/events.go | 15 +- internal/events/events_test.go | 6 +- internal/events/metrics.go | 4 +- internal/providers/github/webhook/app.go | 18 +- .../providers/github/webhook/fuzz_test.go | 10 +- .../webhook/handlers_githubwebhooks_test.go | 162 +++++++++--------- .../github/webhook/handlers_packages.go | 4 +- .../github/webhook/handlers_pull_requests.go | 6 +- .../github/webhook/handlers_repos.go | 8 +- internal/providers/github/webhook/hook.go | 16 +- .../webhook_handlers_merge_requests.go | 8 +- .../manager/webhook_handlers_releases.go | 8 +- .../gitlab/manager/webhook_handlers_repos.go | 4 +- internal/reconcilers/reconcilers.go | 10 +- internal/reconcilers/repository.go | 4 +- internal/reconcilers/repository_test.go | 6 +- internal/reconcilers/run_profile.go | 4 +- internal/reconcilers/run_profile_test.go | 4 +- internal/reminder/reminder.go | 4 +- .../reminderprocessor/reminder_processor.go | 6 +- internal/repositories/service.go | 4 +- .../eventer/constants}/constants.go | 9 +- pkg/profiles/service.go | 4 +- 32 files changed, 203 insertions(+), 197 deletions(-) rename {internal/events => pkg/eventer/constants}/constants.go (88%) diff --git a/internal/controlplane/handlers_entities.go b/internal/controlplane/handlers_entities.go index f05c3ca11c..a2b5439335 100644 --- a/internal/controlplane/handlers_entities.go +++ b/internal/controlplane/handlers_entities.go @@ -16,12 +16,12 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/providers" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // ReconcileEntityRegistration reconciles the registration of an entity. @@ -100,7 +100,7 @@ func (s *Server) ReconcileEntityRegistration( func (s *Server) publishEntityMessage(l *zerolog.Logger, msg *message.Message) error { l.Info().Str("messageID", msg.UUID).Msg("publishing register entities message for execution") - return s.evt.Publish(events.TopicQueueReconcileEntityAdd, msg) + return s.evt.Publish(constants.TopicQueueReconcileEntityAdd, msg) } func createEntityMessage( diff --git a/internal/controlplane/handlers_reconciliationtasks.go b/internal/controlplane/handlers_reconciliationtasks.go index 9a0148bcd7..0842edf659 100644 --- a/internal/controlplane/handlers_reconciliationtasks.go +++ b/internal/controlplane/handlers_reconciliationtasks.go @@ -16,10 +16,10 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/engcontext" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // CreateEntityReconciliationTask creates a task to reconcile the state of an entity @@ -49,7 +49,7 @@ func (s *Server) CreateEntityReconciliationTask(ctx context.Context, if err != nil { return nil, err } - topic = events.TopicQueueReconcileRepoInit + topic = constants.TopicQueueReconcileRepoInit } else { return nil, status.Errorf(codes.InvalidArgument, "entity type %s is not supported", entity.GetType()) } diff --git a/internal/eea/eea.go b/internal/eea/eea.go index b25beb7d79..3fd6c50794 100644 --- a/internal/eea/eea.go +++ b/internal/eea/eea.go @@ -20,10 +20,10 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -51,7 +51,7 @@ func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.Aggreg // Register implements the Consumer interface. func (e *EEA) Register(r interfaces.Registrar) { - r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler) + r.Register(constants.TopicQueueEntityFlush, e.FlushMessageHandler) } // AggregateMiddleware will pass on the event to the executor engine diff --git a/internal/eea/eea_test.go b/internal/eea/eea_test.go index bdf63dfa90..8749a027e4 100644 --- a/internal/eea/eea_test.go +++ b/internal/eea/eea_test.go @@ -27,11 +27,11 @@ import ( "github.com/mindersec/minder/internal/entities/properties" psvc "github.com/mindersec/minder/internal/entities/properties/service" propsvcmock "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" mockmanager "github.com/mindersec/minder/internal/providers/manager/mock" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) const ( @@ -87,7 +87,7 @@ func TestAggregator(t *testing.T) { aggr.Register(evt) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) go func() { t.Log("Running eventer") @@ -134,7 +134,7 @@ func TestAggregator(t *testing.T) { msg, err := inf.BuildMessage() require.NoError(t, err, "expected no error when building message") - err = evt.Publish(events.TopicQueueEntityFlush, msg.Copy()) + err = evt.Publish(constants.TopicQueueEntityFlush, msg.Copy()) require.NoError(t, err, "expected no error when publishing message") }() } @@ -353,7 +353,7 @@ func TestFlushAll(t *testing.T) { require.NoError(t, err) flushedMessages := newTestPubSub() - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") @@ -413,7 +413,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) { flushedMessages := newTestPubSub() // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware) t.Log("Flushing all") require.NoError(t, aggr.FlushAll(ctx), "expected no error") @@ -441,7 +441,7 @@ func TestFlushAllListFlushFails(t *testing.T) { require.NoError(t, err) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") @@ -491,7 +491,7 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) { require.NoError(t, err) // This tests that flushing sends messages to the executor engine - evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add) + evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add) go func() { t.Log("Running eventer") diff --git a/internal/engine/entities/entity_event.go b/internal/engine/entities/entity_event.go index 0a00e81b11..d45e1d7c45 100644 --- a/internal/engine/entities/entity_event.go +++ b/internal/engine/entities/entity_event.go @@ -12,8 +12,8 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" - "github.com/mindersec/minder/internal/events" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -187,7 +187,7 @@ func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error { return err } - if err := evt.Publish(events.TopicQueueEntityEvaluate, msg); err != nil { + if err := evt.Publish(constants.TopicQueueEntityEvaluate, msg); err != nil { return fmt.Errorf("error publishing entity event: %w", err) } diff --git a/internal/engine/handler.go b/internal/engine/handler.go index a7a4a7b261..292d910d53 100644 --- a/internal/engine/handler.go +++ b/internal/engine/handler.go @@ -15,9 +15,9 @@ import ( "github.com/mindersec/minder/internal/engine/engcontext" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" minderlogger "github.com/mindersec/minder/internal/logger" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -72,7 +72,7 @@ func NewExecutorEventHandler( // Register implements the Consumer interface. func (e *ExecutorEventHandler) Register(r interfaces.Registrar) { - r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) + r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) } // Wait waits for all the entity executions to finish. @@ -170,7 +170,7 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error { } // Publish the result of the entity evaluation - if err := e.evt.Publish(events.TopicQueueEntityFlush, msg); err != nil { + if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil { logger.Err(err).Msg("error publishing flush event") } }() diff --git a/internal/engine/handler_test.go b/internal/engine/handler_test.go index b9e7aa12c1..af6eb873de 100644 --- a/internal/engine/handler_test.go +++ b/internal/engine/handler_test.go @@ -16,11 +16,11 @@ import ( "github.com/mindersec/minder/internal/engine" "github.com/mindersec/minder/internal/engine/entities" mockengine "github.com/mindersec/minder/internal/engine/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/util/testqueue" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { @@ -52,7 +52,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) { go func() { t.Log("Running eventer") - evt.Register(events.TopicQueueEntityFlush, pq.Pass) + evt.Register(constants.TopicQueueEntityFlush, pq.Pass) err := evt.Run(context.Background()) require.NoError(t, err, "failed to run eventer") }() diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go index 1b21b3e9f8..b619e7e96c 100644 --- a/internal/entities/handlers/handler.go +++ b/internal/entities/handlers/handler.go @@ -19,10 +19,10 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" propertyService "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -206,8 +206,8 @@ func NewRefreshByIDAndEvaluateHandler( refreshEntity: entStrategies.NewRefreshEntityByIDStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueRefreshEntityByIDAndEvaluate, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueRefreshEntityByIDAndEvaluate, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -228,8 +228,8 @@ func NewRefreshEntityAndEvaluateHandler( refreshEntity: entStrategies.NewRefreshEntityByUpstreamPropsStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueRefreshEntityAndEvaluate, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueRefreshEntityAndEvaluate, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -249,8 +249,8 @@ func NewGetEntityAndDeleteHandler( refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc), createMessage: msgStrategies.NewToMinderEntity(), - handlerName: events.TopicQueueGetEntityAndDelete, - forwardHandlerName: events.TopicQueueReconcileEntityDelete, + handlerName: constants.TopicQueueGetEntityAndDelete, + forwardHandlerName: constants.TopicQueueReconcileEntityDelete, handlerMiddleware: handlerMiddleware, } @@ -271,8 +271,8 @@ func NewAddOriginatingEntityHandler( refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), - handlerName: events.TopicQueueOriginatingEntityAdd, - forwardHandlerName: events.TopicQueueEntityEvaluate, + handlerName: constants.TopicQueueOriginatingEntityAdd, + forwardHandlerName: constants.TopicQueueEntityEvaluate, handlerMiddleware: handlerMiddleware, } @@ -292,7 +292,7 @@ func NewRemoveOriginatingEntityHandler( refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store), createMessage: msgStrategies.NewCreateEmpty(), - handlerName: events.TopicQueueOriginatingEntityDelete, + handlerName: constants.TopicQueueOriginatingEntityDelete, handlerMiddleware: handlerMiddleware, } diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go index 640f436f5c..afb40cf27c 100644 --- a/internal/entities/handlers/handler_test.go +++ b/internal/entities/handlers/handler_test.go @@ -24,7 +24,6 @@ import ( "github.com/mindersec/minder/internal/entities/properties" "github.com/mindersec/minder/internal/entities/properties/service" "github.com/mindersec/minder/internal/entities/properties/service/mock/fixtures" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" mockgithub "github.com/mindersec/minder/internal/providers/github/mock" ghprops "github.com/mindersec/minder/internal/providers/github/properties" @@ -33,6 +32,7 @@ import ( provManFixtures "github.com/mindersec/minder/internal/providers/manager/mock/fixtures" "github.com/mindersec/minder/internal/reconcilers/messages" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -275,7 +275,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -290,7 +290,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { }, mockStoreFunc: df.NewMockStore(), expectedPublish: false, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -320,7 +320,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -355,7 +355,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithTransaction(), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -419,7 +419,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { df.WithSuccessfulGetFeatureInProject(true), ), expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkRepoMessage, }, { @@ -619,7 +619,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { ) }, expectedPublish: true, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, checkWmMsg: checkPullRequestMessage, }, { @@ -684,7 +684,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { }, expectedPublish: true, checkWmMsg: checkRepoEntityMessage, - topic: events.TopicQueueReconcileEntityDelete, + topic: constants.TopicQueueReconcileEntityDelete, }, { name: "NewGetEntityAndDeleteHandler: failure to get entity does not publish", diff --git a/internal/events/events.go b/internal/events/events.go index ebb23b81ce..f4ea37be31 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -28,6 +28,7 @@ import ( "github.com/mindersec/minder/internal/events/nats" eventersql "github.com/mindersec/minder/internal/events/sql" serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -77,8 +78,8 @@ func NewEventer(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces. metricsBuilder := metrics.NewPrometheusMetricsBuilder( promgo.DefaultRegisterer, - metricsNamespace, - metricsSubsystem) + constants.MetricsNamespace, + constants.MetricsSubsystem) metricsBuilder.AddPrometheusRouterMetrics(router) zerolog.Ctx(ctx).Info().Msg("Router Metrics registered") @@ -94,7 +95,7 @@ func NewEventer(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces. return nil, fmt.Errorf("failed instantiating driver: %w", err) } - poisonQueueMiddleware, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic) + poisonQueueMiddleware, err := middleware.PoisonQueue(pub, constants.DeadLetterQueueTopic) if err != nil { return nil, fmt.Errorf("failed instantiating poison queue: %w", err) } @@ -143,13 +144,13 @@ func instantiateDriver( cfg *serverconfig.EventConfig, ) (message.Publisher, message.Subscriber, common.DriverCloser, error) { switch driver { - case GoChannelDriver: + case constants.GoChannelDriver: zerolog.Ctx(ctx).Info().Msg("Using go-channel driver") return gochannel.BuildGoChannelDriver(ctx, cfg) - case SQLDriver: + case constants.SQLDriver: zerolog.Ctx(ctx).Info().Msg("Using SQL driver") return eventersql.BuildPostgreSQLDriver(ctx, cfg) - case NATSDriver: + case constants.NATSDriver: zerolog.Ctx(ctx).Info().Msg("Using NATS driver") return nats.BuildNatsChannelDriver(cfg) default: @@ -190,7 +191,7 @@ func (e *eventer) Publish(topic string, messages ...*message.Message) error { "component": "eventer", "function": "Publish", }) - msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339)) + msg.Metadata.Set(constants.PublishedKey, time.Now().Format(time.RFC3339)) } } diff --git a/internal/events/events_test.go b/internal/events/events_test.go index a26d6700f3..c0b07d7723 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -15,9 +15,9 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "github.com/mindersec/minder/internal/events" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -131,7 +131,7 @@ func TestEventer(t *testing.T) { name: "handler fails, message goes to DLQ", publish: []eventPair{{"test_dlq", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ - events.DeadLetterQueueTopic: {{}}, + constants.DeadLetterQueueTopic: {{}}, }, consumers: []fakeConsumer{ { @@ -139,7 +139,7 @@ func TestEventer(t *testing.T) { shouldFailHandler: true, }, { - topics: []string{events.DeadLetterQueueTopic}, + topics: []string{constants.DeadLetterQueueTopic}, makeHandler: fakeHandler, }, }, diff --git a/internal/events/metrics.go b/internal/events/metrics.go index 5c573a5b58..f8cfcc60e7 100644 --- a/internal/events/metrics.go +++ b/internal/events/metrics.go @@ -11,13 +11,15 @@ import ( "github.com/ThreeDotsLabs/watermill/message/router/middleware" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/mindersec/minder/pkg/eventer/constants" ) func recordMetrics(instruments *messageInstruments) func(h message.HandlerFunc) message.HandlerFunc { metricsFunc := func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { var processingTime time.Duration - if publishedAt := msg.Metadata.Get(PublishedKey); publishedAt != "" { + if publishedAt := msg.Metadata.Get(constants.PublishedKey); publishedAt != "" { if parsedTime, err := time.Parse(time.RFC3339, publishedAt); err == nil { processingTime = time.Since(parsedTime) } diff --git a/internal/providers/github/webhook/app.go b/internal/providers/github/webhook/app.go index 5a3640e6fa..e7fff195b0 100644 --- a/internal/providers/github/webhook/app.go +++ b/internal/providers/github/webhook/app.go @@ -20,12 +20,12 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/clients" "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/providers/github/service" "github.com/mindersec/minder/internal/reconcilers/messages" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -130,15 +130,15 @@ func HandleGitHubAppWebhook( wes.Typ = github.WebHookType(r) m := message.NewMessage(uuid.New().String(), nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(r)) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(r)) // TODO: handle other sources - m.Metadata.Set(events.ProviderSourceKey, "https://api.github.com/") - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderSourceKey, "https://api.github.com/") + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) l = l.With(). - Str("webhook-event-type", m.Metadata[events.GithubWebhookEventTypeKey]). - Str("providertype", m.Metadata[events.ProviderTypeKey]). - Str("upstream-delivery-id", m.Metadata[events.ProviderDeliveryIdKey]). + Str("webhook-event-type", m.Metadata[constants.GithubWebhookEventTypeKey]). + Str("providertype", m.Metadata[constants.ProviderTypeKey]). + Str("upstream-delivery-id", m.Metadata[constants.ProviderDeliveryIdKey]). // This is added for consistency with how // watermill tracks message UUID when logging. Str("message_uuid", m.UUID). @@ -357,7 +357,7 @@ func processInstallationRepositoriesAppEvent( func repositoryRemoved( repo *repo, ) (*processingResult, error) { - return sendEvaluateRepoMessage(repo, events.TopicQueueGetEntityAndDelete) + return sendEvaluateRepoMessage(repo, constants.TopicQueueGetEntityAndDelete) } func repositoryAdded( @@ -384,7 +384,7 @@ func repositoryAdded( WithProperties(addRepoProps) return &processingResult{ - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, wrapper: event, }, nil } diff --git a/internal/providers/github/webhook/fuzz_test.go b/internal/providers/github/webhook/fuzz_test.go index 1200d2cf95..676a5ad33b 100644 --- a/internal/providers/github/webhook/fuzz_test.go +++ b/internal/providers/github/webhook/fuzz_test.go @@ -19,8 +19,8 @@ import ( "github.com/mindersec/minder/internal/controlplane/metrics" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" ) var eventTypes = [23]string{ @@ -89,10 +89,10 @@ func FuzzGitHubEventParsers(f *testing.F) { wes.Typ = github.WebHookType(req) m := message.NewMessage("", nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(req)) - m.Metadata.Set(events.ProviderTypeKey, string(db.ProviderTypeGithub)) - m.Metadata.Set(events.ProviderSourceKey, "") - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(req)) + m.Metadata.Set(constants.ProviderTypeKey, string(db.ProviderTypeGithub)) + m.Metadata.Set(constants.ProviderSourceKey, "") + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) // Create whConfig whSecretFile, err := os.CreateTemp("", "webhooksecret*") diff --git a/internal/providers/github/webhook/handlers_githubwebhooks_test.go b/internal/providers/github/webhook/handlers_githubwebhooks_test.go index a2bb333725..3217a62f35 100644 --- a/internal/providers/github/webhook/handlers_githubwebhooks_test.go +++ b/internal/providers/github/webhook/handlers_githubwebhooks_test.go @@ -41,7 +41,6 @@ import ( entMsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/installations" gf "github.com/mindersec/minder/internal/providers/github/mock/fixtures" ghprop "github.com/mindersec/minder/internal/providers/github/properties" @@ -51,6 +50,7 @@ import ( v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" serverconfig "github.com/mindersec/minder/pkg/config/server" "github.com/mindersec/minder/pkg/eventer" + "github.com/mindersec/minder/pkg/eventer/constants" ) //go:embed test-payloads/installation-deleted.json @@ -132,7 +132,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt.Register(events.TopicQueueEntityEvaluate, pq.Pass) + evt.Register(constants.TopicQueueEntityEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -182,7 +182,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { defer pq.Close() queued := pq.GetQueue() - evt.Register(events.TopicQueueRefreshEntityAndEvaluate, pq.Pass) + evt.Register(constants.TopicQueueRefreshEntityAndEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -242,7 +242,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { // This changes because "meta" event can only trigger a // deletion - evt.Register(events.TopicQueueGetEntityAndDelete, pq.Pass) + evt.Register(constants.TopicQueueGetEntityAndDelete, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -342,7 +342,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() - evt.Register(events.TopicQueueEntityEvaluate, pq.Pass) + evt.Register(constants.TopicQueueEntityEvaluate, pq.Pass) go func() { err := evt.Run(context.Background()) @@ -461,7 +461,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/apps"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -485,7 +485,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -523,7 +523,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("login/package-name"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -546,7 +546,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -591,7 +591,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -609,7 +609,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, @@ -645,7 +645,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -680,7 +680,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -715,7 +715,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -747,7 +747,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -792,7 +792,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://example.com/random/url"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -827,7 +827,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -862,7 +862,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -894,7 +894,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -924,7 +924,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -960,7 +960,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -999,7 +999,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1034,7 +1034,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1062,7 +1062,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1090,7 +1090,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1118,7 +1118,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1145,7 +1145,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1173,7 +1173,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1200,7 +1200,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1228,7 +1228,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1261,7 +1261,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1293,7 +1293,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1325,7 +1325,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1357,7 +1357,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1390,7 +1390,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1423,7 +1423,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1456,7 +1456,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1490,7 +1490,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, _ string, ch <-chan *message.Message) { t.Helper() @@ -1521,7 +1521,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1556,7 +1556,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Private: github.Bool(true), }, }, - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, // the message is passed on to events.TopicQueueRefreshEntityAndEvaluate // which should discard it (see test there) @@ -1589,7 +1589,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Private: github.Bool(true), }, }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, // the message is passed on to events.TopicQueueRefreshEntityAndEvaluate // which should discard it (see test there) @@ -1625,7 +1625,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1653,7 +1653,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1681,7 +1681,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1709,7 +1709,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1737,7 +1737,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1765,7 +1765,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1792,7 +1792,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1820,7 +1820,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1848,7 +1848,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1876,7 +1876,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1904,7 +1904,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1932,7 +1932,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1960,7 +1960,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -1988,7 +1988,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2016,7 +2016,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2044,7 +2044,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2071,7 +2071,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2098,7 +2098,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2118,7 +2118,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { event: "org_block", // https://pkg.go.dev/github.com/google/go-github/v62@v62.0.0/github#OrgBlockEvent payload: &github.OrgBlockEvent{}, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, queued: nil, }, @@ -2136,7 +2136,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { HTMLURL: github.String("https://github.com/mindersec/minder"), }, }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2156,7 +2156,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { event: "push", // https://pkg.go.dev/github.com/google/go-github/v62@v62.0.0/github#PushEvent rawPayload: []byte(rawPushEvent), - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2185,7 +2185,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2211,7 +2211,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2230,7 +2230,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { // https://docs.github.com/en/webhooks/webhook-events-and-payloads#branch_protection_configuration event: "branch_protection_configuration", rawPayload: []byte(rawBranchProtectionConfigurationDisabledEvent), - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2256,7 +2256,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2282,7 +2282,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2308,7 +2308,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2334,7 +2334,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2360,7 +2360,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2386,7 +2386,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { "https://github.com/mindersec/minder", ), }, - topic: events.TopicQueueRefreshEntityAndEvaluate, + topic: constants.TopicQueueRefreshEntityAndEvaluate, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2430,7 +2430,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2472,7 +2472,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2514,7 +2514,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityAdd, + topic: constants.TopicQueueOriginatingEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2555,7 +2555,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { ghMocks: []func(hubMock gf.GitHubMock){ gf.WithSuccessfulGetEntityName("mindersec/minder/42"), }, - topic: events.TopicQueueOriginatingEntityDelete, + topic: constants.TopicQueueOriginatingEntityDelete, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -2619,7 +2619,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Login: github.String("stacklok"), }, }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusInternalServerError, queued: nil, }, @@ -2632,7 +2632,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Action: github.String("created"), Garbage: github.String("garbage"), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, { @@ -2642,7 +2642,7 @@ func (s *UnitTestSuite) TestHandleGitHubWebHook() { Action: github.String("created"), Garbage: github.String("garbage"), }, - topic: events.TopicQueueEntityEvaluate, + topic: constants.TopicQueueEntityEvaluate, statusCode: http.StatusOK, }, } @@ -3022,7 +3022,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() @@ -3109,7 +3109,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueReconcileEntityAdd, + topic: constants.TopicQueueReconcileEntityAdd, statusCode: http.StatusOK, //nolint:thelper queued: nil, @@ -3164,7 +3164,7 @@ func (s *UnitTestSuite) TestHandleGitHubAppWebHook() { }, 54321), ), - topic: events.TopicQueueGetEntityAndDelete, + topic: constants.TopicQueueGetEntityAndDelete, statusCode: http.StatusOK, queued: func(t *testing.T, event string, ch <-chan *message.Message) { t.Helper() diff --git a/internal/providers/github/webhook/handlers_packages.go b/internal/providers/github/webhook/handlers_packages.go index 9316c97541..7af5941e34 100644 --- a/internal/providers/github/webhook/handlers_packages.go +++ b/internal/providers/github/webhook/handlers_packages.go @@ -15,9 +15,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // packageEvent represent any event related to a repository and one of @@ -124,7 +124,7 @@ func processPackageEvent( WithOriginator(pb.Entity_ENTITY_REPOSITORIES, repoProps). WithProviderImplementsHint(string(db.ProviderTypeGithub)) - return &processingResult{topic: events.TopicQueueOriginatingEntityAdd, wrapper: pkgMsg}, nil + return &processingResult{topic: constants.TopicQueueOriginatingEntityAdd, wrapper: pkgMsg}, nil } // This routine assumes that all necessary validation is performed on diff --git a/internal/providers/github/webhook/handlers_pull_requests.go b/internal/providers/github/webhook/handlers_pull_requests.go index 769b71f4ea..a4061ef0a5 100644 --- a/internal/providers/github/webhook/handlers_pull_requests.go +++ b/internal/providers/github/webhook/handlers_pull_requests.go @@ -14,9 +14,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // pullRequestEvent are events related to pull requests issued around @@ -142,9 +142,9 @@ func processPullRequestEvent( case webhookActionEventOpened, webhookActionEventReopened, webhookActionEventSynchronize: - topic = events.TopicQueueOriginatingEntityAdd + topic = constants.TopicQueueOriginatingEntityAdd case webhookActionEventClosed: - topic = events.TopicQueueOriginatingEntityDelete + topic = constants.TopicQueueOriginatingEntityDelete default: zerolog.Ctx(ctx).Info().Msgf("action %s is not handled for pull requests", pullProps.GetProperty(ghprop.PullPropertyAction).GetString()) diff --git a/internal/providers/github/webhook/handlers_repos.go b/internal/providers/github/webhook/handlers_repos.go index 835e59c385..53eb6f0883 100644 --- a/internal/providers/github/webhook/handlers_repos.go +++ b/internal/providers/github/webhook/handlers_repos.go @@ -15,9 +15,9 @@ import ( "github.com/mindersec/minder/internal/db" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" ghprop "github.com/mindersec/minder/internal/providers/github/properties" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) // repoEvent represents any event related to a repository. @@ -125,7 +125,7 @@ func processRepositoryEvent( l.Info().Msg("handling event for repository") - return sendEvaluateRepoMessage(event.GetRepo(), events.TopicQueueRefreshEntityAndEvaluate) + return sendEvaluateRepoMessage(event.GetRepo(), constants.TopicQueueRefreshEntityAndEvaluate) } func sendEvaluateRepoMessage( @@ -204,14 +204,14 @@ func processRelevantRepositoryEvent( } // For all other events exept deletions we issue a refresh event. - topic := events.TopicQueueRefreshEntityAndEvaluate + topic := constants.TopicQueueRefreshEntityAndEvaluate // For webhook deletions, repository deletions, and repository // transfers, we issue a delete event with the correct message // type. if event.GetAction() == webhookActionEventDeleted || event.GetAction() == webhookActionEventTransferred { - topic = events.TopicQueueGetEntityAndDelete + topic = constants.TopicQueueGetEntityAndDelete } return &processingResult{ diff --git a/internal/providers/github/webhook/hook.go b/internal/providers/github/webhook/hook.go index 03cc6ec638..14e3236ec2 100644 --- a/internal/providers/github/webhook/hook.go +++ b/internal/providers/github/webhook/hook.go @@ -22,10 +22,10 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" entMsg "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/github/installations" "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -106,15 +106,15 @@ func HandleWebhookEvent( // TODO: extract sender and event time from payload portably m := message.NewMessage(uuid.New().String(), nil) - m.Metadata.Set(events.ProviderDeliveryIdKey, github.DeliveryID(r)) - m.Metadata.Set(events.ProviderTypeKey, string(db.ProviderTypeGithub)) - m.Metadata.Set(events.ProviderSourceKey, "https://api.github.com/") // TODO: handle other sources - m.Metadata.Set(events.GithubWebhookEventTypeKey, wes.Typ) + m.Metadata.Set(constants.ProviderDeliveryIdKey, github.DeliveryID(r)) + m.Metadata.Set(constants.ProviderTypeKey, string(db.ProviderTypeGithub)) + m.Metadata.Set(constants.ProviderSourceKey, "https://api.github.com/") // TODO: handle other sources + m.Metadata.Set(constants.GithubWebhookEventTypeKey, wes.Typ) l = l.With(). - Str("webhook-event-type", m.Metadata[events.GithubWebhookEventTypeKey]). - Str("providertype", m.Metadata[events.ProviderTypeKey]). - Str("upstream-delivery-id", m.Metadata[events.ProviderDeliveryIdKey]). + Str("webhook-event-type", m.Metadata[constants.GithubWebhookEventTypeKey]). + Str("providertype", m.Metadata[constants.ProviderTypeKey]). + Str("upstream-delivery-id", m.Metadata[constants.ProviderDeliveryIdKey]). // This is added for consistency with how // watermill tracks message UUID when logging. Str("message_uuid", m.UUID). diff --git a/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go b/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go index 42f9c83150..13c38c8234 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go +++ b/internal/providers/gitlab/manager/webhook_handlers_merge_requests.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleMergeRequest(l zerolog.Logger, r *http.Request) error { @@ -47,13 +47,13 @@ func (m *providerClassManager) handleMergeRequest(l zerolog.Logger, r *http.Requ case mergeRequestEvent.ObjectAttributes.Action == "open", mergeRequestEvent.ObjectAttributes.Action == "reopen": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueOriginatingEntityAdd) + constants.TopicQueueOriginatingEntityAdd) case mergeRequestEvent.ObjectAttributes.Action == "close": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueOriginatingEntityDelete) + constants.TopicQueueOriginatingEntityDelete) case mergeRequestEvent.ObjectAttributes.Action == "update": return m.publishMergeRequestMessage(mrID, mrIID, rawProjectID, - events.TopicQueueRefreshEntityAndEvaluate) + constants.TopicQueueRefreshEntityAndEvaluate) default: return nil } diff --git a/internal/providers/gitlab/manager/webhook_handlers_releases.go b/internal/providers/gitlab/manager/webhook_handlers_releases.go index 92f0430586..a1c19838d5 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_releases.go +++ b/internal/providers/gitlab/manager/webhook_handlers_releases.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleRelease(l zerolog.Logger, r *http.Request) error { @@ -47,13 +47,13 @@ func (m *providerClassManager) handleRelease(l zerolog.Logger, r *http.Request) switch { case releaseEvent.Action == "create": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueOriginatingEntityAdd) + constants.TopicQueueOriginatingEntityAdd) case releaseEvent.Action == "update": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueRefreshEntityAndEvaluate) + constants.TopicQueueRefreshEntityAndEvaluate) case releaseEvent.Action == "delete": return m.publishReleaseMessage(releaseID, tag, rawProjectID, - events.TopicQueueOriginatingEntityDelete) + constants.TopicQueueOriginatingEntityDelete) default: return nil } diff --git a/internal/providers/gitlab/manager/webhook_handlers_repos.go b/internal/providers/gitlab/manager/webhook_handlers_repos.go index 01c46180df..3e2f1b7ba8 100644 --- a/internal/providers/gitlab/manager/webhook_handlers_repos.go +++ b/internal/providers/gitlab/manager/webhook_handlers_repos.go @@ -14,9 +14,9 @@ import ( entmsg "github.com/mindersec/minder/internal/entities/handlers/message" "github.com/mindersec/minder/internal/entities/properties" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/gitlab" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" ) func (m *providerClassManager) handleRepoPush(l zerolog.Logger, r *http.Request) error { @@ -83,7 +83,7 @@ func (m *providerClassManager) publishRefreshAndEvalForGitlabProject( // Publish message l.Debug().Str("msg_id", msgID).Msg("publishing refresh and eval message") - if err := m.pub.Publish(events.TopicQueueRefreshEntityAndEvaluate, msg); err != nil { + if err := m.pub.Publish(constants.TopicQueueRefreshEntityAndEvaluate, msg); err != nil { l.Error().Err(err).Msg("error publishing refresh and eval message") return fmt.Errorf("error publishing refresh and eval message: %w", err) } diff --git a/internal/reconcilers/reconcilers.go b/internal/reconcilers/reconcilers.go index 67ddc02235..cdef3778ad 100644 --- a/internal/reconcilers/reconcilers.go +++ b/internal/reconcilers/reconcilers.go @@ -8,9 +8,9 @@ package reconcilers import ( "github.com/mindersec/minder/internal/crypto" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/providers/manager" "github.com/mindersec/minder/internal/repositories" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -42,8 +42,8 @@ func NewReconciler( // Register implements the Consumer interface. func (r *Reconciler) Register(reg interfaces.Registrar) { - reg.Register(events.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) - reg.Register(events.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) - reg.Register(events.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) - reg.Register(events.TopicQueueReconcileEntityAdd, r.handleEntityAddEvent) + reg.Register(constants.TopicQueueReconcileRepoInit, r.handleRepoReconcilerEvent) + reg.Register(constants.TopicQueueReconcileProfileInit, r.handleProfileInitEvent) + reg.Register(constants.TopicQueueReconcileEntityDelete, r.handleEntityDeleteEvent) + reg.Register(constants.TopicQueueReconcileEntityAdd, r.handleEntityAddEvent) } diff --git a/internal/reconcilers/repository.go b/internal/reconcilers/repository.go index 708b3f1146..d414832d72 100644 --- a/internal/reconcilers/repository.go +++ b/internal/reconcilers/repository.go @@ -15,8 +15,8 @@ import ( "github.com/rs/zerolog/log" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/reconcilers/messages" + "github.com/mindersec/minder/pkg/eventer/constants" ) // handleRepoReconcilerEvent handles events coming from the reconciler topic @@ -64,7 +64,7 @@ func (r *Reconciler) handleRepositoryReconcilerEvent(ctx context.Context, evt *m } m.SetContext(ctx) - if err := r.evt.Publish(events.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { + if err := r.evt.Publish(constants.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { // we retry in case watermill is having a bad day return fmt.Errorf("error publishing message: %w", err) } diff --git a/internal/reconcilers/repository_test.go b/internal/reconcilers/repository_test.go index 26153e40c6..7f3987bcb1 100644 --- a/internal/reconcilers/repository_test.go +++ b/internal/reconcilers/repository_test.go @@ -10,9 +10,9 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" "github.com/mindersec/minder/internal/reconcilers/messages" + "github.com/mindersec/minder/pkg/eventer/constants" ) var ( @@ -33,7 +33,7 @@ func Test_handleRepoReconcilerEvent(t *testing.T) { }{ { name: "valid event", - topic: events.TopicQueueRefreshEntityByIDAndEvaluate, + topic: constants.TopicQueueRefreshEntityByIDAndEvaluate, entityID: testRepoID, expectedPublish: true, expectedErr: false, @@ -44,7 +44,7 @@ func Test_handleRepoReconcilerEvent(t *testing.T) { // just before reconciling artifacts - we verify that because if we hit the artifacts path, we would have // a bunch of other mocks to call name: "event with string as upstream ID does publish", - topic: events.TopicQueueRefreshEntityByIDAndEvaluate, + topic: constants.TopicQueueRefreshEntityByIDAndEvaluate, entityID: testRepoID, expectedPublish: true, expectedErr: false, diff --git a/internal/reconcilers/run_profile.go b/internal/reconcilers/run_profile.go index 896b8e594c..bb9eb13061 100644 --- a/internal/reconcilers/run_profile.go +++ b/internal/reconcilers/run_profile.go @@ -14,7 +14,7 @@ import ( "github.com/rs/zerolog" entityMessage "github.com/mindersec/minder/internal/entities/handlers/message" - "github.com/mindersec/minder/internal/events" + "github.com/mindersec/minder/pkg/eventer/constants" ) // ProfileInitEvent is an event that is sent to the reconciler topic @@ -96,7 +96,7 @@ func (r *Reconciler) publishProfileInitEvents( return nil } - if err := r.evt.Publish(events.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { + if err := r.evt.Publish(constants.TopicQueueRefreshEntityByIDAndEvaluate, m); err != nil { // we retry in case watermill is having a bad day return fmt.Errorf("error publishing message: %w", err) } diff --git a/internal/reconcilers/run_profile_test.go b/internal/reconcilers/run_profile_test.go index a6a29c805a..44f11dce88 100644 --- a/internal/reconcilers/run_profile_test.go +++ b/internal/reconcilers/run_profile_test.go @@ -14,8 +14,8 @@ import ( df "github.com/mindersec/minder/database/mock/fixtures" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" stubeventer "github.com/mindersec/minder/internal/events/stubs" + "github.com/mindersec/minder/pkg/eventer/constants" ) var ( @@ -94,7 +94,7 @@ func Test_handleProfileInitEvent(t *testing.T) { require.Equal(t, scenario.numPublish, len(stubEventer.Sent)) if scenario.numPublish > 0 { - require.Contains(t, stubEventer.Topics, events.TopicQueueRefreshEntityByIDAndEvaluate) + require.Contains(t, stubEventer.Topics, constants.TopicQueueRefreshEntityByIDAndEvaluate) } }) } diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 51fb5908ec..7ec9a61974 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -16,10 +16,10 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/events/common" remindermessages "github.com/mindersec/minder/internal/reminder/messages" reminderconfig "github.com/mindersec/minder/pkg/config/reminder" + "github.com/mindersec/minder/pkg/eventer/constants" ) // Interface is an interface over the reminder service @@ -143,7 +143,7 @@ func (r *reminder) sendReminders(ctx context.Context) error { return fmt.Errorf("error creating reminder messages: %w", err) } - err = r.eventPublisher.Publish(events.TopicQueueRepoReminder, messages...) + err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) if err != nil { return fmt.Errorf("error publishing messages: %w", err) } diff --git a/internal/reminderprocessor/reminder_processor.go b/internal/reminderprocessor/reminder_processor.go index 2365fcfe81..2368296b1e 100644 --- a/internal/reminderprocessor/reminder_processor.go +++ b/internal/reminderprocessor/reminder_processor.go @@ -10,9 +10,9 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/rs/zerolog/log" - "github.com/mindersec/minder/internal/events" reconcilermessages "github.com/mindersec/minder/internal/reconcilers/messages" remindermessages "github.com/mindersec/minder/internal/reminder/messages" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -28,7 +28,7 @@ func NewReminderProcessor(evt interfaces.Interface) *ReminderProcessor { // Register implements the Consumer interface. func (rp *ReminderProcessor) Register(r interfaces.Registrar) { - r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler) + r.Register(constants.TopicQueueRepoReminder, rp.reminderMessageHandler) } func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error { @@ -45,7 +45,7 @@ func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err := rp.evt.Publish(events.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil { + if err := rp.evt.Publish(constants.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil { log.Printf("error publishing reconciler event: %v", err) } return nil diff --git a/internal/repositories/service.go b/internal/repositories/service.go index ec484d0712..69126ea69e 100644 --- a/internal/repositories/service.go +++ b/internal/repositories/service.go @@ -18,13 +18,13 @@ import ( "github.com/mindersec/minder/internal/entities/models" "github.com/mindersec/minder/internal/entities/properties" "github.com/mindersec/minder/internal/entities/properties/service" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/projects/features" "github.com/mindersec/minder/internal/providers/manager" reconcilers "github.com/mindersec/minder/internal/reconcilers/messages" "github.com/mindersec/minder/internal/util/ptr" pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" provifv1 "github.com/mindersec/minder/pkg/providers/v1" ) @@ -395,7 +395,7 @@ func (r *repositoryService) pushReconcilerEvent(entityID uuid.UUID, projectID uu } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err = r.eventProducer.Publish(events.TopicQueueReconcileRepoInit, msg); err != nil { + if err = r.eventProducer.Publish(constants.TopicQueueReconcileRepoInit, msg); err != nil { log.Printf("error publishing reconciler event: %v", err) } diff --git a/internal/events/constants.go b/pkg/eventer/constants/constants.go similarity index 88% rename from internal/events/constants.go rename to pkg/eventer/constants/constants.go index b2b73a1087..cd7c0041e7 100644 --- a/internal/events/constants.go +++ b/pkg/eventer/constants/constants.go @@ -1,7 +1,8 @@ // SPDX-FileCopyrightText: Copyright 2024 The Minder Authors // SPDX-License-Identifier: Apache-2.0 -package events +// Package constants contains constants used by the eventer package. +package constants // Metadata added to Messages const ( @@ -19,8 +20,10 @@ const ( ) const ( - metricsNamespace = "minder" - metricsSubsystem = "eventer" + // MetricsNamespace is the namespace for all metrics emitted by the eventer + MetricsNamespace = "minder" + // MetricsSubsystem is the subsystem for all metrics emitted by the eventer + MetricsSubsystem = "eventer" ) const ( diff --git a/pkg/profiles/service.go b/pkg/profiles/service.go index 487bec6bdb..f5dab7c177 100644 --- a/pkg/profiles/service.go +++ b/pkg/profiles/service.go @@ -22,7 +22,6 @@ import ( "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/entities" - "github.com/mindersec/minder/internal/events" "github.com/mindersec/minder/internal/logger" "github.com/mindersec/minder/internal/marketplaces/namespaces" "github.com/mindersec/minder/internal/reconcilers" @@ -30,6 +29,7 @@ import ( "github.com/mindersec/minder/internal/util/ptr" minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1" "github.com/mindersec/minder/pkg/engine/selectors" + "github.com/mindersec/minder/pkg/eventer/constants" "github.com/mindersec/minder/pkg/eventer/interfaces" ) @@ -420,7 +420,7 @@ func (p *profileService) sendNewProfileEvent( } // This is a non-fatal error, so we'll just log it and continue with the next ones - if err := p.publisher.Publish(events.TopicQueueReconcileProfileInit, msg); err != nil { + if err := p.publisher.Publish(constants.TopicQueueReconcileProfileInit, msg); err != nil { log.Printf("error publishing reconciler event: %v", err) } }