Skip to content

Commit

Permalink
feat: add transaction runner for better transaction retries (#315)
Browse files Browse the repository at this point in the history
* feat: add transaction runner for better transaction retries

Adds a RunTransaction function that executes read/write transactions in
a retry loop, and automatically retries the transaction if it was aborted
by Spanner. This prevents ErrAbortedDueToConcurrentModification from
being returned.

Using this function requires the transaction to be passed as a function
argument instead of using the traditional model of calling Begin and
Commit on a connection.

* test: add a test for errors during commit
  • Loading branch information
olavloite authored Nov 14, 2024
1 parent 367726c commit 3731176
Show file tree
Hide file tree
Showing 5 changed files with 621 additions and 21 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,38 @@ tx, err := conn.BeginTx(ctx, &sql.TxOptions{
})
```

## Transaction Runner (Retry Transactions)

Spanner can abort a read/write transaction if concurrent modifications are detected
that would violate the transaction consistency. When this happens, the driver will
return the `ErrAbortedDueToConcurrentModification` error. You can use the
`RunTransaction` function to let the driver automatically retry transactions that
are aborted by Spanner.

```go
package sample

import (
"context"
"database/sql"
"fmt"

_ "github.com/googleapis/go-sql-spanner"
spannerdriver "github.com/googleapis/go-sql-spanner"
)

spannerdriver.RunTransaction(ctx, db, &sql.TxOptions{}, func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRowContext(ctx, "select Name from Singers where SingerId=@id", 123)
var name string
if err := row.Scan(&name); err != nil {
return err
}
return nil
})
```

See also the [transaction runner sample](./examples/run-transaction/main.go).

## DDL Statements

[DDL statements](https://cloud.google.com/spanner/docs/data-definition-language)
Expand Down
98 changes: 98 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
adminapi "cloud.google.com/go/spanner/admin/database/apiv1"
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -403,6 +404,103 @@ func (c *connector) closeClients() (err error) {
return err
}

// RunTransaction runs the given function in a transaction on the given database.
// If the connection is a connection to a Spanner database, the transaction will
// automatically be retried if the transaction is aborted by Spanner. Any other
// errors will be propagated to the caller and the transaction will be rolled
// back. The transaction will be committed if the supplied function did not
// return an error.
//
// If the connection is to a non-Spanner database, no retries will be attempted,
// and any error that occurs during the transaction will be propagated to the
// caller.
//
// The application should *NOT* call tx.Commit() or tx.Rollback(). This is done
// automatically by this function, depending on whether the transaction function
// returned an error or not.
//
// This function will never return ErrAbortedDueToConcurrentModification.
func RunTransaction(ctx context.Context, db *sql.DB, opts *sql.TxOptions, f func(ctx context.Context, tx *sql.Tx) error) error {
// Get a connection from the pool that we can use to run a transaction.
// Getting a connection here already makes sure that we can reserve this
// connection exclusively for the duration of this method. That again
// allows us to temporarily change the state of the connection (e.g. set
// the retryAborts flag to false).
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()

// We don't need to keep track of a running checksum for retries when using
// this method, so we disable internal retries.
// Retries will instead be handled by the loop below.
origRetryAborts := false
var spannerConn SpannerConn
if err := conn.Raw(func(driverConn any) error {
var ok bool
spannerConn, ok = driverConn.(SpannerConn)
if !ok {
// It is not a Spanner connection, so just ignore and continue without any special handling.
return nil
}
origRetryAborts = spannerConn.RetryAbortsInternally()
return spannerConn.SetRetryAbortsInternally(false)
}); err != nil {
return err
}
// Reset the flag for internal retries after the transaction (if applicable).
if origRetryAborts {
defer func() { _ = spannerConn.SetRetryAbortsInternally(origRetryAborts) }()
}

tx, err := conn.BeginTx(ctx, opts)
if err != nil {
return err
}
for {
err = f(ctx, tx)
if err == nil {
err = tx.Commit()
if err == nil {
return nil
}
}
// Rollback and return the error if:
// 1. The connection is not a Spanner connection.
// 2. Or the error code is not Aborted.
if spannerConn == nil || spanner.ErrCode(err) != codes.Aborted {
// We don't really need to call Rollback here if the error happened
// during the Commit. However, the SQL package treats this as a no-op
// and just returns an ErrTxDone if we do, so this is simpler than
// keeping track of where the error happened.
_ = tx.Rollback()
return err
}

// The transaction was aborted by Spanner.
// Back off and retry the entire transaction.
if delay, ok := spanner.ExtractRetryDelay(err); ok {
err = gax.Sleep(ctx, delay)
if err != nil {
// We need to 'roll back' the transaction here to tell the sql
// package that there is no active transaction on the connection
// anymore. It does not actually roll back the transaction, as it
// has already been aborted by Spanner.
_ = tx.Rollback()
return err
}
}

// TODO: Reset the existing transaction for retry instead of creating a new one.
_ = tx.Rollback()
tx, err = conn.BeginTx(ctx, opts)
if err != nil {
return err
}
}
}

// SpannerConn is the public interface for the raw Spanner connection for the
// sql driver. This interface can be used with the db.Conn().Raw() method.
type SpannerConn interface {
Expand Down
Loading

0 comments on commit 3731176

Please sign in to comment.