Skip to content

Commit

Permalink
1. Fix redis goroutine leak
Browse files Browse the repository at this point in the history
  • Loading branch information
maranqz committed Feb 3, 2024
1 parent 1596f02 commit c878cfb
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 57 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
pull_request:
branches:
- main
- v1
name: Test
env:
GO_TARGET_VERSION: 1.21
Expand Down
4 changes: 4 additions & 0 deletions gorm/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
// Example demonstrates the implementation of the Repository pattern by trm.Manager.
func Example() {
db, err := gorm.Open(sqlite.Open("file:test.db?mode=memory"))

checkErr(err)
sqlDB, err := db.DB()
checkErr(err)
defer sqlDB.Close()

// Migrate the schema
checkErr(db.AutoMigrate(&userRow{}))
Expand Down
22 changes: 11 additions & 11 deletions gorm/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewTransaction(
opts *sql.TxOptions,
db *gorm.DB,
) (context.Context, *Transaction, error) {
tr := &Transaction{
t := &Transaction{
tx: nil,
txMutex: sync.Mutex{},
active: drivers.NewIsClosed(),
Expand All @@ -46,28 +46,28 @@ func NewTransaction(
db = db.WithContext(ctx)
// Used closure to avoid implementing nested transactions.
err = db.Transaction(func(tx *gorm.DB) error {
tr.tx = tx
t.tx = tx

wg.Done()

<-tr.activeClosure.Closed()
<-t.activeClosure.Closed()

return tr.activeClosure.Err()
return t.activeClosure.Err()
}, opts)

tr.txMutex.Lock()
defer tr.txMutex.Unlock()
tx := tr.tx
t.txMutex.Lock()
defer t.txMutex.Unlock()
tx := t.tx

if tx != nil {
// Return error from transaction rollback
// Error from commit returns from db.Transaction closure
if errors.Is(err, drivers.ErrRollbackTr) &&
tx.Error != nil {
err = tr.tx.Error
err = t.tx.Error
}

tr.active.CloseWithCause(err)
t.active.CloseWithCause(err)
} else {
wg.Done()
}
Expand All @@ -79,9 +79,9 @@ func NewTransaction(
return ctx, nil, err
}

go tr.awaitDone(ctx)
go t.awaitDone(ctx)

return ctx, tr, nil
return ctx, t, nil
}

func (t *Transaction) awaitDone(ctx context.Context) {
Expand Down
3 changes: 2 additions & 1 deletion gorm/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestTransaction_awaitDone_byRollback(t *testing.T) {
require.NoError(t, err)

f := NewDefaultFactory(dbgorm)
ctx := context.Background()
ctx, _ := context.WithCancel(context.Background())

wg := sync.WaitGroup{}
wg.Add(1)
Expand All @@ -284,6 +284,7 @@ func TestTransaction_awaitDone_byRollback(t *testing.T) {

require.NoError(t, tr.Rollback(ctx))
require.False(t, tr.IsActive())
require.ErrorIs(t, tr.Rollback(ctx), sql.ErrTxDone)
}()

wg.Wait()
Expand Down
35 changes: 30 additions & 5 deletions redis/settings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package redis

import (
"sync"

"github.com/go-redis/redis/v8"

"github.com/avito-tech/go-transaction-manager/trm"
Expand All @@ -20,7 +22,9 @@ type Settings struct {
isMulti *bool
watchKeys []string
txDecorator []TxDecorator
ret *[]redis.Cmder

ret *[]redis.Cmder
muRet sync.RWMutex
}

// NewSettings creates Settings.
Expand All @@ -31,6 +35,7 @@ func NewSettings(trms trm.Settings, oo ...Opt) (Settings, error) {
watchKeys: nil,
txDecorator: nil,
ret: nil,
muRet: sync.RWMutex{},
}

for _, o := range oo {
Expand Down Expand Up @@ -68,8 +73,8 @@ func (s Settings) EnrichBy(in trm.Settings) trm.Settings {
s = s.SetTxDecorators(external.TxDecorators()...)
}

if s.Return() == nil {
s = s.SetReturn(external.Return())
if s.ReturnPtr() == nil {
s = s.SetReturn(external.ReturnPtr())
}
}

Expand Down Expand Up @@ -135,11 +140,31 @@ func (s Settings) setTxDecorator(in ...TxDecorator) Settings {
return s
}

// Return returns []redis.Cmder from Transaction.
func (s Settings) Return() *[]redis.Cmder {
func (s Settings) ReturnPtr() *[]redis.Cmder {
s.muRet.RLock()
defer s.muRet.RUnlock()

return s.ret
}

// Return returns []redis.Cmder from Transaction.
func (s Settings) Return() []redis.Cmder {
res := s.ReturnPtr()
if res != nil {
return *s.ReturnPtr()
}

return nil
}

// AppendReturn append []redis.Cmder from Transaction.
func (s *Settings) AppendReturn(cmds ...redis.Cmder) {
s.muRet.Lock()
defer s.muRet.Unlock()

*s.ret = append(*s.ret, cmds...)
}

// SetReturn sets link to save []redis.Cmder from Transaction.
func (s Settings) SetReturn(in *[]redis.Cmder) Settings {
return s.setReturn(in)
Expand Down
76 changes: 49 additions & 27 deletions redis/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ type TxDecorator func(tx Cmdable, db redis.Cmdable) Cmdable

// Transaction is trm.Transaction for sqlx.Tx.
type Transaction struct {
tx Cmdable
// err is used to close transaction and get error from it
err chan error
active *drivers.IsClose
tx txInterface
active *drivers.IsClose
activeClosure *drivers.IsClose
}

// NewTransaction creates trm.Transaction for sqlx.Tx.
Expand All @@ -31,9 +30,9 @@ func NewTransaction(
s Settings,
) (context.Context, *Transaction, error) {
t := &Transaction{
err: make(chan error),
tx: nil,
active: drivers.NewIsClosed(),
tx: nil,
active: drivers.NewIsClosed(),
activeClosure: drivers.NewIsClosed(),
}

var err error
Expand All @@ -52,8 +51,8 @@ func NewTransaction(

cmds, err = fn(ctx, func(pipe redis.Pipeliner) error {
t.tx = &tx{
tx: rtx,
Cmdable: pipe,
tx: rtx,
Pipeliner: pipe,
}

for _, d := range s.TxDecorators() {
Expand All @@ -62,18 +61,20 @@ func NewTransaction(

wg.Done()

return <-t.err
<-t.activeClosure.Closed()

return t.activeClosure.Err()
})

if len(cmds) > 0 && s.Return() != nil {
*s.Return() = append(*s.Return(), cmds...)
if len(cmds) > 0 && s.ReturnPtr() != nil {
s.AppendReturn(cmds...)
}

return err
}, s.WatchKeys()...)

if t.tx != nil {
t.err <- err
t.active.CloseWithCause(err)
} else {
wg.Done()
}
Expand All @@ -97,7 +98,8 @@ func (t *Transaction) awaitDone(ctx context.Context) {

select {
case <-ctx.Done():
t.active.Close()
// Rollback will be called by context.Err()
t.activeClosure.Close()
case <-t.active.Closed():
}
}
Expand All @@ -109,32 +111,52 @@ func (t *Transaction) Transaction() interface{} {
}

// Commit closes the trm.Transaction.
func (t *Transaction) Commit(_ context.Context) error {
defer t.active.Close()
func (t *Transaction) Commit(ctx context.Context) error {
select {
case <-t.active.Closed():
cmds, err := t.tx.Exec(ctx)

// TODO process cmds
_ = cmds

// TODO deadlock
t.err <- nil
return err
default:
t.activeClosure.Close()

return <-t.err
<-t.active.Closed()

return t.active.Err()
}
}

// Rollback the trm.Transaction.
func (t *Transaction) Rollback(_ context.Context) error {
defer t.active.Close()
select {
case <-t.active.Closed():
return t.tx.Discard()
default:
t.activeClosure.CloseWithCause(drivers.ErrRollbackTr)

// TODO deadlock
t.err <- errRollbackTx
<-t.active.Closed()

err := <-t.err
err := t.active.Err()
if errors.Is(err, drivers.ErrRollbackTr) {
return nil
}

if errors.Is(err, errRollbackTx) {
return nil
}
// unreachable code, because of go-redis doesn't process error from Close
// https://github.com/redis/go-redis/blob/v8.11.5/tx.go#L69
// https://github.com/redis/go-redis/blob/v8.11.5/pipeline.go#L130

return err
return err
}
}

// IsActive returns true if the transaction started but not committed or rolled back.
func (t *Transaction) IsActive() bool {
return t.active.IsActive()
}

func (t *Transaction) Closed() <-chan struct{} {
return t.active.Closed()
}
Loading

0 comments on commit c878cfb

Please sign in to comment.