Skip to content

Commit

Permalink
Add NATS publisher support to reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Jan 3, 2025
1 parent ad22251 commit 926ca79
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 83 deletions.
22 changes: 15 additions & 7 deletions config/reminder-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ logging:
level: "debug"

events:
sql_connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
driver: "sql"
# only sql and cloudevents-nats drivers are supported
# driver: "cloudevents-nats"
sql:
connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
# nats:
# url: "nats://nats:4222"
# prefix: "minder"
# queue: "minder"
22 changes: 22 additions & 0 deletions internal/reminder/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"fmt"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/mindersec/minder/pkg/eventer"
)

func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, error) {
pub, err := eventer.New(ctx, nil, &r.cfg.EventConfig)
if err != nil {
return nil, fmt.Errorf("failed to create publisher: %w", err)
}

return pub, nil
}
10 changes: 5 additions & 5 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/events/common"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
Expand All @@ -43,7 +42,6 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher
eventDBCloser common.DriverCloser
}

// NewReminder creates a new reminder instance
Expand All @@ -59,13 +57,12 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
logger := zerolog.Ctx(ctx)
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)

pub, cl, err := r.setupSQLPublisher(ctx)
pub, err := r.getMessagePublisher(ctx)
if err != nil {
return nil, err
}

r.eventPublisher = pub
r.eventDBCloser = cl
return r, nil
}

Expand Down Expand Up @@ -118,7 +115,10 @@ func (r *reminder) Stop() {
}
r.stopOnce.Do(func() {
close(r.stop)
r.eventDBCloser()
err := r.eventPublisher.Close()
if err != nil {
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error closing event publisher")
}
})
}

Expand Down
44 changes: 0 additions & 44 deletions internal/reminder/sql_publisher.go

This file was deleted.

27 changes: 23 additions & 4 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
package reminder

import (
"fmt"
"strings"

"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/mindersec/minder/pkg/config"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
)

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
}

// Validate validates the configuration
Expand All @@ -28,6 +31,11 @@ func (c Config) Validate() error {
return err
}

err = validateEventConfig(c.EventConfig)
if err != nil {
return err
}

return nil
}

Expand All @@ -52,3 +60,14 @@ func RegisterReminderFlags(v *viper.Viper, flags *pflag.FlagSet) error {

return registerRecurrenceFlags(v, flags)
}

func validateEventConfig(cfg serverconfig.EventConfig) error {
switch cfg.Driver {
case constants.NATSDriver:
case constants.SQLDriver:
default:
return fmt.Errorf("events.driver %s is not supported", cfg.Driver)
}

return nil
}
39 changes: 32 additions & 7 deletions pkg/config/reminder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reminder_test

import (
"bytes"
"fmt"
"os"
"testing"
"time"
Expand All @@ -16,6 +17,8 @@ import (

"github.com/mindersec/minder/pkg/config"
"github.com/mindersec/minder/pkg/config/reminder"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func TestValidateConfig(t *testing.T) {
Expand All @@ -34,9 +37,12 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: reminder.EventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
SQLPubSub: serverconfig.SQLEventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
},
},
},
},
Expand All @@ -49,6 +55,9 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
},
},
errMsg: "cannot be negative",
},
Expand All @@ -60,9 +69,26 @@ func TestValidateConfig(t *testing.T) {
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "-1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.SQLDriver,
},
},
errMsg: "cannot be negative",
},
{
name: "UnsupportedDriver",
config: reminder.Config{
RecurrenceConfig: reminder.RecurrenceConfig{
Interval: parseTimeDuration(t, "1h"),
BatchSize: 100,
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: serverconfig.EventConfig{
Driver: constants.GoChannelDriver,
},
},
errMsg: fmt.Sprintf("%s is not supported", constants.GoChannelDriver),
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -153,10 +179,9 @@ func TestSetViperDefaults(t *testing.T) {
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
require.Equal(t, "watermill", v.GetString("events.sql.connection.dbname"))
require.Equal(t, "localhost", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
}

// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables
Expand Down
16 changes: 0 additions & 16 deletions pkg/config/reminder/events.go

This file was deleted.

0 comments on commit 926ca79

Please sign in to comment.