Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LogPoller support for MaxLogsKept #1338

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
9f9148a
Manually merged in from chainlink repo:
reductionista Sep 17, 2024
990662d
Manually merge in FilteredLogs receive []Expression from upstream
reductionista Sep 24, 2024
c738f58
use tx in insertLogsWithinTx
reductionista Sep 24, 2024
f2152f3
Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks
reductionista Sep 17, 2024
c4192c3
Clean up db indexes
reductionista Sep 13, 2024
bac89ac
Fix 2 unrelated bugs I noticed
reductionista Sep 13, 2024
de696bb
Update ExpiredLogs query
reductionista Sep 13, 2024
9f4b268
Update test for fromBlock >= :block_number
reductionista Sep 16, 2024
5a9eb23
Increase staggering of initial pruning runs
reductionista Sep 17, 2024
741579f
Decrease retention periods for CCIP events, for testing
reductionista Sep 17, 2024
54f9ea4
Fix bug in merged commit from develop
reductionista Sep 17, 2024
bb7ca3b
restore retention periods
reductionista Sep 18, 2024
44ccdc9
Set LogPrunePageSize = 2001
reductionista Sep 18, 2024
388ca20
merge whitespace differences from chainlink repo
reductionista Sep 24, 2024
6bb5f2a
sync from chainlink repo
reductionista Sep 24, 2024
dcf3ad6
Update DeleteBlocksBefore query to use block_number index instead of …
reductionista Sep 19, 2024
ab32e1a
Merge unknown changes to tests from chainlink repo
reductionista Sep 24, 2024
27fe6c6
Changes in orm_test.go from "Split off SelectUnmatchedLogs from Delet…
reductionista Sep 24, 2024
b06ea6c
Rename LogIds & some changes to orm_test.go from chainlink repo
reductionista Sep 24, 2024
428e7f8
Fix UnmatchedLogs query
reductionista Sep 25, 2024
9c4e902
Reduce 20x to 5x for testing
reductionista Sep 25, 2024
3ff0e21
LogKeepBlocksDepth = 1000
reductionista Sep 25, 2024
b993451
Add debugging : TODO Remove!
reductionista Sep 25, 2024
012345f
Fix event_bindings merge issue
reductionista Sep 25, 2024
b9648b0
Add helper function for tickers
reductionista Sep 25, 2024
23b6617
Stagger initial unmatched logs prune
reductionista Sep 26, 2024
5e18824
Reorganize sql migration script, adding comments to clarify
reductionista Sep 26, 2024
de81d71
add WHERE evm_chain_id and handle sql.ErrNoRows
reductionista Sep 28, 2024
385f2bb
Add PruneExcessLogs for MaxLogsKept
reductionista Sep 26, 2024
8dbb78c
Update contract_transmitter.go for MaxLogsKept
reductionista Sep 26, 2024
3970897
Set MaxLogsKept to 1
reductionista Sep 5, 2024
3ba99f1
Fix unreachable code
reductionista Sep 26, 2024
e73e343
Update branch
reductionista Sep 27, 2024
9206356
Add temporary indexes for comparing sizes
reductionista Sep 27, 2024
bf980bb
Set MaxLogPageSize to 0, to test performance with no LIMIT in query
reductionista Sep 27, 2024
14cad9e
Re-use block-range paging from DeleteBlocksBefore for SelectExcessLogs
reductionista Sep 29, 2024
b121498
Add test for ExecPagedQuery
reductionista Sep 29, 2024
e1bc72c
Change LogPollerPageSize from 0 to 2000
reductionista Sep 29, 2024
826a2a1
Add block_number >= lower
reductionista Sep 29, 2024
c0e4678
Use ExecPagedQuery for SelectUnmatchedLogIDs
reductionista Sep 29, 2024
cbb4ef6
Update tests
reductionista Oct 1, 2024
94ef8ac
Fix golint errors
reductionista Oct 1, 2024
4547055
Reduce LogPollerPrunePageSize 2000 -> 1000
reductionista Oct 1, 2024
eaaf877
Merge in test for multichain issue hotfix
reductionista Oct 1, 2024
8d61e38
Improve logging
reductionista Oct 1, 2024
ef5716e
Add test for SelectExcessLogIDs
reductionista Oct 2, 2024
01c1a2f
Change sql comments to go comments
reductionista Oct 11, 2024
523effb
Only activate count-based log pruning when needed
reductionista Oct 2, 2024
56f2163
Fix setting of countBasedPruningActive flag
reductionista Oct 8, 2024
4eb72ec
Refactor ExecPagedQuery into method of generic type RangedQuery[T]
reductionista Oct 3, 2024
a83c3cb
sql.ErrRows fix
reductionista Oct 9, 2024
a7b9328
Address remaining PR comments
reductionista Oct 11, 2024
2500099
NoOfTokensWithDynamicPrice = 0
reductionista Oct 11, 2024
1dc3d7d
Lower LogPrunePageSize from 1000 to 700
reductionista Oct 12, 2024
4494f58
MaxLogsKept=0
reductionista Oct 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-geese-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

FilteredLogs receive Expression instead of whole KeyFilter. #internal
5 changes: 5 additions & 0 deletions .changeset/sweet-pumas-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
golang 1.22.5
golang 1.22.7
mockery 2.43.2
nodejs 20.13.1
pnpm 9.4.0
Expand All @@ -8,4 +8,4 @@ zig 0.11.0
golangci-lint 1.59.1
protoc 25.1
python 3.10.5
task 3.35.1
task 3.35.1
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down
204 changes: 166 additions & 38 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math/big"
"math/rand"
"sort"
"strings"
"sync"
Expand All @@ -24,8 +25,8 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -68,7 +69,7 @@ type LogPoller interface {
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -131,7 +132,8 @@ type logPoller struct {
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
finalityViolated *atomic.Bool
countBasedLogPruningActive *atomic.Bool
}

type Opts struct {
Expand All @@ -157,24 +159,25 @@ type Opts struct {
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller {
return &logPoller{
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
countBasedLogPruningActive: new(atomic.Bool),
}
}

Expand Down Expand Up @@ -212,6 +215,12 @@ func (filter *Filter) Contains(other *Filter) bool {
if other == nil {
return true
}
if other.Retention != filter.Retention {
return false
}
if other.MaxLogsKept != filter.MaxLogsKept {
return false
}
addresses := make(map[common.Address]interface{})
for _, addr := range filter.Addresses {
addresses[addr] = struct{}{}
Expand Down Expand Up @@ -277,14 +286,17 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
lp.lggr.Warnw("Filter already present, no-op", "name", filter.Name, "filter", filter)
return nil
}
lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter)
lp.lggr.Warnw("Updating existing filter", "name", filter.Name, "filter", filter)
}

if err := lp.orm.InsertFilter(ctx, filter); err != nil {
return pkgerrors.Wrap(err, "error inserting filter")
}
lp.filters[filter.Name] = filter
lp.filterDirty = true
if filter.MaxLogsKept > 0 {
lp.countBasedLogPruningActive.Store(true)
}
return nil
}

Expand Down Expand Up @@ -540,18 +552,47 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

// loadFilters loads the registered filters from the db and, if any filter
// requests count-based retention (MaxLogsKept != 0), activates count-based
// log pruning for this poller.
func (lp *logPoller) loadFilters(ctx context.Context) error {
	loaded, err := lp.lockAndLoadFilters(ctx)
	if err != nil {
		return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
	}
	// No need to scan the filters if count-based pruning is already on.
	if !lp.countBasedLogPruningActive.Load() {
		for _, f := range loaded {
			if f.MaxLogsKept != 0 {
				lp.countBasedLogPruningActive.Store(true)
				break
			}
		}
	}
	return nil
}

// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock:
// it reads all filters from the ORM, installs them as the in-memory filter set,
// and marks the cached filter as dirty so it gets rebuilt on next use.
func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (map[string]Filter, error) {
	lp.filterMu.Lock()
	defer lp.filterMu.Unlock()

	loaded, err := lp.orm.LoadFilters(ctx)
	if err != nil {
		return loaded, err
	}

	lp.filters = loaded
	lp.filterDirty = true
	return loaded, nil
}

// tickStaggeredDelay returns a channel that fires once after a randomized
// delay: minDelay plus a jittered half-period (100% jitter on period/2),
// staggering work that would otherwise start at the same time everywhere.
func tickStaggeredDelay(minDelay time.Duration, period time.Duration) <-chan time.Time {
	jittered := timeutil.JitterPct(1.0).Apply(period / 2)
	return time.After(minDelay + jittered)
}

// tickWithDefaultJitter returns a channel that fires once after interval,
// perturbed by the services package's default jitter.
func tickWithDefaultJitter(interval time.Duration) <-chan time.Time {
	d := services.DefaultJitter.Apply(interval)
	return time.After(d)
}

func (lp *logPoller) run() {
Expand Down Expand Up @@ -638,31 +679,62 @@ func (lp *logPoller) backgroundWorkerRun() {
ctx, cancel := lp.stopCh.NewCtx()
defer cancel()

blockPruneShortInterval := lp.pollPeriod * 100
blockPruneInterval := blockPruneShortInterval * 10
logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100
logPruneInterval := logPruneShortInterval * 10

// Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs.
// Usually, node after restart will have some work to boot the plugins and other services.
// Deferring first prune by minutes reduces risk of putting too much pressure on the database.
blockPruneTick := time.After(5 * time.Minute)
logPruneTick := time.After(10 * time.Minute)
// Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database.
blockPruneTick := tickStaggeredDelay(5*time.Minute, blockPruneInterval)
logPruneTick := tickStaggeredDelay(5*time.Minute, logPruneInterval)

// Start initial prune of unmatched logs after 5-15 successful expired log prunes, so that not all chains start
// around the same time. After that, every 20 successful expired log prunes.
successfulExpiredLogPrunes := 1 + rand.Intn(4)

for {
select {
case <-ctx.Done():
return
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
lp.lggr.Infow("pruning old blocks")
blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
lp.lggr.Errorw("unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
lp.lggr.Warnw("reached page limit while pruning old blocks")
} else {
lp.lggr.Debugw("finished pruning old blocks")
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
logPruneTick = tickWithDefaultJitter(logPruneInterval)
lp.lggr.Infof("pruning expired logs")
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
lp.lggr.Errorw("unable to prune expired logs", "err", err)
} else if !allRemoved {
lp.lggr.Warnw("reached page limit while pruning expired logs")
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else if successfulExpiredLogPrunes >= 4 {
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
lp.lggr.Infof("finished pruning expired logs: pruning unmatched logs")
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("unable to prune unmatched logs", "err", err)
} else if !allRemoved {
lp.lggr.Warnw("reached page limit while pruning unmatched logs")
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else {
lp.lggr.Debugw("finished pruning unmatched logs")
successfulExpiredLogPrunes = 0
}
} else {
lp.lggr.Debugw("finished pruning expired logs")
successfulExpiredLogPrunes++
}
}
}
Expand Down Expand Up @@ -1065,7 +1137,8 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
}

// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, then it
// will always return true unless there is an actual error.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
latestBlock, err := lp.orm.SelectLatestBlock(ctx)
if err != nil {
Expand All @@ -1089,10 +1162,46 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// PruneExpiredLogs logs that are older than their retention period defined in Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
// PruneExpiredLogs will attempt to remove any logs which have passed their retention period. Returns whether all expired
// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
	done := true

	// First pass: time-based pruning of logs past their per-filter retention period.
	rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
	if err != nil {
		lp.lggr.Errorw("Unable to prune expired logs", "err", err)
		return false, err
	}
	// Removing a full page means there may be more rows still eligible for pruning.
	if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
		done = false
	}

	// Second pass: count-based pruning (MaxLogsKept). Skipped entirely unless at
	// least one registered filter requires it.
	if !lp.countBasedLogPruningActive.Load() {
		return done, nil
	}

	rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
	if err != nil {
		lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
		return false, err
	}
	rowsRemoved, err = lp.orm.DeleteLogsByRowID(ctx, rowIDs)
	if err != nil {
		lp.lggr.Errorw("Unable to prune excess logs", "err", err)
	} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
		done = false
	}
	return done, err
}

// PruneUnmatchedLogs will attempt to remove any logs which no longer match a registered filter. Returns whether all unmatched
// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
	unmatchedIDs, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
	if err != nil {
		return false, err
	}

	deleted, err := lp.orm.DeleteLogsByRowID(ctx, unmatchedIDs)
	// A full page deleted means more unmatched logs may remain.
	allRemoved := lp.logPrunePageSize == 0 || deleted < lp.logPrunePageSize
	return allRemoved, err
}

Expand Down Expand Up @@ -1518,6 +1627,25 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
// FilteredLogs returns the logs matching the given chainlink-common query
// expressions with limitAndSort applied, delegating directly to the ORM.
// queryName is forwarded unchanged (presumably for query labeling/metrics —
// its use is inside the ORM, not visible here).
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
	return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}

// Where is a query.Where wrapper that ignores the Key and returns a slice of
// query.Expression rather than query.KeyFilter. If no expressions are
// provided, or an error occurs, an empty (non-nil) slice is returned.
func Where(expressions ...query.Expression) ([]query.Expression, error) {
	keyFilter, err := query.Where("", expressions...)
	if err != nil {
		return []query.Expression{}, err
	}

	exprs := keyFilter.Expressions
	if exprs == nil {
		exprs = []query.Expression{}
	}
	return exprs, nil
}
Loading
Loading