Skip to content

Commit

Permalink
Merge pull request #190 from iotaledger/feat/batch-accepted-tx
Browse files Browse the repository at this point in the history
Batch insert accepted tx and reduce db table contention
  • Loading branch information
alexsporn authored Mar 28, 2024
2 parents 18ded1a + 89e2d1b commit 7c2e469
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 35 deletions.
109 changes: 82 additions & 27 deletions components/indexer/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"

Expand All @@ -16,7 +17,9 @@ import (
"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/hive.go/db"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/runtime/timeutil"
"github.com/iotaledger/hive.go/sql"
"github.com/iotaledger/inx-app/pkg/httpserver"
"github.com/iotaledger/inx-app/pkg/nodebridge"
Expand Down Expand Up @@ -98,6 +101,21 @@ func provide(c *dig.Container) error {
func run() error {
indexerInitWait := make(chan struct{})

pendingAcceptedTransactions := shrinkingmap.New[iotago.SlotIndex, []*nodebridge.AcceptedTransaction]()

nextPendingAcceptedSlot := func() iotago.SlotIndex {
keys := pendingAcceptedTransactions.Keys()
switch len(keys) {
case 0:
return 0
case 1:
return keys[0]
default:
slices.Sort(keys)
return keys[0]
}
}

// create a background worker that handles the indexer events
if err := Component.Daemon().BackgroundWorker("Indexer", func(ctx context.Context) {
Component.LogInfo("Starting Indexer")
Expand All @@ -123,6 +141,14 @@ func run() error {
if err != nil {
return err
}

// check if we have any pending accepted transactions for this slot or previous slots and drop them
for nextSlot := nextPendingAcceptedSlot(); nextSlot > 0 && nextSlot <= ledgerUpdate.Slot; nextSlot = nextPendingAcceptedSlot() {
if deleted := pendingAcceptedTransactions.Delete(nextSlot); deleted {
Component.LogInfof("Drop pending accepted transactions for slot %d", nextSlot)
}
}

if err := deps.Indexer.CommitLedgerUpdate(ledgerUpdate); err != nil {
return err
}
Expand Down Expand Up @@ -152,22 +178,44 @@ func run() error {
}

Component.LogInfo("Starting AcceptedTransactions ... done")
if err := deps.NodeBridge.ListenToAcceptedTransactions(ctx, func(tx *nodebridge.AcceptedTransaction) error {
ts := time.Now()
ledgerUpdate, err := LedgerUpdateFromNodeBridgeAcceptedTransaction(tx)
if err != nil {
return err

ticker := timeutil.NewTicker(func() {
nextSlot := nextPendingAcceptedSlot()
if nextSlot == 0 {
return
}
if err := deps.Indexer.AcceptLedgerUpdate(ledgerUpdate); err != nil {
if ierrors.Is(err, indexer.ErrLedgerUpdateSkipped) {
Component.LogInfof("Skipped accepted transaction %s at slot %d with %d new and %d consumed outputs", tx.TransactionID.ToHex(), tx.Slot, len(tx.Created), len(tx.Consumed))
return nil

ts := time.Now()
if txs, deleted := pendingAcceptedTransactions.DeleteAndReturn(nextSlot); deleted {
ledgerUpdate, err := LedgerUpdateFromNodeBridgeAcceptedTransactions(txs)
if err != nil {
deps.ShutdownHandler.SelfShutdown(fmt.Sprintf("LedgerUpdateFromNodeBridgeAcceptedTransactions failed, error: %s", err), false)
}

return err
if err := deps.Indexer.AcceptLedgerUpdate(ledgerUpdate); err != nil {
if ierrors.Is(err, indexer.ErrLedgerUpdateSkipped) {
Component.LogInfof("Skipped accepted transactions batch at slot %d with %d new and %d consumed outputs", ledgerUpdate.Slot, len(ledgerUpdate.Created), len(ledgerUpdate.Consumed))
return
}

deps.ShutdownHandler.SelfShutdown(fmt.Sprintf("AcceptLedgerUpdate failed, error: %s", err), false)
}

Component.LogInfof("Applying accepted transactions batch at slot %d with %d new and %d consumed outputs took %s", ledgerUpdate.Slot, len(ledgerUpdate.Created), len(ledgerUpdate.Consumed), time.Since(ts).Truncate(time.Millisecond))
}
}, 1*time.Second, ctx)

defer ticker.Shutdown()

Component.LogInfof("Applying accepted transaction %s at slot %d with %d new and %d consumed outputs took %s", tx.TransactionID.ToHex(), tx.Slot, len(tx.Created), len(tx.Consumed), time.Since(ts).Truncate(time.Millisecond))
if err := deps.NodeBridge.ListenToAcceptedTransactions(ctx, func(tx *nodebridge.AcceptedTransaction) error {
pendingAcceptedTransactions.Compute(tx.Slot, func(currentTxs []*nodebridge.AcceptedTransaction, exists bool) []*nodebridge.AcceptedTransaction {
Component.LogDebugf("Batching accepted transaction %s at slot %d", tx.TransactionID.ToHex(), tx.Slot)
if !exists {
return []*nodebridge.AcceptedTransaction{tx}
}

return append(currentTxs, tx)
})

return nil
}); err != nil {
Expand Down Expand Up @@ -441,28 +489,35 @@ func LedgerUpdateFromNodeBridge(update *nodebridge.LedgerUpdate) (*indexer.Ledge
}, nil
}

func LedgerUpdateFromNodeBridgeAcceptedTransaction(tx *nodebridge.AcceptedTransaction) (*indexer.LedgerUpdate, error) {
consumed := make([]*indexer.LedgerOutput, len(tx.Consumed))
for i, output := range tx.Consumed {
consumed[i] = &indexer.LedgerOutput{
OutputID: output.OutputID,
Output: output.Output,
BookedAt: output.Metadata.Included.Slot,
SpentAt: output.Metadata.Spent.Slot,
}
func LedgerUpdateFromNodeBridgeAcceptedTransactions(txs []*nodebridge.AcceptedTransaction) (*indexer.LedgerUpdate, error) {
if len(txs) == 0 {
return nil, ierrors.New("no transactions provided")
}

created := make([]*indexer.LedgerOutput, len(tx.Created))
for i, output := range tx.Created {
created[i] = &indexer.LedgerOutput{
OutputID: output.OutputID,
Output: output.Output,
BookedAt: output.Metadata.Included.Slot,
var consumed []*indexer.LedgerOutput
var created []*indexer.LedgerOutput

for _, tx := range txs {
for _, output := range tx.Consumed {
consumed = append(consumed, &indexer.LedgerOutput{
OutputID: output.OutputID,
Output: output.Output,
BookedAt: output.Metadata.Included.Slot,
SpentAt: output.Metadata.Spent.Slot,
})
}

for _, output := range tx.Created {
created = append(created, &indexer.LedgerOutput{
OutputID: output.OutputID,
Output: output.Output,
BookedAt: output.Metadata.Included.Slot,
})
}
}

return &indexer.LedgerUpdate{
Slot: tx.Slot,
Slot: txs[0].Slot,
Consumed: consumed,
Created: created,
}, nil
Expand Down
51 changes: 43 additions & 8 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package indexer

import (
"sync"

"github.com/ethereum/go-ethereum/common/hexutil"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -39,6 +41,9 @@ type Indexer struct {
log.Logger
db *gorm.DB
engine db.Engine

lastCommittedSlot iotago.SlotIndex
lastCommittedSlotMutex sync.RWMutex
}

func NewIndexer(dbParams sql.DatabaseParameters, logger log.Logger) (*Indexer, error) {
Expand Down Expand Up @@ -470,12 +475,12 @@ func (i *Indexer) AutoMigrate() error {

func (i *Indexer) AcceptLedgerUpdate(update *LedgerUpdate) error {
return i.db.Transaction(func(tx *gorm.DB) error {
status := Status{}
if err := tx.Take(&status).Error; err != nil {
return err
}
if update.Slot <= status.CommittedSlot {
return ierrors.Wrapf(ErrLedgerUpdateSkipped, "accepted slot %d is not greater than committed slot %d", update.Slot, status.CommittedSlot)
i.lastCommittedSlotMutex.RLock()
lastCommitted := i.lastCommittedSlot
i.lastCommittedSlotMutex.RUnlock()

if update.Slot <= lastCommitted {
return ierrors.Wrapf(ErrLedgerUpdateSkipped, "accepted slot %d is not greater than last committed slot %d", update.Slot, lastCommitted)
}

spentOutputs := make(map[iotago.OutputID]struct{})
Expand All @@ -501,7 +506,7 @@ func (i *Indexer) AcceptLedgerUpdate(update *LedgerUpdate) error {
}

func (i *Indexer) CommitLedgerUpdate(update *LedgerUpdate) error {
return i.db.Transaction(func(tx *gorm.DB) error {
if err := i.db.Transaction(func(tx *gorm.DB) error {
// Cleanup uncommitted changes for this update
if err := removeUncommittedChangesUpUntilSlot(update.Slot, tx); err != nil {
return err
Expand All @@ -528,7 +533,18 @@ func (i *Indexer) CommitLedgerUpdate(update *LedgerUpdate) error {
tx.Model(&Status{}).Where("id = ?", 1).Update("committed_slot", update.Slot)

return nil
})
}); err != nil {
return err
}

i.lastCommittedSlotMutex.Lock()
defer i.lastCommittedSlotMutex.Unlock()

if i.lastCommittedSlot < update.Slot {
i.lastCommittedSlot = update.Slot
}

return nil
}

func (i *Indexer) Status() (*Status, error) {
Expand All @@ -541,10 +557,29 @@ func (i *Indexer) Status() (*Status, error) {
return nil, err
}

i.lastCommittedSlotMutex.RLock()
val := i.lastCommittedSlot
i.lastCommittedSlotMutex.RUnlock()

// Only get write lock if the new committed slot is greater than the last committed slot
if status.CommittedSlot > val {
i.lastCommittedSlotMutex.Lock()
defer i.lastCommittedSlotMutex.Unlock()

if status.CommittedSlot > i.lastCommittedSlot {
i.lastCommittedSlot = status.CommittedSlot
}
}

return status, nil
}

func (i *Indexer) Clear() error {
i.lastCommittedSlotMutex.Lock()
defer i.lastCommittedSlotMutex.Unlock()

i.lastCommittedSlot = 0

// Drop all tables
if err := i.db.Migrator().DropTable(dbTables...); err != nil {
return err
Expand Down

0 comments on commit 7c2e469

Please sign in to comment.