Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PostgreSQL Messaging Pub/Sub Watermill #1680

Merged
merged 11 commits into from
Nov 22, 2023
21 changes: 19 additions & 2 deletions cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app

import (
"database/sql"
"fmt"
"log"
"net/url"
Expand Down Expand Up @@ -65,13 +66,29 @@ var serveCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("unable to connect to database: %w", err)
}
defer dbConn.Close()
defer func(dbConn *sql.DB) {
err := dbConn.Close()
if err != nil {
log.Printf("error closing database connection: %v", err)
}
}(dbConn)

store := db.NewStore(dbConn)

dbConnEvents, _, err := cfg.DatabaseQueue.GetDBConnection(ctx)
if err != nil {
return fmt.Errorf("unable to connect to events database: %w", err)
}
defer func(dbConnEvents *sql.DB) {
err := dbConnEvents.Close()
if err != nil {
log.Printf("error closing events database connection: %v", err)
}
}(dbConnEvents)

errg, ctx := errgroup.WithContext(ctx)

evt, err := events.Setup(ctx, &cfg.Events)
evt, err := events.Setup(ctx, &cfg.Events, dbConnEvents)
if err != nil {
log.Printf("Failed to set up eventer: %v", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@ github:
events: ["*"]

events:
driver: go-channel
driver: postgresql
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
router_close_timeout: 10
go-channel: {}
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions deployment/helm_tests/basic.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ data:
events: ["*"]

events:
driver: go-channel
driver: postgresql
router_close_timeout: 10
go-channel: {}

Expand Down Expand Up @@ -738,7 +738,7 @@ data:
events: ["*"]

events:
driver: go-channel
driver: postgresql
router_close_timeout: 10
go-channel: {}

Expand Down
4 changes: 2 additions & 2 deletions deployment/helm_tests/sidecar.yaml-out
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ data:
events: ["*"]

events:
driver: go-channel
driver: postgresql
router_close_timeout: 10
go-channel: {}

Expand Down Expand Up @@ -757,7 +757,7 @@ data:
events: ["*"]

events:
driver: go-channel
driver: postgresql
router_close_timeout: 10
go-channel: {}

Expand Down
20 changes: 20 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ services:
"--http-host=0.0.0.0",
"--metric-host=0.0.0.0",
"--db-host=postgres",
"--db-events-host=postgres_queue",
"--config=/app/config.yaml",
# If you don't want to store your GitHub client ID and secret in the main
# config file, point to them here:
Expand Down Expand Up @@ -60,6 +61,8 @@ services:
depends_on:
postgres:
condition: service_healthy
postgres_queue:
condition: service_healthy
keycloak:
condition: service_healthy
# migrate:
Expand Down Expand Up @@ -109,6 +112,23 @@ services:
retries: 5
networks:
- app_net
postgres_queue:
container_name: postgres_queue_container
image: postgres:15-alpine
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: minder-queue
networks:
- app_net
ports:
- "5433:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready"]
interval: 10s
timeout: 5s
retries: 5

keycloak:
container_name: keycloak_container
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
)

require (
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ecr v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ecrpublic v1.18.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/ThalesIgnite/crypto11 v1.2.5/go.mod h1:ILDKtnCKiQ7zRoNxcp36Y1ZR8LBPmR
github.com/ThreeDotsLabs/watermill v1.1.1/go.mod h1:Qd1xNFxolCAHCzcMrm6RnjW0manbvN+DJVWc1MWRFlI=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8AM1vYfrIDwWtqyJk=
github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4=
github.com/acomagu/bufpipe v1.0.4 h1:e3H4WUzM3npvo5uv95QuJM3cQspFNtFBzvJ2oNjKIDQ=
github.com/acomagu/bufpipe v1.0.4/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
Tracing TracingConfig `mapstructure:"tracing"`
Metrics MetricsConfig `mapstructure:"metrics"`
Database DatabaseConfig `mapstructure:"database"`
DatabaseQueue DatabaseConfig `mapstructure:"database_queue"`
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
Identity IdentityConfig `mapstructure:"identity"`
Salt CryptoConfig `mapstructure:"salt"`
Auth AuthConfig `mapstructure:"auth"`
Expand Down
35 changes: 34 additions & 1 deletion internal/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,39 @@
return err
}

return util.BindConfigFlagWithShort(
err = util.BindConfigFlagWithShort(

Check failure on line 157 in internal/config/db.go

View workflow job for this annotation

GitHub Actions / golangci-lint / Go Lint

ineffectual assignment to err (ineffassign)
v, flags, "database.sslmode", "db-sslmode", "s", "disable", "Database sslmode", flags.StringP)

err = util.BindConfigFlagWithShort(
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
v, flags, "database_queue.dbhost", "db-events-host", "Q", "localhost", "Database Events host", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlag(
v, flags, "database_queue.dbport", "db-events-port", 5432, "Database Events port", flags.Int)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "database_queue.dbuser", "db-events-user", "U", "postgres", "Database Events user", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "database_queue.dbpass", "db-events-pass", "W", "postgres", "Database Events password", flags.StringP)
if err != nil {
return err
}

err = util.BindConfigFlagWithShort(
v, flags, "database_queue.dbname", "db-events-name", "N", "minder-queue", "Database Events name", flags.StringP)
if err != nil {
return err
}

return util.BindConfigFlagWithShort(
v, flags, "database_queue.sslmode", "db-events-sslmode", "M", "disable", "Database Events sslmode", flags.StringP)
}
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestCreateUser_gRPC(t *testing.T) {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")
server, err := NewServer(mockStore, evt, NewMetrics(), &config.Config{
Salt: config.DefaultConfigForTest().Salt,
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestDeleteUser_gRPC(t *testing.T) {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")
server, err := NewServer(mockStore, evt, NewMetrics(), &config.Config{
Salt: config.DefaultConfigForTest().Salt,
Expand Down
2 changes: 1 addition & 1 deletion internal/controlplane/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDefaultServer(t *testing.T, mockStore *mockdb.MockStore) *Server {
evt, err := events.Setup(context.Background(), &config.EventConfig{
Driver: "go-channel",
GoChannel: config.GoChannelEventConfig{},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")

var c *config.Config
Expand Down
2 changes: 1 addition & 1 deletion internal/eea/eea_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestAggregator(t *testing.T) {
BufferSize: concurrentEvents,
BlockPublishUntilSubscriberAck: true,
},
})
}, nil)
require.NoError(t, err)

// we'll wait 2 seconds for the lock to be available
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ default allow = true`,
GoChannel: config.GoChannelEventConfig{
BlockPublishUntilSubscriberAck: true,
},
})
}, nil)
require.NoError(t, err, "failed to setup eventer")

go func() {
Expand Down
39 changes: 36 additions & 3 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package events

import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"runtime"
"time"

"github.com/ThreeDotsLabs/watermill"
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/metrics"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
Expand Down Expand Up @@ -98,7 +100,7 @@ var _ message.Publisher = (*Eventer)(nil)

// Setup creates an Eventer object which isolates the watermill setup code
// TODO: pass in logger
func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
func Setup(ctx context.Context, cfg *config.EventConfig, db *sql.DB) (*Eventer, error) {
JAORMX marked this conversation as resolved.
Show resolved Hide resolved
if cfg == nil {
return nil, errors.New("event config is nil")
}
Expand Down Expand Up @@ -137,7 +139,7 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
middleware.Recoverer,
)

pub, sub, err := instantiateDriver(cfg.Driver, cfg)
pub, sub, err := instantiateDriver(cfg.Driver, cfg, db)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}
Expand All @@ -159,10 +161,12 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
}, nil
}

func instantiateDriver(driver string, cfg *config.EventConfig) (message.Publisher, message.Subscriber, error) {
func instantiateDriver(driver string, cfg *config.EventConfig, db *sql.DB) (message.Publisher, message.Subscriber, error) {
switch driver {
case "go-channel":
return buildGoChannelDriver(cfg)
case "postgresql":
return buildPostgreSQLDriver(db)
default:
return nil, nil, fmt.Errorf("unknown driver %s", driver)
}
Expand All @@ -177,6 +181,35 @@ func buildGoChannelDriver(cfg *config.EventConfig) (message.Publisher, message.S
return pubsub, pubsub, nil
}

func buildPostgreSQLDriver(db *sql.DB) (message.Publisher, message.Subscriber, error) {
publisher, err := watermillsql.NewPublisher(
db,
watermillsql.PublisherConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
AutoInitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create SQL publisher: %w", err)
}

subscriber, err := watermillsql.NewSubscriber(
db,
watermillsql.SubscriberConfig{
SchemaAdapter: watermillsql.DefaultPostgreSQLSchema{},
OffsetsAdapter: watermillsql.DefaultPostgreSQLOffsetsAdapter{},
InitializeSchema: true,
},
watermill.NewStdLogger(false, false),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create SQL subscriber: %w", err)
}

return publisher, subscriber, nil
}

// Close closes the router
func (e *Eventer) Close() error {
//nolint:gosec // It's fine if there's an error as long as we close the router
Expand Down
2 changes: 1 addition & 1 deletion internal/events/eventer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestEventer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventer, err := events.Setup(ctx, driverConfig())
eventer, err := events.Setup(ctx, driverConfig(), nil)
if err != nil {
t.Errorf("Setup() error = %v", err)
return
Expand Down
Loading