Skip to content

Commit

Permalink
Merge pull request #636 from oasisprotocol/mitjat/pgx-fast-batch-expl…
Browse files Browse the repository at this point in the history
…icit-tx

pgx: fast batches: always create an explicit tx
  • Loading branch information
mitjat authored Feb 14, 2024
2 parents 92b9851 + 56991b2 commit 30ae67c
Showing 1 changed file with 52 additions and 73 deletions.
125 changes: 52 additions & 73 deletions storage/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package postgres

import (
"context"
"errors"
"fmt"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -72,7 +73,7 @@ func NewClient(connString string, l *log.Logger) (*Client, error) {
config.ConnConfig.Tracer = &tracelog.TraceLog{
LogLevel: tracelog.LogLevelWarn,
Logger: &pgxLogger{
logger: l.WithModule(moduleName).With("db", config.ConnConfig.Database),
logger: l.WithModule(moduleName).With("db", config.ConnConfig.Database).WithCallerUnwind(10),
},
}

Expand All @@ -95,93 +96,72 @@ func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error
return c.SendBatchWithOptions(ctx, batch, pgx.TxOptions{})
}

// Starts a new DB transaction and runs fn() with it. Takes care of committing
// or rolling back the transaction: If fn() returns an error, the transaction is
// rolled back, otherwise, it is committed.
// Adapted from https://github.com/jackc/pgx/blob/v4.18.1/tx.go#L108
func (c *Client) WithTx(
ctx context.Context,
txOptions pgx.TxOptions,
fn func(pgx.Tx) error,
) error {
tx, err := c.pool.BeginTx(ctx, txOptions)
if err != nil {
return err
}

if fErr := fn(tx); fErr != nil {
_ = tx.Rollback(ctx) // ignore rollback error as there is already an error to return
return fErr
}

if err := tx.Commit(ctx); err != nil {
rollbackErr := tx.Rollback(ctx)
if rollbackErr != nil && !errors.Is(rollbackErr, pgx.ErrTxClosed) {
return rollbackErr
}
return err
}
return nil
}

// Submits a new batch. Under the hood, uses `tx.SendBatch(batch.AsPgxBatch())`,
// which is more efficient as it happens in a single roundtrip to the server.
// However, it reports errors poorly: If _any_ query is syntactically
// malformed, called with the wrong number of args, or has a type conversion problem,
// pgx will report the _first_ query as failing.
//
// For efficiency and simplicity, the method does not use explicit transactions
// with BEGIN/COMMIT unless required by the tx options. Even so, the batch is processed
// atomically, because:
// 1) We use pgx in its default QueryExecMode.
// 2) This in turn makes pgx use postgresql in pipeline mode.
// 3) Postgresql pipeline mode implies transactions-like behavior:
// https://www.postgresql.org/docs/15/libpq-pipeline-mode.html#LIBPQ-PIPELINE-ERRORS

func (c *Client) sendBatchWithOptionsFast(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
pgxBatch := batch.AsPgxBatch()
var batchResults pgx.BatchResults
var emptyTxOptions pgx.TxOptions
var tx pgx.Tx
var err error

// Begin a transaction.
useExplicitTx := opts != emptyTxOptions // see function docstring for more info
if useExplicitTx {
// set up our own tx with the specified options
tx, err = c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
} else {
// use implicit tx provided by SendBatch; see https://github.com/jackc/pgx/issues/879
batchResults = c.pool.SendBatch(ctx, &pgxBatch)
}
defer common.CloseOrLog(batchResults, c.logger)

// Read the results of indiviual queries in the batch.
for i := 0; i < pgxBatch.Len(); i++ {
if _, err := batchResults.Exec(); err != nil {
rollbackErr := ""
if useExplicitTx {
err2 := tx.Rollback(ctx)
if err2 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err2.Error())
}
return c.WithTx(ctx, opts, func(tx pgx.Tx) error {
// Read the results of indiviual queries in the batch.
batchResults := tx.SendBatch(ctx, &pgxBatch)
defer common.CloseOrLog(batchResults, c.logger)
for i := 0; i < pgxBatch.Len(); i++ {
if _, err2 := batchResults.Exec(); err2 != nil {
return fmt.Errorf("query %d %v: %w", i, batch.Queries()[i], err2)
}
return fmt.Errorf("query %d %v: %w%s", i, batch.Queries()[i], err, rollbackErr)
}
}

// Commit the tx.
if useExplicitTx {
err := tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
}
return nil
return nil
})
}

// Submits a new batch of queries, sending one query at a time. Compared with `sendBatchWithOptionsSlow`, this
// gives slower performance but better error reporting.
func (c *Client) sendBatchWithOptionsSlow(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
// Begin a transaction.
tx, err := c.pool.BeginTx(ctx, opts)
if err != nil {
return fmt.Errorf("failed to begin tx: %w", err)
}

// Exec indiviual queries in the batch.
for i, q := range batch.Queries() {
if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil {
rollbackErr := ""
err3 := tx.Rollback(ctx)
if err3 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error())
return c.WithTx(ctx, opts, func(tx pgx.Tx) error {
// Exec indiviual queries in the batch.
for i, q := range batch.Queries() {
if _, err2 := tx.Exec(ctx, q.Cmd, q.Args...); err2 != nil {
rollbackErr := ""
err3 := tx.Rollback(ctx)
if err3 != nil {
rollbackErr = fmt.Sprintf("; also failed to rollback tx: %s", err3.Error())
}
return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr)
}
return fmt.Errorf("query %d %v: %w%s", i, q, err2, rollbackErr)
}
}

// Commit the transaction.
err = tx.Commit(ctx)
if err != nil {
return err
}
return nil
return nil
})
}

func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error {
Expand All @@ -197,7 +177,6 @@ func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryB
// This time, use the slow method for better error msgs.
c.logger.Warn("failed to submit tx using the fast path; falling back to slow path",
"error", err,
"batch", batch.Queries(),
)
return c.sendBatchWithOptionsSlow(ctx, batch, opts)
}
Expand Down

0 comments on commit 30ae67c

Please sign in to comment.