diff --git a/config/reminder-config.yaml.example b/config/reminder-config.yaml.example index 2a95067d3f..2f53856e3e 100644 --- a/config/reminder-config.yaml.example +++ b/config/reminder-config.yaml.example @@ -18,10 +18,18 @@ logging: level: "debug" events: - sql_connection: - dbhost: "watermill-postgres" - dbport: 5432 - dbuser: postgres - dbpass: postgres - dbname: watermill - sslmode: disable + driver: "sql" +# only sql and cloudevents-nats drivers are supported +# driver: "cloudevents-nats" + sql: + connection: + dbhost: "watermill-postgres" + dbport: 5432 + dbuser: postgres + dbpass: postgres + dbname: watermill + sslmode: disable +# nats: +# url: "nats://nats:4222" +# prefix: "minder" +# queue: "minder" diff --git a/internal/reminder/publisher.go b/internal/reminder/publisher.go new file mode 100644 index 0000000000..d8ef59cff3 --- /dev/null +++ b/internal/reminder/publisher.go @@ -0,0 +1,22 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +package reminder + +import ( + "context" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/mindersec/minder/pkg/eventer" +) + +func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, error) { + pub, err := eventer.New(ctx, nil, &r.cfg.EventConfig) + if err != nil { + return nil, fmt.Errorf("failed to create publisher: %w", err) + } + + return pub, nil +} diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 7ec9a61974..22cb041e83 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -16,7 +16,6 @@ import ( "github.com/rs/zerolog" "github.com/mindersec/minder/internal/db" - "github.com/mindersec/minder/internal/events/common" remindermessages "github.com/mindersec/minder/internal/reminder/messages" reminderconfig "github.com/mindersec/minder/pkg/config/reminder" "github.com/mindersec/minder/pkg/eventer/constants" @@ -43,7 +42,6 @@ type reminder struct { ticker *time.Ticker eventPublisher message.Publisher - eventDBCloser common.DriverCloser } // NewReminder creates a new reminder instance @@ -59,13 +57,12 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con logger := zerolog.Ctx(ctx) logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor) - pub, cl, err := r.setupSQLPublisher(ctx) + pub, err := r.getMessagePublisher(ctx) if err != nil { return nil, err } r.eventPublisher = pub - r.eventDBCloser = cl return r, nil } @@ -118,7 +115,10 @@ func (r *reminder) Stop() { } r.stopOnce.Do(func() { close(r.stop) - r.eventDBCloser() + err := r.eventPublisher.Close() + if err != nil { + zerolog.Ctx(context.Background()).Error().Err(err).Msg("error closing event publisher") + } }) } diff --git a/internal/reminder/sql_publisher.go b/internal/reminder/sql_publisher.go deleted file mode 100644 index 383bd08548..0000000000 --- a/internal/reminder/sql_publisher.go +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors -// SPDX-License-Identifier: Apache-2.0 - -package reminder - -import ( - "context" - "fmt" - - "github.com/ThreeDotsLabs/watermill" - watermillsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/rs/zerolog" - - "github.com/mindersec/minder/internal/events/common" -) - -func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { - logger := zerolog.Ctx(ctx) - - db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx) - if err != nil { - return nil, nil, fmt.Errorf("unable to connect to events database: %w", err) - } - - publisher, err := watermillsql.NewPublisher( - db, - watermillsql.PublisherConfig{ - SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{}, - AutoInitializeSchema: true, - }, - watermill.NewStdLogger(false, false), - ) - if err != nil { - return nil, nil, fmt.Errorf("failed to create SQL publisher: %w", err) - } - - return publisher, func() { - err := db.Close() - if err != nil { - logger.Printf("error closing events database connection: %v", err) - } - }, nil -} diff --git a/pkg/config/reminder/config.go b/pkg/config/reminder/config.go index c53b5c1129..da16082e2a 100644 --- a/pkg/config/reminder/config.go +++ b/pkg/config/reminder/config.go @@ -5,20 +5,23 @@ package reminder import ( + "fmt" "strings" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/mindersec/minder/pkg/config" + serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" ) // Config contains the configuration for the reminder service type Config struct { - Database config.DatabaseConfig `mapstructure:"database"` - RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` - EventConfig EventConfig `mapstructure:"events"` - LoggingConfig LoggingConfig `mapstructure:"logging"` + Database config.DatabaseConfig `mapstructure:"database"` + RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` + EventConfig serverconfig.EventConfig `mapstructure:"events"` + LoggingConfig LoggingConfig `mapstructure:"logging"` } // Validate validates the configuration @@ -28,6 +31,11 @@ func (c Config) Validate() error { return err } + err = validateEventConfig(c.EventConfig) + if err != nil { + return err + } + return nil } @@ -52,3 +60,14 @@ func RegisterReminderFlags(v *viper.Viper, flags *pflag.FlagSet) error { return registerRecurrenceFlags(v, flags) } + +func validateEventConfig(cfg serverconfig.EventConfig) error { + switch cfg.Driver { + case constants.NATSDriver: + case constants.SQLDriver: + default: + return fmt.Errorf("events.driver %s is not supported", cfg.Driver) + } + + return nil +} diff --git a/pkg/config/reminder/config_test.go b/pkg/config/reminder/config_test.go index 37846e706f..956444602d 100644 --- a/pkg/config/reminder/config_test.go +++ b/pkg/config/reminder/config_test.go @@ -5,6 +5,7 @@ package reminder_test import ( "bytes" + "fmt" "os" "testing" "time" @@ -16,6 +17,8 @@ import ( "github.com/mindersec/minder/pkg/config" "github.com/mindersec/minder/pkg/config/reminder" + serverconfig "github.com/mindersec/minder/pkg/config/server" + "github.com/mindersec/minder/pkg/eventer/constants" ) func TestValidateConfig(t *testing.T) { @@ -34,9 +37,12 @@ func TestValidateConfig(t *testing.T) { BatchSize: 100, MinElapsed: parseTimeDuration(t, "1h"), }, - EventConfig: reminder.EventConfig{ - Connection: config.DatabaseConfig{ - Port: 8080, + EventConfig: serverconfig.EventConfig{ + Driver: constants.SQLDriver, + SQLPubSub: serverconfig.SQLEventConfig{ + Connection: config.DatabaseConfig{ + Port: 8080, + }, }, }, }, @@ -49,6 +55,9 @@ func TestValidateConfig(t *testing.T) { BatchSize: 100, MinElapsed: parseTimeDuration(t, "1h"), }, + EventConfig: serverconfig.EventConfig{ + Driver: constants.SQLDriver, + }, }, errMsg: "cannot be negative", }, @@ -60,9 +69,26 @@ func TestValidateConfig(t *testing.T) { BatchSize: 100, MinElapsed: parseTimeDuration(t, "-1h"), }, + EventConfig: serverconfig.EventConfig{ + Driver: constants.SQLDriver, + }, }, errMsg: "cannot be negative", }, + { + name: "UnsupportedDriver", + config: reminder.Config{ + RecurrenceConfig: reminder.RecurrenceConfig{ + Interval: parseTimeDuration(t, "1h"), + BatchSize: 100, + MinElapsed: parseTimeDuration(t, "1h"), + }, + EventConfig: serverconfig.EventConfig{ + Driver: constants.GoChannelDriver, + }, + }, + errMsg: fmt.Sprintf("%s is not supported", constants.GoChannelDriver), + }, } for _, tt := range tests { @@ -153,10 +179,9 @@ func TestSetViperDefaults(t *testing.T) { require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval"))) require.Equal(t, 100, v.GetInt("recurrence.batch_size")) require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed"))) - require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname")) - require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost")) - require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost")) - require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser")) + require.Equal(t, "watermill", v.GetString("events.sql.connection.dbname")) + require.Equal(t, "localhost", v.GetString("events.sql.connection.dbhost")) + require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser")) } // TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables diff --git a/pkg/config/reminder/events.go b/pkg/config/reminder/events.go deleted file mode 100644 index c4c9ea3d88..0000000000 --- a/pkg/config/reminder/events.go +++ /dev/null @@ -1,16 +0,0 @@ -// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors -// SPDX-License-Identifier: Apache-2.0 - -package reminder - -import ( - "github.com/mindersec/minder/pkg/config" -) - -// EventConfig is the configuration for reminder's eventing system. -type EventConfig struct { - // Connection is the configuration for the SQL event driver - // - // nolint: lll - Connection config.DatabaseConfig `mapstructure:"sql_connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"` -}