Skip to content

Commit

Permalink
Move part of events from internal to pkg
Browse files Browse the repository at this point in the history
Signed-off-by: Radoslav Dimitrov <[email protected]>
  • Loading branch information
rdimitrov committed Oct 25, 2024
1 parent 2f4770a commit bb0c951
Show file tree
Hide file tree
Showing 37 changed files with 172 additions and 132 deletions.
2 changes: 1 addition & 1 deletion internal/controlplane/handlers_entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
mockevents "github.com/mindersec/minder/internal/events/mock"
mockgh "github.com/mindersec/minder/internal/providers/github/mock"
"github.com/mindersec/minder/internal/providers/manager"
mockmanager "github.com/mindersec/minder/internal/providers/manager/mock"
rf "github.com/mindersec/minder/internal/repositories/mock/fixtures"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
mockevents "github.com/mindersec/minder/pkg/eventer/interfaces/mock"
)

func TestServer_ReconcileEntityRegistration(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions internal/controlplane/handlers_oauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/engcontext"
mockprops "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers"
"github.com/mindersec/minder/internal/providers/dockerhub"
mockclients "github.com/mindersec/minder/internal/providers/github/clients/mock"
Expand All @@ -51,6 +50,7 @@ import (
"github.com/mindersec/minder/internal/providers/session"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
provinfv1 "github.com/mindersec/minder/pkg/providers/v1"
)

Expand Down Expand Up @@ -378,7 +378,7 @@ func TestGetAuthorizationURL(t *testing.T) {
store := mockdb.NewMockStore(ctrl)
tc.buildStubs(store)

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestHandleGitHubAppCallback(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestVerifyProviderCredential(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (
"github.com/mindersec/minder/internal/authz/mock"
mockcrypto "github.com/mindersec/minder/internal/crypto/mock"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/flags"
"github.com/mindersec/minder/internal/marketplaces"
"github.com/mindersec/minder/internal/projects"
"github.com/mindersec/minder/internal/providers"
mockprov "github.com/mindersec/minder/internal/providers/github/service/mock"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

const (
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestDeleteUser_gRPC(t *testing.T) {
}))
defer testServer.Close()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
6 changes: 3 additions & 3 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/mindersec/minder/internal/crypto"
"github.com/mindersec/minder/internal/db"
propSvc "github.com/mindersec/minder/internal/entities/properties/service"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/history"
"github.com/mindersec/minder/internal/invites"
"github.com/mindersec/minder/internal/logger"
Expand All @@ -63,6 +62,7 @@ import (
"github.com/mindersec/minder/internal/util"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/interfaces"
"github.com/mindersec/minder/pkg/profiles"
"github.com/mindersec/minder/pkg/ruletypes"
)
Expand All @@ -81,7 +81,7 @@ var (
type Server struct {
store db.Store
cfg *serverconfig.Config
evt events.Publisher
evt interfaces.Publisher
mt metrics.Metrics
grpcServer *grpc.Server
jwt jwt.Validator
Expand Down Expand Up @@ -126,7 +126,7 @@ type Server struct {
// NewServer creates a new server instance
func NewServer(
store db.Store,
evt events.Publisher,
evt interfaces.Publisher,
cfg *serverconfig.Config,
serverMetrics metrics.Metrics,
jwtValidator jwt.Validator,
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ import (
"github.com/mindersec/minder/internal/controlplane/metrics"
"github.com/mindersec/minder/internal/crypto"
mock_service "github.com/mindersec/minder/internal/entities/properties/service/mock"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/internal/providers"
ghclient "github.com/mindersec/minder/internal/providers/github/clients"
ghService "github.com/mindersec/minder/internal/providers/github/service"
mock_reposvc "github.com/mindersec/minder/internal/repositories/mock"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

const bufSize = 1024 * 1024
Expand Down Expand Up @@ -78,7 +78,7 @@ func newDefaultServer(
) *Server {
t.Helper()

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
7 changes: 4 additions & 3 deletions internal/eea/eea.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,21 @@ import (
"github.com/mindersec/minder/internal/providers/manager"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

// EEA is the Event Execution Aggregator
type EEA struct {
querier db.Store
evt events.Publisher
evt interfaces.Publisher
cfg *serverconfig.AggregatorConfig

entityFetcher service.PropertiesService
provMan manager.ProviderManager
}

// NewEEA creates a new EEA
func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig,
func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.AggregatorConfig,
ef service.PropertiesService, provMan manager.ProviderManager) *EEA {
return &EEA{
querier: querier,
Expand All @@ -49,7 +50,7 @@ func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.Aggregator
}

// Register implements the Consumer interface.
func (e *EEA) Register(r events.Registrar) {
func (e *EEA) Register(r interfaces.Registrar) {
r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler)
}

Expand Down
11 changes: 6 additions & 5 deletions internal/eea/eea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
mockmanager "github.com/mindersec/minder/internal/providers/manager/mock"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

const (
Expand All @@ -55,7 +56,7 @@ func TestAggregator(t *testing.T) {

projectID, repoID := createNeededEntities(ctx, t, testQueries)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{
BufferSize: concurrentEvents,
Expand Down Expand Up @@ -345,7 +346,7 @@ func TestFlushAll(t *testing.T) {
propsvc := propsvcmock.NewMockPropertiesService(ctrl)
provman := mockmanager.NewMockProviderManager(ctrl)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -394,7 +395,7 @@ func TestFlushAllListFlushIsEmpty(t *testing.T) {
require.NoError(t, err, "expected no error when creating embedded store")
t.Cleanup(td)

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -433,7 +434,7 @@ func TestFlushAllListFlushFails(t *testing.T) {

flushedMessages := newTestPubSub()

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down Expand Up @@ -483,7 +484,7 @@ func TestFlushAllListFlushListsARepoThatGetsDeletedLater(t *testing.T) {

flushedMessages := newTestPubSub()

evt, err := events.Setup(ctx, &serverconfig.EventConfig{
evt, err := eventer.New(ctx, &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{},
})
Expand Down
4 changes: 2 additions & 2 deletions internal/email/awsses/awsses.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/email"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

const (
Expand Down Expand Up @@ -47,7 +47,7 @@ func New(ctx context.Context, sender, region string) (*awsSES, error) {
}

// Register implements the Consumer interface.
func (a *awsSES) Register(reg events.Registrar) {
func (a *awsSES) Register(reg interfaces.Registrar) {
reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error {
var e email.MailEventPayload

Expand Down
4 changes: 2 additions & 2 deletions internal/email/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/email"
"github.com/mindersec/minder/internal/events"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

type noop struct {
Expand All @@ -24,7 +24,7 @@ func New() *noop {
}

// Register implements the Consumer interface.
func (_ *noop) Register(reg events.Registrar) {
func (_ *noop) Register(reg interfaces.Registrar) {
reg.Register(email.TopicQueueInviteEmail, func(msg *message.Message) error {
var e email.MailEventPayload

Expand Down
3 changes: 2 additions & 1 deletion internal/engine/entities/entity_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/mindersec/minder/internal/events"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

// EntityInfoWrapper is a helper struct to gather information
Expand Down Expand Up @@ -180,7 +181,7 @@ func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) {
}

// Publish builds a message.Message and publishes it to the event bus
func (eiw *EntityInfoWrapper) Publish(evt events.Publisher) error {
func (eiw *EntityInfoWrapper) Publish(evt interfaces.Publisher) error {
msg, err := eiw.BuildMessage()
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions internal/engine/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mindersec/minder/internal/events"
minderlogger "github.com/mindersec/minder/internal/logger"
pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
"github.com/mindersec/minder/pkg/eventer/interfaces"
)

const (
Expand All @@ -32,7 +33,7 @@ const (
// ExecutorEventHandler is responsible for consuming entity events, passing
// entities to the executor, and then publishing the results.
type ExecutorEventHandler struct {
evt events.Publisher
evt interfaces.Publisher
handlerMiddleware []message.HandlerMiddleware
wgEntityEventExecution *sync.WaitGroup
executor Executor
Expand All @@ -46,7 +47,7 @@ type ExecutorEventHandler struct {
// NewExecutorEventHandler creates the event handler for the executor
func NewExecutorEventHandler(
ctx context.Context,
evt events.Publisher,
evt interfaces.Publisher,
handlerMiddleware []message.HandlerMiddleware,
executor Executor,
) *ExecutorEventHandler {
Expand All @@ -70,7 +71,7 @@ func NewExecutorEventHandler(
}

// Register implements the Consumer interface.
func (e *ExecutorEventHandler) Register(r events.Registrar) {
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
}

Expand Down
3 changes: 2 additions & 1 deletion internal/engine/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/mindersec/minder/internal/util/testqueue"
minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer"
)

func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {
Expand All @@ -38,7 +39,7 @@ func TestExecutorEventHandler_handleEntityEvent(t *testing.T) {

// -- end expectations

evt, err := events.Setup(context.Background(), &serverconfig.EventConfig{
evt, err := eventer.New(context.Background(), &serverconfig.EventConfig{
Driver: "go-channel",
GoChannel: serverconfig.GoChannelEventConfig{
BlockPublishUntilSubscriberAck: true,
Expand Down
Loading

0 comments on commit bb0c951

Please sign in to comment.