Skip to content

Commit

Permalink
Move event constants to pkg
Browse files Browse the repository at this point in the history
Signed-off-by: Radoslav Dimitrov <[email protected]>
  • Loading branch information
rdimitrov committed Oct 25, 2024
1 parent bb0c951 commit a949325
Show file tree
Hide file tree
Showing 32 changed files with 203 additions and 197 deletions.
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
"github.com/mindersec/minder/internal/entities/properties"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/logger"
"github.com/mindersec/minder/internal/providers"
"github.com/mindersec/minder/internal/reconcilers/messages"
"github.com/mindersec/minder/internal/util"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// ReconcileEntityRegistration reconciles the registration of an entity.
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *Server) ReconcileEntityRegistration(

func (s *Server) publishEntityMessage(l *zerolog.Logger, msg *message.Message) error {
l.Info().Str("messageID", msg.UUID).Msg("publishing register entities message for execution")
return s.evt.Publish(events.TopicQueueReconcileEntityAdd, msg)
return s.evt.Publish(constants.TopicQueueReconcileEntityAdd, msg)
}

func createEntityMessage(
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_reconciliationtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/logger"
reconcilers "github.com/mindersec/minder/internal/reconcilers/messages"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// CreateEntityReconciliationTask creates a task to reconcile the state of an entity
Expand Down Expand Up @@ -49,7 +49,7 @@ func (s *Server) CreateEntityReconciliationTask(ctx context.Context,
if err != nil {
return nil, err
}
topic = events.TopicQueueReconcileRepoInit
topic = constants.TopicQueueReconcileRepoInit
} else {
return nil, status.Errorf(codes.InvalidArgument, "entity type %s is not supported", entity.GetType())
}
Expand Down
4 changes: 2 additions & 2 deletions internal/eea/eea.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/entities"
"github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers/manager"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

Expand Down Expand Up @@ -51,7 +51,7 @@ func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.Aggreg

// Register implements the Consumer interface.
func (e *EEA) Register(r interfaces.Registrar) {
r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler)
r.Register(constants.TopicQueueEntityFlush, e.FlushMessageHandler)
}

// AggregateMiddleware will pass on the event to the executor engine
Expand Down
14 changes: 7 additions & 7 deletions internal/eea/eea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
"github.com/mindersec/minder/internal/entities/properties"
psvc "github.com/mindersec/minder/internal/entities/properties/service"
propsvcmock "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
mockmanager "github.com/mindersec/minder/internal/providers/manager/mock"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
"github.com/mindersec/minder/pkg/eventer/constants"
)

const (
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestAggregator(t *testing.T) {
aggr.Register(evt)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestAggregator(t *testing.T) {
msg, err := inf.BuildMessage()
require.NoError(t, err, "expected no error when building message")

err = evt.Publish(events.TopicQueueEntityFlush, msg.Copy())
err = evt.Publish(constants.TopicQueueEntityFlush, msg.Copy())
require.NoError(t, err, "expected no error when publishing message")
}()
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestFlushAll(t *testing.T) {
require.NoError(t, err)

flushedMessages := newTestPubSub()
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) {
flushedMessages := newTestPubSub()

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add, aggr.AggregateMiddleware)

t.Log("Flushing all")
require.NoError(t, aggr.FlushAll(ctx), "expected no error")
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestFlushAllListFlushFails(t *testing.T) {
require.NoError(t, err)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) {
require.NoError(t, err)

// This tests that flushing sends messages to the executor engine
evt.Register(events.TopicQueueEntityEvaluate, flushedMessages.Add)
evt.Register(constants.TopicQueueEntityEvaluate, flushedMessages.Add)

go func() {
t.Log("Running eventer")
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/entities/entity_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/mindersec/minder/internal/events"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

Expand Down Expand Up @@ -187,7 +187,7 @@ func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error {
return err
}

if err := evt.Publish(events.TopicQueueEntityEvaluate, msg); err != nil {
if err := evt.Publish(constants.TopicQueueEntityEvaluate, msg); err != nil {
return fmt.Errorf("error publishing entity event: %w", err)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/engine/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

"github.com/mindersec/minder/internal/engine/engcontext"
"github.com/mindersec/minder/internal/engine/entities"
"github.com/mindersec/minder/internal/events"
minderlogger "github.com/mindersec/minder/internal/logger"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

Expand Down Expand Up @@ -72,7 +72,7 @@ func NewExecutorEventHandler(

// Register implements the Consumer interface.
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
}

// Wait waits for all the entity executions to finish.
Expand Down Expand Up @@ -170,7 +170,7 @@ func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
}

// Publish the result of the entity evaluation
if err := e.evt.Publish(events.TopicQueueEntityFlush, msg); err != nil {
if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
logger.Err(err).Msg("error publishing flush event")
}
}()
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"github.com/mindersec/minder/internal/engine"
"github.com/mindersec/minder/internal/engine/entities"
mockengine "github.com/mindersec/minder/internal/engine/mock"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/util/testqueue"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
Expand Down Expand Up @@ -52,7 +52,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {

go func() {
t.Log("Running eventer")
evt.Register(events.TopicQueueEntityFlush, pq.Pass)
evt.Register(constants.TopicQueueEntityFlush, pq.Pass)
err := evt.Run(context.Background())
require.NoError(t, err, "failed to run eventer")
}()
Expand Down
20 changes: 10 additions & 10 deletions internal/entities/handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/mindersec/minder/internal/entities/models"
"github.com/mindersec/minder/internal/entities/properties"
propertyService "github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/projects/features"
"github.com/mindersec/minder/internal/providers/manager"
v1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

Expand Down Expand Up @@ -206,8 +206,8 @@ func NewRefreshByIDAndEvaluateHandler(
refreshEntity: entStrategies.NewRefreshEntityByIDStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr),

handlerName: events.TopicQueueRefreshEntityByIDAndEvaluate,
forwardHandlerName: events.TopicQueueEntityEvaluate,
handlerName: constants.TopicQueueRefreshEntityByIDAndEvaluate,
forwardHandlerName: constants.TopicQueueEntityEvaluate,

handlerMiddleware: handlerMiddleware,
}
Expand All @@ -228,8 +228,8 @@ func NewRefreshEntityAndEvaluateHandler(
refreshEntity: entStrategies.NewRefreshEntityByUpstreamPropsStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr),

handlerName: events.TopicQueueRefreshEntityAndEvaluate,
forwardHandlerName: events.TopicQueueEntityEvaluate,
handlerName: constants.TopicQueueRefreshEntityAndEvaluate,
forwardHandlerName: constants.TopicQueueEntityEvaluate,

handlerMiddleware: handlerMiddleware,
}
Expand All @@ -249,8 +249,8 @@ func NewGetEntityAndDeleteHandler(
refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc),
createMessage: msgStrategies.NewToMinderEntity(),

handlerName: events.TopicQueueGetEntityAndDelete,
forwardHandlerName: events.TopicQueueReconcileEntityDelete,
handlerName: constants.TopicQueueGetEntityAndDelete,
forwardHandlerName: constants.TopicQueueReconcileEntityDelete,

handlerMiddleware: handlerMiddleware,
}
Expand All @@ -271,8 +271,8 @@ func NewAddOriginatingEntityHandler(
refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr),

handlerName: events.TopicQueueOriginatingEntityAdd,
forwardHandlerName: events.TopicQueueEntityEvaluate,
handlerName: constants.TopicQueueOriginatingEntityAdd,
forwardHandlerName: constants.TopicQueueEntityEvaluate,

handlerMiddleware: handlerMiddleware,
}
Expand All @@ -292,7 +292,7 @@ func NewRemoveOriginatingEntityHandler(
refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewCreateEmpty(),

handlerName: events.TopicQueueOriginatingEntityDelete,
handlerName: constants.TopicQueueOriginatingEntityDelete,

handlerMiddleware: handlerMiddleware,
}
Expand Down
16 changes: 8 additions & 8 deletions internal/entities/handlers/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/mindersec/minder/internal/entities/properties"
"github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/entities/properties/service/mock/fixtures"
"github.com/mindersec/minder/internal/events"
stubeventer "github.com/mindersec/minder/internal/events/stubs"
mockgithub "github.com/mindersec/minder/internal/providers/github/mock"
ghprops "github.com/mindersec/minder/internal/providers/github/properties"
Expand All @@ -33,6 +32,7 @@ import (
provManFixtures "github.com/mindersec/minder/internal/providers/manager/mock/fixtures"
"github.com/mindersec/minder/internal/reconcilers/messages"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
provifv1 "github.com/mindersec/minder/pkg/providers/v1"
)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
df.WithTransaction(),
),
expectedPublish: true,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkRepoMessage,
},
{
Expand All @@ -290,7 +290,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
},
mockStoreFunc: df.NewMockStore(),
expectedPublish: false,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkRepoMessage,
},
{
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
df.WithTransaction(),
),
expectedPublish: true,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkRepoMessage,
},
{
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
df.WithTransaction(),
),
expectedPublish: true,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkRepoMessage,
},
{
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
df.WithSuccessfulGetFeatureInProject(true),
),
expectedPublish: true,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkRepoMessage,
},
{
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
)
},
expectedPublish: true,
topic: events.TopicQueueEntityEvaluate,
topic: constants.TopicQueueEntityEvaluate,
checkWmMsg: checkPullRequestMessage,
},
{
Expand Down Expand Up @@ -684,7 +684,7 @@ func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
},
expectedPublish: true,
checkWmMsg: checkRepoEntityMessage,
topic: events.TopicQueueReconcileEntityDelete,
topic: constants.TopicQueueReconcileEntityDelete,
},
{
name: "NewGetEntityAndDeleteHandler: failure to get entity does not publish",
Expand Down
15 changes: 8 additions & 7 deletions internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/mindersec/minder/internal/events/nats"
eventersql "github.com/mindersec/minder/internal/events/sql"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

Expand Down Expand Up @@ -77,8 +78,8 @@ func NewEventer(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces.

metricsBuilder := metrics.NewPrometheusMetricsBuilder(
promgo.DefaultRegisterer,
metricsNamespace,
metricsSubsystem)
constants.MetricsNamespace,
constants.MetricsSubsystem)
metricsBuilder.AddPrometheusRouterMetrics(router)
zerolog.Ctx(ctx).Info().Msg("Router Metrics registered")

Expand All @@ -94,7 +95,7 @@ func NewEventer(ctx context.Context, cfg *serverconfig.EventConfig) (interfaces.
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}

poisonQueueMiddleware, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic)
poisonQueueMiddleware, err := middleware.PoisonQueue(pub, constants.DeadLetterQueueTopic)
if err != nil {
return nil, fmt.Errorf("failed instantiating poison queue: %w", err)
}
Expand Down Expand Up @@ -143,13 +144,13 @@ func instantiateDriver(
cfg *serverconfig.EventConfig,
) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
switch driver {
case GoChannelDriver:
case constants.GoChannelDriver:
zerolog.Ctx(ctx).Info().Msg("Using go-channel driver")
return gochannel.BuildGoChannelDriver(ctx, cfg)
case SQLDriver:
case constants.SQLDriver:
zerolog.Ctx(ctx).Info().Msg("Using SQL driver")
return eventersql.BuildPostgreSQLDriver(ctx, cfg)
case NATSDriver:
case constants.NATSDriver:
zerolog.Ctx(ctx).Info().Msg("Using NATS driver")
return nats.BuildNatsChannelDriver(cfg)
default:
Expand Down Expand Up @@ -190,7 +191,7 @@ func (e *eventer) Publish(topic string, messages ...*message.Message) error {
"component": "eventer",
"function": "Publish",
})
msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339))
msg.Metadata.Set(constants.PublishedKey, time.Now().Format(time.RFC3339))
}
}

Expand Down
Loading

0 comments on commit a949325

Please sign in to comment.