Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PostgreSQL Messaging Pub/Sub Watermill #1680

Merged
merged 11 commits into from
Nov 22, 2023
8 changes: 7 additions & 1 deletion cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app

import (
"database/sql"
"fmt"
"log"
"net/url"
Expand Down Expand Up @@ -65,7 +66,12 @@ var serveCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("unable to connect to database: %w", err)
}
defer dbConn.Close()
defer func(dbConn *sql.DB) {
err := dbConn.Close()
if err != nil {
log.Printf("error closing database connection: %v", err)
}
}(dbConn)

store := db.NewStore(dbConn)

Expand Down
5 changes: 5 additions & 0 deletions deployment/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ spec:
value: "{{ .Values.trusty.endpoint }}"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASS"
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
{{- if .Values.deploymentSettings.extraEnv }}
{{- toYaml .Values.deploymentSettings.extraEnv | nindent 10 }}
{{- end }}
Expand Down
5 changes: 5 additions & 0 deletions deployment/helm_tests/basic.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ spec:
value: "http://pi.pi:8000"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASS"
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password

# ko will always specify a digest, so we don't need to worry about
# CRI image caching
Expand Down
5 changes: 5 additions & 0 deletions deployment/helm_tests/sidecar.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ spec:
value: "http://pi.pi:8000"
- name: "SIGSTORE_NO_CACHE"
value: "true"
- name: "MINDER_DB_PASS"
valueFrom:
secretKeyRef:
name: minder-identity-secrets
key: password
- name: PGPASSFILE
value: /secrets/db/.pgpass

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
)

require (
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ecr v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ecrpublic v1.18.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/ThalesIgnite/crypto11 v1.2.5/go.mod h1:ILDKtnCKiQ7zRoNxcp36Y1ZR8LBPmR
github.com/ThreeDotsLabs/watermill v1.1.1/go.mod h1:Qd1xNFxolCAHCzcMrm6RnjW0manbvN+DJVWc1MWRFlI=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8AM1vYfrIDwWtqyJk=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4=
github.com/acomagu/bufpipe v1.0.4 h1:e3H4WUzM3npvo5uv95QuJM3cQspFNtFBzvJ2oNjKIDQ=
github.com/acomagu/bufpipe v1.0.4/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
Expand Down
38 changes: 37 additions & 1 deletion internal/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,42 @@ func RegisterDatabaseFlags(v *viper.Viper, flags *pflag.FlagSet) error {
return err
}

return util.BindConfigFlagWithShort(
err = util.BindConfigFlagWithShort(
v, flags, "database.sslmode", "db-sslmode", "s", "disable", "Database sslmode", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
v, flags, "events.sql.dbhost", "db-events-host", "Q", "localhost", "Database Events host", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlag(
v, flags, "events.sql.dbport", "db-events-port", 5432, "Database Events port", flags.Int)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbuser", "db-events-user", "U", "postgres", "Database Events user", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbpass", "db-events-pass", "W", "postgres", "Database Events password", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "events.sql.dbname", "db-events-name", "N", "minder-queue", "Database Events name", flags.StringP)
if err != nil {
return err
}

return util.BindConfigFlagWithShort(
v, flags, "events.sql.sslmode", "db-events-sslmode", "M", "disable", "Database Events sslmode", flags.StringP)
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
}
9 changes: 9 additions & 0 deletions internal/config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type EventConfig struct {
RouterCloseTimeout int64 `mapstructure:"router_close_timeout" default:"10"`
// GoChannel is the configuration for the go channel event driver
GoChannel GoChannelEventConfig `mapstructure:"go-channel" default:"{}"`
// SQLPubSub is the configuration for the database event driver
SQLPubSub SQLEventConfig `mapstructure:"sql" default:"{}"`
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator" default:"{}"`
}
Expand All @@ -39,6 +41,13 @@ type GoChannelEventConfig struct {
BlockPublishUntilSubscriberAck bool `mapstructure:"block_publish_until_subscriber_ack" default:"false"`
}

// SQLEventConfig is the configuration for the database event driver
type SQLEventConfig struct {
// InitSchema is whether or not to initialize the schema
InitSchema bool `mapstructure:"init_schema" default:"true"`
Connection DatabaseConfig `mapstructure:"connection" default:"{}"`
}

// AggregatorConfig is the configuration for the event aggregator middleware
type AggregatorConfig struct {
// LockInterval is the interval for locking events in seconds.
Expand Down
82 changes: 72 additions & 10 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"context"
"errors"
"fmt"
"log"
"reflect"
"runtime"
"time"

"github.com/ThreeDotsLabs/watermill"
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/metrics"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
Expand All @@ -43,6 +45,9 @@ const (
ProviderTypeKey = "provider"
ProviderSourceKey = "source"
GithubWebhookEventTypeKey = "type"

GoChannelDriver = "go-channel"
SQLDriver = "sql"
)

const (
Expand Down Expand Up @@ -82,6 +87,8 @@ type AggregatorMiddleware interface {
AggregateMiddleware(h message.HandlerFunc) message.HandlerFunc
}

type driverCloser func()

// Eventer is a wrapper over the relevant eventing objects in such
// a way that they can be easily accessible and configurable.
type Eventer struct {
Expand All @@ -91,6 +98,8 @@ type Eventer struct {
// webhookSubscriber will subscribe to the webhook topic and handle incoming events
webhookSubscriber message.Subscriber
// TODO: We'll have a Final publisher that will publish to the final topic

closer driverCloser
}

var _ Registrar = (*Eventer)(nil)
Expand Down Expand Up @@ -137,7 +146,7 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
middleware.Recoverer,
)

pub, sub, err := instantiateDriver(cfg.Driver, cfg)
pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}
Expand All @@ -156,33 +165,86 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
router: router,
webhookPublisher: pubWithMetrics,
webhookSubscriber: subWithMetrics,
closer: func() {
//nolint:gosec // It's fine if there's an error as long as we close the router
pubWithMetrics.Close()
//nolint:gosec // It's fine if there's an error as long as we close the router
subWithMetrics.Close()
// driver close
cl()
},
}, nil
}

func instantiateDriver(driver string, cfg *config.EventConfig) (message.Publisher, message.Subscriber, error) {
func instantiateDriver(
ctx context.Context,
driver string,
cfg *config.EventConfig,
) (message.Publisher, message.Subscriber, driverCloser, error) {
switch driver {
case "go-channel":
case GoChannelDriver:
return buildGoChannelDriver(cfg)
case SQLDriver:
return buildPostgreSQLDriver(ctx, cfg)
default:
return nil, nil, fmt.Errorf("unknown driver %s", driver)
return nil, nil, nil, fmt.Errorf("unknown driver %s", driver)
}
}

func buildGoChannelDriver(cfg *config.EventConfig) (message.Publisher, message.Subscriber, error) {
func buildGoChannelDriver(cfg *config.EventConfig) (message.Publisher, message.Subscriber, driverCloser, error) {
pubsub := gochannel.NewGoChannel(gochannel.Config{
OutputChannelBuffer: cfg.GoChannel.BufferSize,
Persistent: cfg.GoChannel.PersistEvents,
}, nil)

return pubsub, pubsub, nil
return pubsub, pubsub, func() {}, nil
}

func buildPostgreSQLDriver(
ctx context.Context,
cfg *config.EventConfig,
) (message.Publisher, message.Subscriber, driverCloser, error) {
db, _, err := cfg.SQLPubSub.Connection.GetDBConnection(ctx)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to connect to events database: %w", err)
}

publisher, err := watermillsql.NewPublisher(
db,
watermillsql.PublisherConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
AutoInitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create SQL publisher: %w", err)
}

subscriber, err := watermillsql.NewSubscriber(
db,
watermillsql.SubscriberConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
OffsetsAdapter: watermillsql.DefaultPostgreSQLOffsetsAdapter{},
InitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create SQL subscriber: %w", err)
}

return publisher, subscriber, func() {
err := db.Close()
if err != nil {
log.Printf("error closing events database connection: %v", err)
}
}, nil
}

// Close closes the router
func (e *Eventer) Close() error {
//nolint:gosec // It's fine if there's an error as long as we close the router
e.webhookPublisher.Close()
//nolint:gosec // It's fine if there's an error as long as we close the router
e.webhookSubscriber.Close()
e.closer()
return e.router.Close()
}

Expand Down
Loading