Skip to content

Commit

Permalink
Merge pull request #727 from oasisprotocol/ptrus/feature/consensus-ac…
Browse files Browse the repository at this point in the history
…counts-first-activity

consensus accounts first activity
  • Loading branch information
ptrus authored Jul 19, 2024
2 parents 25b10c5 + e01000c commit 9d1282a
Show file tree
Hide file tree
Showing 27 changed files with 12,118 additions and 7 deletions.
1 change: 1 addition & 0 deletions .changelog/727.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consensus: accounts first activity
30 changes: 30 additions & 0 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
sdkConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"

"github.com/oasisprotocol/nexus/analyzer/consensus/static"
"github.com/oasisprotocol/nexus/analyzer/util/addresses"
"github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api/transaction"
genesis "github.com/oasisprotocol/nexus/coreapi/v22.2.11/genesis/api"
Expand Down Expand Up @@ -115,6 +116,16 @@ func (m *processor) PreWork(ctx context.Context) error {
}
m.logger.Info("registered special addresses")

// Insert static account first activity timestamps.
batch = &storage.QueryBatch{}
if err = static.QueueConsensusAccountsFirstActivity(batch, m.history.ChainName, m.logger); err != nil {
return err
}
if err = m.target.SendBatch(ctx, batch); err != nil {
return err
}
m.logger.Info("inserted static account first active timestamps")

return nil
}

Expand All @@ -129,6 +140,14 @@ func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int
return err
}

// Recompute account first activity for all accounts scanned during fast-sync.
batch := &storage.QueryBatch{}
m.logger.Info("computing account first activity for all accounts scanned during fast-sync")
batch.Queue(queries.ConsensusAccountsFirstActivityRecompute)
if err := m.target.SendBatch(ctx, batch); err != nil {
return fmt.Errorf("recomputing consensus accounts first activity: %w", err)
}

// Fetch a data snapshot (= genesis doc) from the node; see function docstring.
firstSlowSyncHeight := lastFastSyncHeight + 1
r, err := m.history.RecordForHeight(firstSlowSyncHeight)
Expand Down Expand Up @@ -172,6 +191,7 @@ func (m *processor) aggregateFastSyncTables(ctx context.Context) error {
if err := m.target.SendBatch(ctx, batch); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -538,6 +558,16 @@ func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *consens
data.Height,
i,
)

if m.mode != analyzer.FastSyncMode {
// Set the first activity for sender if not set yet.
// Skip in fast sync mode; it will be recomputed at fast-sync finalization.
batch.Queue(
queries.ConsensusAccountFirstActivityUpsert,
addr,
data.BlockHeader.Time.UTC(),
)
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions analyzer/consensus/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (mg *GenesisProcessor) addStakingBackendMigrations(batch *storage.QueryBatc
account.Escrow.Active.TotalShares.ToBigInt(),
account.Escrow.Debonding.Balance.ToBigInt(),
account.Escrow.Debonding.TotalShares.ToBigInt(),
document.Time.UTC(),
)
}

Expand All @@ -188,6 +189,7 @@ func (mg *GenesisProcessor) addStakingBackendMigrations(batch *storage.QueryBatc
account.Escrow.Active.TotalShares.ToBigInt(),
account.Escrow.Debonding.Balance.ToBigInt(),
account.Escrow.Debonding.TotalShares.ToBigInt(),
document.Time.UTC(),
)
}

Expand Down
109 changes: 109 additions & 0 deletions analyzer/consensus/static/accounts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package static

import (
"embed"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/common"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
)

//go:embed accounts
var genesisAccountsFs embed.FS

var (
mainnetAccounts = &accounts{}
testnetAccounts = &accounts{}
)

type accounts struct {
tss []time.Time
accts [][]string
}

// Parses a file with a list of accounts. The accounts are parsed from the file (one per line);
// the metadata is parsed from the filename, which should be of the form "<name>_<timestamp>.*".
func mustReadAddrs(path string) (time.Time, []string) {
content, err := genesisAccountsFs.ReadFile(path)
if err != nil {
panic(err)
}
lines := strings.Split(strings.TrimSpace(string(content)), "\n")

accts := make([]string, 0, len(lines))
for _, line := range lines {
if !strings.HasPrefix(line, "//") {
accts = append(accts, line)
}
}

nameParts := strings.Split(filepath.Base(path), "_")
ts, err := strconv.ParseInt(strings.Split(nameParts[1], ".")[0], 10, 64)
if err != nil {
panic(err)
}

return time.Unix(ts, 0), accts
}

func init() {
mainnet := "accounts/mainnet"
testnet := "accounts/testnet"

// Mainnet.
files, err := genesisAccountsFs.ReadDir(mainnet)
if err != nil {
panic(err)
}
for _, file := range files {
ts, accts := mustReadAddrs(path.Join(mainnet, file.Name()))
mainnetAccounts.tss = append(mainnetAccounts.tss, ts)
mainnetAccounts.accts = append(mainnetAccounts.accts, accts)
}

// Testnet.
files, err = genesisAccountsFs.ReadDir(testnet)
if err != nil {
panic(err)
}
for _, file := range files {
ts, accts := mustReadAddrs(path.Join(testnet, file.Name()))
testnetAccounts.tss = append(testnetAccounts.tss, ts)
testnetAccounts.accts = append(testnetAccounts.accts, accts)
}
}

// QueueConsensusAccountsFirstActivity queues upserts for the first activity of consensus accounts.
//
// We need these static lists because we do not have access to the transaction history all the way back
// to the initial chain (pre-Cobalt). Therefore we use static lists of accounts existing at the network
// dump-restore upgrades for each pre-Cobalt upgrade and insert their approximate first_activity date.
func QueueConsensusAccountsFirstActivity(batch *storage.QueryBatch, chainName common.ChainName, logger *log.Logger) error {
var accounts *accounts
switch chainName {
case common.ChainNameMainnet:
accounts = mainnetAccounts
case common.ChainNameTestnet:
accounts = testnetAccounts
default:
return nil
}

for i := range accounts.accts {
for _, account := range accounts.accts[i] {
batch.Queue(
queries.ConsensusAccountFirstActivityUpsert,
account,
accounts.tss[i].UTC(),
)
}
}

return nil
}
Loading

0 comments on commit 9d1282a

Please sign in to comment.