Skip to content

Commit

Permalink
fix(core): database clients pooling improvements (#1047)
Browse files Browse the repository at this point in the history
Closes #681 

Per service with db connection:
- one client
- one schema
- one shared pool

Not supported (and documented in db package)
- multiple schemas per client
- multiple services sharing a pool (unless rolled up into one service
like policy)
- multiple databases per platform Postgres instance
- multiple Postgres servers per platform instance
  • Loading branch information
jakedoublev authored Jun 28, 2024
1 parent 552e970 commit 8193cec
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
22 changes: 22 additions & 0 deletions service/pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ type Config struct {
MigrationsFS *embed.FS
}

/*
A wrapper around a pgxpool.Pool and sql.DB reference.
Each service should have a single instance of the Client to share a connection pool,
schema (driven by the service namespace), and an embedded file system for migrations.
The 'search_path' is set to the schema on connection to the database.
If the database config 'runMigrations' is set to true, the client will run migrations on startup,
once per namespace (as there should only be one embedded migrations FS per namespace).
Multiple pools, schemas, or migrations per service are not supported. Multiple databases per
PostgreSQL instance or multiple PostgreSQL servers per platform instance are not supported.
*/
type Client struct {
Pgx PgxIface
config Config
Expand Down Expand Up @@ -102,6 +116,7 @@ func New(ctx context.Context, config Config, o ...OptsFunc) (*Client, error) {
return nil, fmt.Errorf("failed to parse pgx config: %w", err)
}

slog.Info("opening new database pool", slog.String("schema", config.Schema))
pool, err := pgxpool.NewWithConfig(ctx, dbConfig)
if err != nil {
return nil, fmt.Errorf("failed to create pgxpool: %w", err)
Expand All @@ -117,6 +132,13 @@ func New(ctx context.Context, config Config, o ...OptsFunc) (*Client, error) {
}
}

// Set the Client search_path to the schema
q := fmt.Sprintf("SET search_path TO %s", config.Schema)
if _, err := c.Pgx.Exec(ctx, q); err != nil {
return nil, fmt.Errorf("failed to SET search_path for db Client schema to [%s]: %w", config.Schema, err)
}

slog.Info("successfully set database client search_path", slog.String("schema", config.Schema))
return &c, nil
}

Expand Down
9 changes: 1 addition & 8 deletions service/pkg/db/db_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ func migrationInit(ctx context.Context, c *Client, migrations *embed.FS) (*goose
return nil, 0, nil, fmt.Errorf("migrations are disabled")
}

// Set the schema
q := fmt.Sprintf("SET search_path TO %s", c.config.Schema)
if tag, err := c.Pgx.Exec(ctx, q); err != nil {
slog.Error("migration error", slog.String("query", q), slog.String("error", err.Error()), slog.Any("tag", tag))
return nil, 0, nil, fmt.Errorf("failed to SET search_path [%w]", err)
}

// Cast the pgxpool.Pool to a *sql.DB
pool, ok := c.Pgx.(*pgxpool.Pool)
if !ok || pool == nil {
Expand All @@ -42,7 +35,7 @@ func migrationInit(ctx context.Context, c *Client, migrations *embed.FS) (*goose
if e != nil {
return nil, 0, nil, errors.Join(fmt.Errorf("failed to get current version"), e)
}
slog.Info("migration db info ", slog.Any("current version", v))
slog.Info("migration db info", slog.Any("current version", v))

// Return the provider, version, and close function
return provider, v, func() {
Expand Down

0 comments on commit 8193cec

Please sign in to comment.