Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PostgreSQL Messaging Pub/Sub Watermill #1680

Merged
merged 11 commits into from
Nov 22, 2023
28 changes: 26 additions & 2 deletions cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app

import (
"database/sql"
"fmt"
"log"
"net/url"
Expand Down Expand Up @@ -65,13 +66,36 @@ var serveCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("unable to connect to database: %w", err)
}
defer dbConn.Close()
defer func(dbConn *sql.DB) {
err := dbConn.Close()
if err != nil {
log.Printf("error closing database connection: %v", err)
}
}(dbConn)

store := db.NewStore(dbConn)

// Get a separate database connection for eventing. This is done to separate concerns from the main database.
// It is used for the lifetime of the application, hence it is initialised here.
// This is not ideal from a Clean Code perspective, but it is the simplest way to
// ensure that the connection is closed when the application exits.
var dbConnEvents *sql.DB
if cfg.Events.Driver == events.SQLDriver {
dbConnEvents, _, err = cfg.Events.SQLPubSub.GetDBConnection(ctx)
if err != nil {
return fmt.Errorf("unable to connect to events database: %w", err)
}
defer func(dbConnEvents *sql.DB) {
err := dbConnEvents.Close()
if err != nil {
log.Printf("error closing events database connection: %v", err)
}
}(dbConnEvents)
}

errg, ctx := errgroup.WithContext(ctx)

evt, err := events.Setup(ctx, &cfg.Events)
evt, err := events.Setup(ctx, &cfg.Events, dbConnEvents)
if err != nil {
log.Printf("Failed to set up eventer: %v", err)
return err
Expand Down
5 changes: 5 additions & 0 deletions deployment/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ spec:
value: "{{ .Values.trusty.endpoint }}"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASSWORD"
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
{{- if .Values.deploymentSettings.extraEnv }}
{{- toYaml .Values.deploymentSettings.extraEnv | nindent 10 }}
{{- end }}
Expand Down
5 changes: 5 additions & 0 deletions deployment/helm_tests/basic.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ spec:
value: "http://pi.pi:8000"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASSWORD"
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password

# ko will always specify a digest, so we don't need to worry about
# CRI image caching
Expand Down
5 changes: 5 additions & 0 deletions deployment/helm_tests/sidecar.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ spec:
value: "http://pi.pi:8000"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASSWORD"
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password
- name: PGPASSFILE
value: /secrets/db/.pgpass

Expand Down
20 changes: 20 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ services:
"--http-host=0.0.0.0",
"--metric-host=0.0.0.0",
"--db-host=postgres",
"--db-events-host=postgres_queue",
"--config=/app/config.yaml",
# If you don't want to store your GitHub client ID and secret in the main
# config file, point to them here:
Expand Down Expand Up @@ -60,6 +61,8 @@ services:
depends_on:
postgres:
condition: service_healthy
postgres_queue:
condition: service_healthy
keycloak:
condition: service_healthy
# migrate:
Expand Down Expand Up @@ -109,6 +112,23 @@ services:
retries: 5
networks:
- app_net
postgres_queue:
container_name: postgres_queue_container
image: postgres:15-alpine
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: minder-queue
networks:
- app_net
ports:
- "5433:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
interval: 10s
timeout: 5s
retries: 5

keycloak:
container_name: keycloak_container
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
)

require (
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ecr v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ecrpublic v1.18.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/ThalesIgnite/crypto11 v1.2.5/go.mod h1:ILDKtnCKiQ7zRoNxcp36Y1ZR8LBPmR
github.com/ThreeDotsLabs/watermill v1.1.1/go.mod h1:Qd1xNFxolCAHCzcMrm6RnjW0manbvN+DJVWc1MWRFlI=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8AM1vYfrIDwWtqyJk=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4=
github.com/acomagu/bufpipe v1.0.4 h1:e3H4WUzM3npvo5uv95QuJM3cQspFNtFBzvJ2oNjKIDQ=
github.com/acomagu/bufpipe v1.0.4/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
Expand Down
38 changes: 37 additions & 1 deletion internal/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,42 @@ func RegisterDatabaseFlags(v *viper.Viper, flags *pflag.FlagSet) error {
return err
}

return util.BindConfigFlagWithShort(
err = util.BindConfigFlagWithShort(
v, flags, "database.sslmode", "db-sslmode", "s", "disable", "Database sslmode", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
v, flags, "events.sql.dbhost", "db-events-host", "Q", "localhost", "Database Events host", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlag(
v, flags, "events.sql.dbport", "db-events-port", 5432, "Database Events port", flags.Int)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbuser", "db-events-user", "U", "postgres", "Database Events user", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbpass", "db-events-pass", "W", "postgres", "Database Events password", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbname", "db-events-name", "N", "minder-queue", "Database Events name", flags.StringP)
if err != nil {
return err
}

return util.BindConfigFlagWithShort(
v, flags, "events.sql.sslmode", "db-events-sslmode", "M", "disable", "Database Events sslmode", flags.StringP)
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions internal/config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type EventConfig struct {
RouterCloseTimeout int64 `mapstructure:"router_close_timeout" default:"10"`
// GoChannel is the configuration for the go channel event driver
GoChannel GoChannelEventConfig `mapstructure:"go-channel" default:"{}"`
// SQLPubSub is the configuration for the database event driver
SQLPubSub DatabaseConfig `mapstructure:"sql" default:"{}"`
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator" default:"{}"`
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestCreateUser_gRPC(t *testing.T) {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")
server, err := NewServer(mockStore, evt, NewMetrics(), &config.Config{
Salt: config.DefaultConfigForTest().Salt,
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestDeleteUser_gRPC(t *testing.T) {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")
server, err := NewServer(mockStore, evt, NewMetrics(), &config.Config{
Salt: config.DefaultConfigForTest().Salt,
Expand Down
2 changes: 1 addition & 1 deletion internal/controlplane/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDefaultServer(t *testing.T, mockStore *mockdb.MockStore) *Server {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")

var c *config.Config
Expand Down
2 changes: 1 addition & 1 deletion internal/eea/eea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestAggregator(t *testing.T) {
BufferSize: concurrentEvents,
BlockPublishUntilSubscriberAck: true,
},
})
}, nil)
require.NoError(t, err)

// we'll wait 2 seconds for the lock to be available
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ default allow = true`,
GoChannel: config.GoChannelEventConfig{
BlockPublishUntilSubscriberAck: true,
},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")

go func() {
Expand Down
44 changes: 40 additions & 4 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package events

import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"runtime"
"time"

"github.com/ThreeDotsLabs/watermill"
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/metrics"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
Expand All @@ -43,6 +45,9 @@ const (
ProviderTypeKey = "provider"
ProviderSourceKey = "source"
GithubWebhookEventTypeKey = "type"

GoChannelDriver = "go-channel"
SQLDriver = "sql"
)

const (
Expand Down Expand Up @@ -98,7 +103,7 @@ var _ message.Publisher = (*Eventer)(nil)

// Setup creates an Eventer object which isolates the watermill setup code
// TODO: pass in logger
func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
func Setup(ctx context.Context, cfg *config.EventConfig, db *sql.DB) (*Eventer, error) {
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
if cfg == nil {
return nil, errors.New("event config is nil")
}
Expand Down Expand Up @@ -137,7 +142,7 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
middleware.Recoverer,
)

pub, sub, err := instantiateDriver(cfg.Driver, cfg)
pub, sub, err := instantiateDriver(cfg.Driver, cfg, db)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}
Expand All @@ -159,10 +164,12 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
}, nil
}

func instantiateDriver(driver string, cfg *config.EventConfig) (message.Publisher, message.Subscriber, error) {
func instantiateDriver(driver string, cfg *config.EventConfig, db *sql.DB) (message.Publisher, message.Subscriber, error) {
switch driver {
case "go-channel":
case GoChannelDriver:
return buildGoChannelDriver(cfg)
case SQLDriver:
return buildPostgreSQLDriver(db)
default:
return nil, nil, fmt.Errorf("unknown driver %s", driver)
}
Expand All @@ -177,6 +184,35 @@ func buildGoChannelDriver(cfg *config.EventConfig) (message.Publisher, message.S
return pubsub, pubsub, nil
}

func buildPostgreSQLDriver(db *sql.DB) (message.Publisher, message.Subscriber, error) {
publisher, err := watermillsql.NewPublisher(
db,
watermillsql.PublisherConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
AutoInitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create SQL publisher: %w", err)
}

subscriber, err := watermillsql.NewSubscriber(
db,
watermillsql.SubscriberConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
OffsetsAdapter: watermillsql.DefaultPostgreSQLOffsetsAdapter{},
InitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create SQL subscriber: %w", err)
}

return publisher, subscriber, nil
}

// Close closes the router
func (e *Eventer) Close() error {
//nolint:gosec // It's fine if there's an error as long as we close the router
Expand Down
2 changes: 1 addition & 1 deletion internal/events/eventer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestEventer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventer, err := events.Setup(ctx, driverConfig())
eventer, err := events.Setup(ctx, driverConfig(), nil)
if err != nil {
t.Errorf("Setup() error = %v", err)
return
Expand Down
Loading