Skip to content

Commit

Permalink
Implemented ref counts for multiaddresses
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsporn committed Sep 26, 2023
1 parent 79cf158 commit 7e27b6d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 37 deletions.
59 changes: 40 additions & 19 deletions pkg/indexer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ func typeOf[T any]() string {
return reflect.TypeOf(t).Elem().Name()
}

func typeIsRefCountable[T any]() bool {
//nolint:gocritic // We cannot use T(nil) here
_, ok := interface{}(new(T)).(refCountable)
return ok
}

type refCountable interface {
refCountDelta() int
}

type batcher[T any] struct {
*logger.WrappedLogger

Expand Down Expand Up @@ -92,25 +102,24 @@ func (b *batcher[T]) Run(ctx context.Context, workerCount int) {
type inserter[T any] struct {
*logger.WrappedLogger

name string
db *gorm.DB
wg sync.WaitGroup
ignoreConflicts bool
name string
db *gorm.DB
wg sync.WaitGroup
}

func newImporter[T any](db *gorm.DB, log *logger.Logger, ignoreConflicts bool) *inserter[T] {
func newImporter[T any](db *gorm.DB, log *logger.Logger) *inserter[T] {
w := &inserter[T]{
WrappedLogger: logger.NewWrappedLogger(log),
name: typeOf[T](),
db: db,
ignoreConflicts: ignoreConflicts,
WrappedLogger: logger.NewWrappedLogger(log),
name: typeOf[T](),
db: db,
}

return w
}

//nolint:golint,revive // false positive.
func (i *inserter[T]) Run(ctx context.Context, workerCount int, input <-chan []T) {
useRefCounts := typeIsRefCountable[T]()
for n := 0; n < workerCount; n++ {
workerName := fmt.Sprintf("inserter-%s-%d", i.name, n)
i.wg.Add(1)
Expand All @@ -131,8 +140,20 @@ func (i *inserter[T]) Run(ctx context.Context, workerCount int, input <-chan []T

batch := b
if err := i.db.Transaction(func(tx *gorm.DB) error {
if i.ignoreConflicts {
tx = tx.Clauses(clause.OnConflict{DoNothing: true})
if useRefCounts {
for _, item := range batch {
if itemWithRefCount, ok := interface{}(item).(refCountable); ok {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count + ?", itemWithRefCount.refCountDelta())}),
}).Create(item).Error; err != nil {
return err
}
} else {
return fmt.Errorf("item %T does not implement RefCountable", item)
}
}
return nil
}

return tx.Create(batch).Error
Expand All @@ -158,10 +179,10 @@ type processor[T fmt.Stringer] struct {
importer *inserter[T]
}

func newProcessor[T fmt.Stringer](ctx context.Context, db *gorm.DB, log *logger.Logger, ignoreConflicts bool) *processor[T] {
func newProcessor[T fmt.Stringer](ctx context.Context, db *gorm.DB, log *logger.Logger) *processor[T] {
p := &processor[T]{
batcher: newBatcher[T](log),
importer: newImporter[T](db, log, ignoreConflicts),
importer: newImporter[T](db, log),
}
p.batcher.Run(ctx, perBatcherWorkers)
p.importer.Run(ctx, perImporterWorkers, p.batcher.output)
Expand Down Expand Up @@ -210,12 +231,12 @@ func newImportTransaction(ctx context.Context, db *gorm.DB, log *logger.Logger)
t := &ImportTransaction{
WrappedLogger: logger.NewWrappedLogger(log),
db: dbSession,
basic: newProcessor[*basicOutput](ctx, dbSession, log, false),
nft: newProcessor[*nft](ctx, dbSession, log, false),
account: newProcessor[*account](ctx, dbSession, log, false),
foundry: newProcessor[*foundry](ctx, dbSession, log, false),
delegation: newProcessor[*delegation](ctx, dbSession, log, false),
multiAddress: newProcessor[*multiaddress](ctx, dbSession, log, true),
basic: newProcessor[*basicOutput](ctx, dbSession, log),
nft: newProcessor[*nft](ctx, dbSession, log),
account: newProcessor[*account](ctx, dbSession, log),
foundry: newProcessor[*foundry](ctx, dbSession, log),
delegation: newProcessor[*delegation](ctx, dbSession, log),
multiAddress: newProcessor[*multiaddress](ctx, dbSession, log),
}

return t
Expand Down
11 changes: 2 additions & 9 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexer
import (
"github.com/pkg/errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/inx-app/pkg/nodebridge"

Check failure on line 8 in pkg/indexer/indexer.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/indexer/indexer.go#L8

could not import github.com/iotaledger/inx-app/pkg/nodebridge (-: # github.com/iotaledger/inx-app/pkg/nodebridge
Raw output
pkg/indexer/indexer.go:8:2: could not import github.com/iotaledger/inx-app/pkg/nodebridge (-: # github.com/iotaledger/inx-app/pkg/nodebridge
Expand Down Expand Up @@ -32,7 +31,6 @@ type Indexer struct {
}

func NewIndexer(dbParams database.Params, log *logger.Logger) (*Indexer, error) {

db, engine, err := database.NewWithDefaultSettings(dbParams, true, log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -114,7 +112,7 @@ func processSpent(spent *inx.LedgerSpent, api iotago.API, tx *gorm.DB) error {
return tx.Where("output_id = ?", outputID[:]).Delete(&delegation{}).Error
}

return nil
return deleteMultiAddressesFromAddresses(tx, addressesInOutput(iotaOutput))
}

func processOutput(output *inx.LedgerOutput, api iotago.API, tx *gorm.DB) error {
Expand All @@ -134,12 +132,7 @@ func processOutput(output *inx.LedgerOutput, api iotago.API, tx *gorm.DB) error
return err
}

multiAddresses, err := multiAddressesForAddresses(addressesInOutput(unwrapped)...)
if err != nil {
return err
}

return tx.Clauses(clause.OnConflict{DoNothing: true}).Create(multiAddresses).Error
return insertMultiAddressesFromAddresses(tx, addressesInOutput(unwrapped))
}

func entryForOutput(outputID iotago.OutputID, output iotago.Output, slotBooked iotago.SlotIndex) (interface{}, error) {
Expand Down
64 changes: 55 additions & 9 deletions pkg/indexer/multiaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,27 @@ import (
"encoding/hex"
"fmt"

"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/iotaledger/hive.go/lo"
iotago "github.com/iotaledger/iota.go/v4"
)

type multiaddress struct {
AddressID []byte `gorm:"primaryKey;notnull"`
Data []byte `gorm:"notnull"`
RefCount int
}

func (m *multiaddress) String() string {
return fmt.Sprintf("multiaddress => AddressID: %s", hex.EncodeToString(m.AddressID))
}

func (m *multiaddress) refCountDelta() int {
return m.RefCount
}

func multiAddressesForAddresses(addresses ...iotago.Address) ([]*multiaddress, error) {
multiAddressFromAddress := func(address iotago.Address) *iotago.MultiAddress {
if multi, isMulti := address.(*iotago.MultiAddress); isMulti {
Expand All @@ -33,28 +42,65 @@ func multiAddressesForAddresses(addresses ...iotago.Address) ([]*multiaddress, e
}

// Store all passed addresses if they are or contain a MultiAddress.
// We also de-dup the addresses here to avoid double insertions.
alreadyKnownAddresses := make(map[string]struct{}, 0)
multiAddresses := make([]*multiaddress, 0)
// We increase the counter for duplicate addresses
multiAddresses := make(map[string]*multiaddress, 0)
for _, address := range addresses {
if multiAddress := multiAddressFromAddress(address); multiAddress != nil {
if _, alreadyKnown := alreadyKnownAddresses[multiAddress.Key()]; !alreadyKnown {
if multiAddr, alreadyKnown := multiAddresses[multiAddress.Key()]; !alreadyKnown {
addrData, err := iotago.CommonSerixAPI().Encode(context.TODO(), multiAddress)
if err != nil {
return nil, err
}

multiAddresses = append(multiAddresses, &multiaddress{
multiAddresses[multiAddress.Key()] = &multiaddress{
AddressID: multiAddress.ID(),
Data: addrData,
})

alreadyKnownAddresses[multiAddress.Key()] = struct{}{}
RefCount: 1,
}
} else {
multiAddr.RefCount++
}
}
}

return multiAddresses, nil
return lo.Values(multiAddresses), nil
}

func insertMultiAddressesFromAddresses(tx *gorm.DB, addresses []iotago.Address) error {
multiAddresses, err := multiAddressesForAddresses(addresses...)
if err != nil {
return err
}

for _, multiAddr := range multiAddresses {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count + ?", multiAddr.RefCount)}),
}).Create(multiAddresses).Error; err != nil {
return err
}
}

return nil
}

func deleteMultiAddressesFromAddresses(tx *gorm.DB, addresses []iotago.Address) error {
multiAddresses, err := multiAddressesForAddresses(addresses...)
if err != nil {
return err
}

for _, multiAddr := range multiAddresses {
if err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count - ?", multiAddr.RefCount)}),
}).Create(multiAddresses).Error; err != nil {
return err
}
}

// Delete all addresses where no references exist anymore
return tx.Where("ref_count = ?", 0).Delete(&multiaddress{}).Error
}

func (i *Indexer) MultiAddressForReference(address *iotago.MultiAddressReference) (*iotago.MultiAddress, error) {
Expand Down

0 comments on commit 7e27b6d

Please sign in to comment.