From 7e27b6dc2025aa38fce0ae126f7d1284078966c3 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 26 Sep 2023 12:31:39 +0200 Subject: [PATCH] Implemented ref counts for multiaddresses --- pkg/indexer/import.go | 59 +++++++++++++++++++++++----------- pkg/indexer/indexer.go | 11 ++----- pkg/indexer/multiaddress.go | 64 +++++++++++++++++++++++++++++++------ 3 files changed, 97 insertions(+), 37 deletions(-) diff --git a/pkg/indexer/import.go b/pkg/indexer/import.go index ff916ff..bb13292 100644 --- a/pkg/indexer/import.go +++ b/pkg/indexer/import.go @@ -30,6 +30,16 @@ func typeOf[T any]() string { return reflect.TypeOf(t).Elem().Name() } +func typeIsRefCountable[T any]() bool { + //nolint:gocritic // We cannot use T(nil) here + _, ok := interface{}(new(T)).(refCountable) + return ok +} + +type refCountable interface { + refCountDelta() int +} + type batcher[T any] struct { *logger.WrappedLogger @@ -92,18 +102,16 @@ func (b *batcher[T]) Run(ctx context.Context, workerCount int) { type inserter[T any] struct { *logger.WrappedLogger - name string - db *gorm.DB - wg sync.WaitGroup - ignoreConflicts bool + name string + db *gorm.DB + wg sync.WaitGroup } -func newImporter[T any](db *gorm.DB, log *logger.Logger, ignoreConflicts bool) *inserter[T] { +func newImporter[T any](db *gorm.DB, log *logger.Logger) *inserter[T] { w := &inserter[T]{ - WrappedLogger: logger.NewWrappedLogger(log), - name: typeOf[T](), - db: db, - ignoreConflicts: ignoreConflicts, + WrappedLogger: logger.NewWrappedLogger(log), + name: typeOf[T](), + db: db, } return w @@ -111,6 +119,7 @@ func newImporter[T any](db *gorm.DB, log *logger.Logger, ignoreConflicts bool) * //nolint:golint,revive // false positive. func (i *inserter[T]) Run(ctx context.Context, workerCount int, input <-chan []T) { + useRefCounts := typeIsRefCountable[T]() for n := 0; n < workerCount; n++ { workerName := fmt.Sprintf("inserter-%s-%d", i.name, n) i.wg.Add(1) @@ -131,8 +140,20 @@ func (i *inserter[T]) Run(ctx context.Context, workerCount int, input <-chan []T batch := b if err := i.db.Transaction(func(tx *gorm.DB) error { - if i.ignoreConflicts { - tx = tx.Clauses(clause.OnConflict{DoNothing: true}) + if useRefCounts { + for _, item := range batch { + if itemWithRefCount, ok := interface{}(item).(refCountable); ok { + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count + ?", itemWithRefCount.refCountDelta())}), + }).Create(item).Error; err != nil { + return err + } + } else { + return fmt.Errorf("item %T does not implement RefCountable", item) + } + } + return nil } return tx.Create(batch).Error @@ -158,10 +179,10 @@ type processor[T fmt.Stringer] struct { importer *inserter[T] } -func newProcessor[T fmt.Stringer](ctx context.Context, db *gorm.DB, log *logger.Logger, ignoreConflicts bool) *processor[T] { +func newProcessor[T fmt.Stringer](ctx context.Context, db *gorm.DB, log *logger.Logger) *processor[T] { p := &processor[T]{ batcher: newBatcher[T](log), - importer: newImporter[T](db, log, ignoreConflicts), + importer: newImporter[T](db, log), } p.batcher.Run(ctx, perBatcherWorkers) p.importer.Run(ctx, perImporterWorkers, p.batcher.output) @@ -210,12 +231,12 @@ func newImportTransaction(ctx context.Context, db *gorm.DB, log *logger.Logger) t := &ImportTransaction{ WrappedLogger: logger.NewWrappedLogger(log), db: dbSession, - basic: newProcessor[*basicOutput](ctx, dbSession, log, false), - nft: newProcessor[*nft](ctx, dbSession, log, false), - account: newProcessor[*account](ctx, dbSession, log, false), - foundry: newProcessor[*foundry](ctx, dbSession, log, false), - delegation: newProcessor[*delegation](ctx, dbSession, log, false), - multiAddress: newProcessor[*multiaddress](ctx, dbSession, log, true), + basic: newProcessor[*basicOutput](ctx, dbSession, log), + nft: newProcessor[*nft](ctx, dbSession, log), + account: newProcessor[*account](ctx, dbSession, log), + foundry: newProcessor[*foundry](ctx, dbSession, log), + delegation: newProcessor[*delegation](ctx, dbSession, log), + multiAddress: newProcessor[*multiaddress](ctx, dbSession, log), } return t diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 336c3ef..b03bbd9 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -3,7 +3,6 @@ package indexer import ( "github.com/pkg/errors" "gorm.io/gorm" - "gorm.io/gorm/clause" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/inx-app/pkg/nodebridge" @@ -32,7 +31,6 @@ type Indexer struct { } func NewIndexer(dbParams database.Params, log *logger.Logger) (*Indexer, error) { - db, engine, err := database.NewWithDefaultSettings(dbParams, true, log) if err != nil { return nil, err @@ -114,7 +112,7 @@ func processSpent(spent *inx.LedgerSpent, api iotago.API, tx *gorm.DB) error { return tx.Where("output_id = ?", outputID[:]).Delete(&delegation{}).Error } - return nil + return deleteMultiAddressesFromAddresses(tx, addressesInOutput(iotaOutput)) } func processOutput(output *inx.LedgerOutput, api iotago.API, tx *gorm.DB) error { @@ -134,12 +132,7 @@ func processOutput(output *inx.LedgerOutput, api iotago.API, tx *gorm.DB) error return err } - multiAddresses, err := multiAddressesForAddresses(addressesInOutput(unwrapped)...) - if err != nil { - return err - } - - return tx.Clauses(clause.OnConflict{DoNothing: true}).Create(multiAddresses).Error + return insertMultiAddressesFromAddresses(tx, addressesInOutput(unwrapped)) } func entryForOutput(outputID iotago.OutputID, output iotago.Output, slotBooked iotago.SlotIndex) (interface{}, error) { diff --git a/pkg/indexer/multiaddress.go b/pkg/indexer/multiaddress.go index 40f60bd..30cca69 100644 --- a/pkg/indexer/multiaddress.go +++ b/pkg/indexer/multiaddress.go @@ -5,18 +5,27 @@ import ( "encoding/hex" "fmt" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/iotaledger/hive.go/lo" iotago "github.com/iotaledger/iota.go/v4" ) type multiaddress struct { AddressID []byte `gorm:"primaryKey;notnull"` Data []byte `gorm:"notnull"` + RefCount int } func (m *multiaddress) String() string { return fmt.Sprintf("multiaddress => AddressID: %s", hex.EncodeToString(m.AddressID)) } +func (m *multiaddress) refCountDelta() int { + return m.RefCount +} + func multiAddressesForAddresses(addresses ...iotago.Address) ([]*multiaddress, error) { multiAddressFromAddress := func(address iotago.Address) *iotago.MultiAddress { if multi, isMulti := address.(*iotago.MultiAddress); isMulti { @@ -33,28 +42,65 @@ func multiAddressesForAddresses(addresses ...iotago.Address) ([]*multiaddress, e } // Store all passed addresses if they are or contain a MultiAddress. - // We also de-dup the addresses here to avoid double insertions. - alreadyKnownAddresses := make(map[string]struct{}, 0) - multiAddresses := make([]*multiaddress, 0) + // We increase the counter for duplicate addresses + multiAddresses := make(map[string]*multiaddress, 0) for _, address := range addresses { if multiAddress := multiAddressFromAddress(address); multiAddress != nil { - if _, alreadyKnown := alreadyKnownAddresses[multiAddress.Key()]; !alreadyKnown { + if multiAddr, alreadyKnown := multiAddresses[multiAddress.Key()]; !alreadyKnown { addrData, err := iotago.CommonSerixAPI().Encode(context.TODO(), multiAddress) if err != nil { return nil, err } - multiAddresses = append(multiAddresses, &multiaddress{ + multiAddresses[multiAddress.Key()] = &multiaddress{ AddressID: multiAddress.ID(), Data: addrData, - }) - - alreadyKnownAddresses[multiAddress.Key()] = struct{}{} + RefCount: 1, + } + } else { + multiAddr.RefCount++ } } } - return multiAddresses, nil + return lo.Values(multiAddresses), nil +} + +func insertMultiAddressesFromAddresses(tx *gorm.DB, addresses []iotago.Address) error { + multiAddresses, err := multiAddressesForAddresses(addresses...) + if err != nil { + return err + } + + for _, multiAddr := range multiAddresses { + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count + ?", multiAddr.RefCount)}), + }).Create(multiAddresses).Error; err != nil { + return err + } + } + + return nil +} + +func deleteMultiAddressesFromAddresses(tx *gorm.DB, addresses []iotago.Address) error { + multiAddresses, err := multiAddressesForAddresses(addresses...) + if err != nil { + return err + } + + for _, multiAddr := range multiAddresses { + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.Assignments(map[string]interface{}{"ref_count": gorm.Expr("ref_count - ?", multiAddr.RefCount)}), + }).Create(multiAddresses).Error; err != nil { + return err + } + } + + // Delete all addresses where no references exist anymore + return tx.Where("ref_count = ?", 0).Delete(&multiaddress{}).Error } func (i *Indexer) MultiAddressForReference(address *iotago.MultiAddressReference) (*iotago.MultiAddress, error) {