From f8d5b198466f749ece3da873400a617e4714861c Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 20 Nov 2024 16:11:37 +0100 Subject: [PATCH 1/2] Restoring previous version of pruning (without left join) --- core/chains/evm/logpoller/orm.go | 32 +++++----- core/chains/evm/logpoller/orm_test.go | 61 +++---------------- .../tomls/ccip1.4-stress/baseline.toml | 2 +- 3 files changed, 27 insertions(+), 68 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 0b5a8f4bd4..c574124b98 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -314,30 +314,34 @@ type Exp struct { ShouldDelete bool } -// DeleteExpiredLogs removes any logs which either: -// - don't match any currently registered filters, or -// - have a timestamp older than any matching filter's retention, UNLESS there is at -// least one matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { var err error var result sql.Result - query := `DELETE FROM evm.logs + if limit > 0 { + result, err = o.ds.ExecContext(ctx, ` + DELETE FROM evm.logs WHERE (evm_chain_id, address, event_sig, block_number) IN ( SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number FROM evm.logs l - LEFT JOIN ( - SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention + INNER JOIN ( + SELECT address, event, MAX(retention) AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - ) r ON l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN - r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` - - if limit > 0 { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) + HAVING NOT 0 = ANY(ARRAY_AGG(retention)) + ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event + AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') + LIMIT $2 + )`, ubig.New(o.chainID), limit) } else { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) + result, err = o.ds.ExecContext(ctx, `WITH r AS + ( SELECT address, event, MAX(retention) AS retention + FROM evm.log_poller_filters WHERE evm_chain_id=$1 + GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention)) + ) DELETE FROM evm.logs l USING r + WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event + AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) + ubig.New(o.chainID)) } if err != nil { diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ba66e166eb..5ba1071a88 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -187,7 +187,6 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { - t.Parallel() th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 @@ -335,36 +334,6 @@ func TestORM(t *testing.T) { }, })) - // Insert a couple logs on a different chain, to make sure - // these aren't affected by any operations on the chain LogPoller - // is managing. - require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ - { - EvmChainId: ubig.New(th.ChainID2), - LogIndex: 8, - BlockHash: common.HexToHash("0x1238"), - BlockNumber: int64(17), - EventSig: topic2, - Topics: [][]byte{topic2[:]}, - Address: common.HexToAddress("0x1236"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("same log on unrelated chain"), - BlockTimestamp: time.Now(), - }, - { - EvmChainId: ubig.New(th.ChainID2), - LogIndex: 9, - BlockHash: common.HexToHash("0x1999"), - BlockNumber: int64(18), - EventSig: topic, - Topics: [][]byte{topic[:], topic2[:]}, - Address: common.HexToAddress("0x5555"), - TxHash: common.HexToHash("0x1543"), - Data: []byte("different log on unrelated chain"), - BlockTimestamp: time.Now(), - }, - })) - t.Log(latest.BlockNumber) logs, err := o1.SelectLogsByBlockRange(ctx, 1, 17) require.NoError(t, err) @@ -485,38 +454,24 @@ func TestORM(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 8) - // Delete expired logs with page limit + // Delete expired logs time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period - deleted, err := o1.DeleteExpiredLogs(ctx, 2) + deleted, err := o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(2), deleted) - - // Delete expired logs without page limit - deleted, err = o1.DeleteExpiredLogs(ctx, 0) - require.NoError(t, err) - assert.Equal(t, int64(2), deleted) - - // Ensure that both of the logs from the second chain are still there - logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) - require.NoError(t, err) - assert.Len(t, logs, 1) - logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x5555"), topic) - require.NoError(t, err) - assert.Len(t, logs, 1) - + assert.Equal(t, int64(1), deleted) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all - // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 - // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. - assert.Len(t, logs, 4) + // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) + // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything + // matching filter12 should be kept regardless of what other filters it matches. + assert.Len(t, logs, 7) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - assert.Zero(t, len(logs)) + require.Zero(t, len(logs)) } type PgxLogger struct { diff --git a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml index d78cd12595..6244eaccff 100644 --- a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml +++ b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml @@ -15,7 +15,7 @@ # If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets` # E2E_TEST_CHAINLINK_IMAGE=".dkr.ecr..amazonaws.com/chainlink-ccip" [CCIP.Env.NewCLCluster.Common.ChainlinkImage] -version = "2.14.0-ccip1.5.0" +version = "2.17.0-ccip1.5.11-beta.0" [CCIP] [CCIP.ContractVersions] From 802803025ed2fd9ec68fb357fd02cf208e994a9e Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 21 Nov 2024 08:18:26 +0100 Subject: [PATCH 2/2] Restoring previous version of pruning (without left join) --- .../evm/logpoller/observability_test.go | 7 +-- core/chains/evm/logpoller/orm_test.go | 46 +++++++++++++++++-- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 2f502438bb..826c39d3a2 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -119,12 +119,7 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) - rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) - require.NoError(t, err) - require.Equal(t, int64(3), rowsAffected) - assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) - - rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) + rowsAffected, err := orm.DeleteBlocksBefore(ctx, 30, 0) require.NoError(t, err) require.Equal(t, int64(2), rowsAffected) assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete")) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 5ba1071a88..5809b55d45 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -187,6 +187,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { + t.Parallel() th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 @@ -334,6 +335,36 @@ func TestORM(t *testing.T) { }, })) + // Insert a couple logs on a different chain, to make sure + // these aren't affected by any operations on the chain LogPoller + // is managing. + require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("same log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 9, + BlockHash: common.HexToHash("0x1999"), + BlockNumber: int64(18), + EventSig: topic, + Topics: [][]byte{topic[:], topic2[:]}, + Address: common.HexToAddress("0x5555"), + TxHash: common.HexToHash("0x1543"), + Data: []byte("different log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + })) + t.Log(latest.BlockNumber) logs, err := o1.SelectLogsByBlockRange(ctx, 1, 17) require.NoError(t, err) @@ -454,11 +485,20 @@ func TestORM(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 8) - // Delete expired logs + // Delete expired logs with page limit time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period - deleted, err := o1.DeleteExpiredLogs(ctx, 0) + deleted, err := o1.DeleteExpiredLogs(ctx, 2) require.NoError(t, err) assert.Equal(t, int64(1), deleted) + + // Ensure that both of the logs from the second chain are still there + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) + require.NoError(t, err) + assert.Len(t, logs, 1) + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x5555"), topic) + require.NoError(t, err) + assert.Len(t, logs, 1) + logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) @@ -471,7 +511,7 @@ func TestORM(t *testing.T) { require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Zero(t, len(logs)) + assert.Zero(t, len(logs)) } type PgxLogger struct {