Skip to content

Commit

Permalink
Cherry picking some LogPoller changes from the main repo (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara authored Nov 16, 2023
2 parents b25df79 + a4b98b9 commit e275ee8
Show file tree
Hide file tree
Showing 51 changed files with 377 additions and 201 deletions.
4 changes: 3 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E

func (disabled) HasFilter(name string) bool { return false }

func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled }
func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
return LogPollerBlock{}, ErrDisabled
}

func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
return nil, ErrDisabled
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx))
return latest + 1
return latest.BlockNumber + 1
}

func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
Expand Down
47 changes: 11 additions & 36 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type LogPoller interface {
RegisterFilter(filter Filter, qopts ...pg.QOpt) error
UnregisterFilter(name string, qopts ...pg.QOpt) error
HasFilter(name string) bool
LatestBlock(qopts ...pg.QOpt) (int64, error)
LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

// General querying
Expand Down Expand Up @@ -676,9 +676,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx))
})
err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx))
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -747,21 +745,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
// These deletes are bounded by reorg depth, so they are
// fast and should not slow down the log readers.
err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3)
return err3
}
err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3)
return err3
}
return nil
})
err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx))
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
Expand Down Expand Up @@ -846,20 +830,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
return
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix())
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if len(logs) == 0 {
return nil
}
return lp.orm.InsertLogs(convertLogs(logs,
[]LogPollerBlock{{BlockNumber: currentBlockNumber,
BlockTimestamp: currentBlock.Timestamp}},
lp.lggr,
lp.ec.ConfiguredChainID(),
), pg.WithQueryer(tx))
})
block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber)
err = lp.orm.InsertLogsWithBlock(
convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()),
block,
)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return
Expand Down Expand Up @@ -1018,13 +993,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.

// LatestBlock returns the latest block the log poller is on. It tracks blocks to be able
// to detect reorgs.
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) {
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
b, err := lp.orm.SelectLatestBlock(qopts...)
if err != nil {
return 0, err
return LogPollerBlock{}, err
}

return b.BlockNumber, nil
return *b, nil
}

func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) {
lp.PollAndSaveLogs(tctx, 4)
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest)
require.Equal(t, int64(4), latest.BlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
Expand Down
39 changes: 25 additions & 14 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) {
body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed
rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body)

currentBlock := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlock)
currentBlockNumber := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlockNumber)

// simulate logs becoming available
rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts)
Expand Down Expand Up @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 34)

// Run ordinary poller + backup poller at least once
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(37), currentBlock+1)
require.Equal(t, int64(37), currentBlock.BlockNumber+1)

// logs still shouldn't show up, because we don't want to backfill the last finalized log
// to help with reorg detection
Expand All @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 35)

// Run ordinary poller + backup poller at least once more
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(38), currentBlock+1)
require.Equal(t, int64(38), currentBlock.BlockNumber+1)

// all 3 logs in block 34 should show up now, thanks to backup logger
logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
Expand Down Expand Up @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) {
// 1 -> 2 -> ...
th.PollAndSaveLogs(ctx, 1)

// Check that latest block has the same properties as the head
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.BlockHash, header.Hash())

// Register filter
err = th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
Expand Down Expand Up @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
require.Len(t, gethLogs, 2)

lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.PollAndSaveLogs(context.Background(), lb+1)
th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1)
lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
Expand Down Expand Up @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
currentBlock := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
currentBlockNumber := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber)
currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
matchesGeth := func() bool {
// Check every block is identical
Expand Down Expand Up @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
}
Expand Down Expand Up @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) {
require.NoError(t, err)
latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
assert.Equal(t, latest, fromBlock)
assert.Equal(t, latest.BlockNumber, fromBlock)

// Should take min(latest, requested) in this case requested.
requested = int64(7)
Expand Down Expand Up @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000)
// Should return error before the first poll and save
_, err := th.LogPoller.LatestBlock()
require.Error(t, err)

// Mark first block as finalized
h := th.Client.Blockchain().CurrentHeader()
th.Client.Blockchain().SetFinalized(h)
Expand All @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {

th.PollAndSaveLogs(ctx, 1)

latestBlock, err := th.ORM.SelectLatestBlock()
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber)
require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber)
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log {
Index: uint(l.LogIndex),
}
}

func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock {
return LogPollerBlock{
BlockHash: blockHash,
BlockNumber: blockNumber,
BlockTimestamp: timestamp,
FinalizedBlockNumber: finalizedBlockNumber,
}
}
22 changes: 6 additions & 16 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC
}
}

func (o *ObservedORM) Q() pg.Q {
return o.ORM.Q()
}

func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogs", func() error {
return o.ORM.InsertLogs(logs, qopts...)
})
}

func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertBlock", func() error {
return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...)
func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogsWithBlock", func() error {
return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
})
}

Expand All @@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksAfter", func() error {
return o.ORM.DeleteBlocksAfter(start, qopts...)
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
})
}

func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAfter", func() error {
return o.ORM.DeleteLogsAfter(start, qopts...)
func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
})
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx))
_, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx))
_ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx))

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize))
Expand Down
Loading

0 comments on commit e275ee8

Please sign in to comment.