diff --git a/packages/backend/src/Application.ts b/packages/backend/src/Application.ts index a5bd3a85..2df71886 100644 --- a/packages/backend/src/Application.ts +++ b/packages/backend/src/Application.ts @@ -3,6 +3,7 @@ import { Logger } from '@l2beat/backend-tools' import { ApiServer } from './api/ApiServer' import { Config } from './config' import { ApplicationModule } from './modules/ApplicationModule' +import { createEthereumDiscoveryModule } from './modules/EthereumDiscoveryModule' import { createHealthModule } from './modules/HealthModule' import { Database } from './peripherals/database/shared/Database' @@ -19,6 +20,7 @@ export class Application { const modules: (ApplicationModule | undefined)[] = [ createHealthModule(config), + createEthereumDiscoveryModule(database, logger), ] const apiServer = new ApiServer( diff --git a/packages/backend/src/indexers/BlockNumberIndexer.test.ts b/packages/backend/src/indexers/BlockNumberIndexer.test.ts new file mode 100644 index 00000000..db8269f3 --- /dev/null +++ b/packages/backend/src/indexers/BlockNumberIndexer.test.ts @@ -0,0 +1,195 @@ +import { assert, Logger } from '@l2beat/backend-tools' +import { Hash256, UnixTime } from '@lz/libs' +import { expect, mockFn, mockObject } from 'earl' + +import { + BlockchainClient, + BlockFromClient, +} from '../peripherals/clients/BlockchainClient' +import { + BlockNumberRecord, + BlockNumberRepository, +} from '../peripherals/database/BlockNumberRepository' +import { IndexerStateRepository } from '../peripherals/database/IndexerStateRepository' +import { BlockNumberIndexer } from './BlockNumberIndexer' +import { ClockIndexer } from './ClockIndexer' + +describe(BlockNumberIndexer.name, () => { + describe(BlockNumberIndexer.prototype.update.name, () => { + it('downloads a new block and returns its timestamp without reorg', async () => { + const blockNumberIndexer = new BlockNumberIndexer( + mockObject({ + getBlockNumberAtOrBefore, + getBlock: async (number) => mockBlock(number), + }), + mockObject({ + findByNumber: async (number) => mockBlockRecord(number), + addMany: async () => [0], + add: async () => 0, + }), + mockObject(), + 0, + mockObject({ + subscribe: () => {}, + }), + Logger.SILENT, + ) + + // from stays at 0 as we do not check against `from` value + expect(await blockNumberIndexer.update(0, 1)).toEqual(0) + expect(await blockNumberIndexer.update(0, 2000)).toEqual(1000) + expect(await blockNumberIndexer.update(1000, 2000)).toEqual(2000) + }) + + it('downloads a new block and returns its timestamp with reorg', async () => { + const memoryBlockNumberRepository = mockBlockNumberRepository([ + blockToRecord(BLOCKS[1]!), + ]) + + const reorgedBlock = { + number: 2, + hash: Hash256.random().toString(), + parentHash: HASH1, + timestamp: 1500, + } + const reorgedBlockRecord: BlockNumberRecord = { + blockNumber: reorgedBlock.number, + blockHash: Hash256(reorgedBlock.hash), + timestamp: new UnixTime(reorgedBlock.timestamp), + } + const blockNumberIndexer = new BlockNumberIndexer( + mockObject({ + getBlockNumberAtOrBefore, + getBlock: mockFn(async (number: number) => + mockBlock(number), + ).resolvesToOnce(reorgedBlock), + }), + memoryBlockNumberRepository, + mockObject({ + findById: async () => ({ id: 'BlockNumberIndexer', height: 1 }), + }), + 1, + mockObject({ + subscribe: () => {}, + }), + Logger.DEBUG, + ) + + await blockNumberIndexer.start() + + // saves block to database + expect(await blockNumberIndexer.update(0, 2000)).toEqual(1500) + expect(memoryBlockNumberRepository.add).toHaveBeenCalledWith( + reorgedBlockRecord, + ) + + expect(await blockNumberIndexer.update(0, 3000)).toEqual(1000) + expect(await blockNumberIndexer.update(1000, 3000)).toEqual(3000) + expect(memoryBlockNumberRepository.addMany).toHaveBeenCalledTimes(1) + expect(memoryBlockNumberRepository.addMany).toHaveBeenNthCalledWith( + 1, + [BLOCKS[2], BLOCKS[3]].map(blockToRecord), + ) + }) + }) +}) + +const HASH0 = Hash256.random() +const HASH1 = Hash256.random() +const HASH2 = Hash256.random() +const HASH3 = Hash256.random() +const HASH4 = Hash256.random() + +const BLOCKS = [ + { + number: 0, + hash: HASH0.toString(), + parentHash: '', + timestamp: 0, + }, + { + number: 1, + hash: HASH1.toString(), + parentHash: HASH0.toString(), + timestamp: 1000, + }, + { + number: 2, + hash: HASH2.toString(), + parentHash: HASH1.toString(), + timestamp: 2000, + }, + { + number: 3, + hash: HASH3.toString(), + parentHash: HASH2.toString(), + timestamp: 3000, + }, + { + number: 4, + hash: HASH4.toString(), + parentHash: HASH3.toString(), + timestamp: 4000, + }, +] as const + +function mockBlockRecord(blockNumber: number): BlockNumberRecord | undefined { + if (blockNumber === 0) { + return + } + const block = BLOCKS.find((b) => b.number === blockNumber) + assert(block, `Block not found for given number: ${blockNumber}`) + return { + blockNumber: block.number, + blockHash: Hash256(block.hash), + timestamp: new UnixTime(block.timestamp), + } +} + +function mockBlock(blockId: number | Hash256): BlockFromClient { + console.log('mockBlock', blockId) + if (typeof blockId === 'number') { + const block = BLOCKS.find((b) => b.number === blockId) + assert(block, `Block not found for given number: ${blockId}`) + return block + } + + const block = BLOCKS.find((b) => b.hash === blockId.toString()) + assert(block, `Block not found for given hash: ${blockId}`) + return block +} + +function getBlockNumberAtOrBefore(timestamp: UnixTime): Promise { + const block = BLOCKS.filter((b) => b.timestamp <= timestamp.toNumber()) + .sort((a, b) => b.timestamp - a.timestamp) + .shift() + assert(block, `Block not found for given timestamp: ${timestamp.toString()}`) + console.log('getBlockNumberAtOrBefore', timestamp.toString(), block.number) + return Promise.resolve(block.number) +} + +function mockBlockNumberRepository(initialStorage: BlockNumberRecord[]) { + const blockNumberStorage: BlockNumberRecord[] = [...initialStorage] + + return mockObject({ + findByNumber: async (number) => + blockNumberStorage.find((bnr) => bnr.blockNumber === number), + findLast: async () => blockNumberStorage.at(-1), + addMany: async (blocks: BlockNumberRecord[]) => { + blockNumberStorage.push(...blocks) + return blocks.map((b) => b.blockNumber) + }, + add: async (block: BlockNumberRecord) => { + blockNumberStorage.push(block) + return block.blockNumber + }, + }) +} + +function blockToRecord(blockFromClient: BlockFromClient): BlockNumberRecord { + return { + blockNumber: blockFromClient.number, + blockHash: Hash256(blockFromClient.hash), + timestamp: new UnixTime(blockFromClient.timestamp), + } +} diff --git a/packages/backend/src/indexers/BlockNumberIndexer.ts b/packages/backend/src/indexers/BlockNumberIndexer.ts new file mode 100644 index 00000000..f243af80 --- /dev/null +++ b/packages/backend/src/indexers/BlockNumberIndexer.ts @@ -0,0 +1,126 @@ +import { Logger } from '@l2beat/backend-tools' +import { ChildIndexer } from '@l2beat/uif' +import { Hash256, UnixTime } from '@lz/libs' + +import { BlockchainClient } from '../peripherals/clients/BlockchainClient' +import { + BlockNumberRecord, + BlockNumberRepository, +} from '../peripherals/database/BlockNumberRepository' +import { IndexerStateRepository } from '../peripherals/database/IndexerStateRepository' +import { ClockIndexer } from './ClockIndexer' + +export class BlockNumberIndexer extends ChildIndexer { + private lastKnownNumber = 0 + private reorgedBlocks = [] as BlockNumberRecord[] + private readonly id: string + + constructor( + private readonly blockchainClient: BlockchainClient, + private readonly blockRepository: BlockNumberRepository, + private readonly indexerRepository: IndexerStateRepository, + private readonly startBlock: number, + clockIndexer: ClockIndexer, + logger: Logger, + ) { + super(logger, [clockIndexer]) + this.id = 'BlockDownloader' // this should be unique across all indexers + } + + override async start(): Promise { + await super.start() + this.lastKnownNumber = + (await this.blockRepository.findLast())?.blockNumber ?? this.startBlock + } + + async update(_fromTimestamp: number, toTimestamp: number): Promise { + if (this.reorgedBlocks.length > 0) { + // we do not need to check if lastKnown < to because we are sure that + // those blocks are from the past + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.lastKnownNumber = this.reorgedBlocks.at(-1)!.blockNumber + await this.blockRepository.addMany(this.reorgedBlocks) + this.reorgedBlocks = [] + } + + const tip = await this.blockchainClient.getBlockNumberAtOrBefore( + new UnixTime(toTimestamp), + ) + if (tip <= this.lastKnownNumber) { + return toTimestamp + } + + return await this.advanceChain(this.lastKnownNumber + 1) + } + + async invalidate(to: number): Promise { + await this.blockRepository.deleteAfter(to) + return to + } + + private async advanceChain(blockNumber: number): Promise { + let [block, parent] = await Promise.all([ + this.blockchainClient.getBlock(blockNumber), + this.getKnownBlock(blockNumber - 1), + ]) + + if (Hash256(block.parentHash) !== parent.blockHash) { + const changed = [block] + + let current = blockNumber + while (Hash256(block.parentHash) !== parent.blockHash) { + current-- + ;[block, parent] = await Promise.all([ + this.blockchainClient.getBlock(Hash256(block.parentHash)), + this.getKnownBlock(current - 1), + ]) + changed.push(block) + } + + this.reorgedBlocks = changed.reverse().map((block) => { + return { + blockNumber: block.number, + blockHash: Hash256(block.hash), + timestamp: new UnixTime(block.timestamp), + } + }) + + return parent.timestamp.toNumber() + } + + const record: BlockNumberRecord = { + blockNumber: block.number, + blockHash: Hash256(block.hash), + timestamp: new UnixTime(block.timestamp), + } + await this.blockRepository.add(record) + this.lastKnownNumber = block.number + + return block.timestamp + } + + async setSafeHeight(height: number): Promise { + await this.indexerRepository.addOrUpdate({ id: this.id, height }) + } + + async getSafeHeight(): Promise { + const record = await this.indexerRepository.findById(this.id) + return record?.height ?? 0 + } + + private async getKnownBlock(blockNumber: number): Promise { + const known = await this.blockRepository.findByNumber(blockNumber) + if (known) { + return known + } + const downloaded = await this.blockchainClient.getBlock(blockNumber) + + const record: BlockNumberRecord = { + blockNumber: downloaded.number, + blockHash: Hash256(downloaded.hash), + timestamp: new UnixTime(downloaded.timestamp), + } + await this.blockRepository.add(record) + return record + } +} diff --git a/packages/backend/src/indexers/ClockIndexer.ts b/packages/backend/src/indexers/ClockIndexer.ts index 54be0a8e..a55cfefe 100644 --- a/packages/backend/src/indexers/ClockIndexer.ts +++ b/packages/backend/src/indexers/ClockIndexer.ts @@ -14,6 +14,10 @@ export class ClockIndexer extends RootIndexer { } tick(): Promise { - return Promise.resolve(Date.now()) + return Promise.resolve(getTimeSeconds()) } } + +function getTimeSeconds(): number { + return Math.floor(Date.now() / 1000) +} diff --git a/packages/backend/src/modules/EthereumDiscoveryModule.ts b/packages/backend/src/modules/EthereumDiscoveryModule.ts new file mode 100644 index 00000000..4fea7423 --- /dev/null +++ b/packages/backend/src/modules/EthereumDiscoveryModule.ts @@ -0,0 +1,41 @@ +import { Logger } from '@l2beat/backend-tools' +import { JsonRpcProvider } from 'ethers' + +import { BlockNumberIndexer } from '../indexers/BlockNumberIndexer' +import { ClockIndexer } from '../indexers/ClockIndexer' +import { BlockchainClient } from '../peripherals/clients/BlockchainClient' +import { BlockNumberRepository } from '../peripherals/database/BlockNumberRepository' +import { IndexerStateRepository } from '../peripherals/database/IndexerStateRepository' +import { Database } from '../peripherals/database/shared/Database' +import { ApplicationModule } from './ApplicationModule' + +export function createEthereumDiscoveryModule( + database: Database, + logger: Logger, +): ApplicationModule { + const blockRepository = new BlockNumberRepository(database, logger) + const indexerRepository = new IndexerStateRepository(database, logger) + + const provider = new JsonRpcProvider( + 'https://eth-mainnet.g.alchemy.com/v2/CLeXrqsc9lGb40KK9gRIbhQKGiakgp-S', + ) + const blockchainClient = new BlockchainClient(provider, logger) + + const clockIndexer = new ClockIndexer(logger, 10 * 1000) + const blockNumberIndexer = new BlockNumberIndexer( + blockchainClient, + blockRepository, + indexerRepository, + 18127698, + clockIndexer, + logger, + ) + + return { + routers: [], + start: async () => { + await clockIndexer.start() + await blockNumberIndexer.start() + }, + } +} diff --git a/packages/backend/src/peripherals/clients/BlockchainClient.ts b/packages/backend/src/peripherals/clients/BlockchainClient.ts index e470326f..fddb0d22 100644 --- a/packages/backend/src/peripherals/clients/BlockchainClient.ts +++ b/packages/backend/src/peripherals/clients/BlockchainClient.ts @@ -1,7 +1,17 @@ -import { Logger } from '@l2beat/backend-tools' -import { Bytes, EthereumAddress, RateLimitedProvider, UnixTime } from '@lz/libs' +import { assert, Logger } from '@l2beat/backend-tools' +import { + Bytes, + EthereumAddress, + Hash256, + RateLimitedProvider, + UnixTime, +} from '@lz/libs' import { Block, Log, Provider } from 'ethers' +export type BlockFromClient = Pick< + Block, + 'timestamp' | 'number' | 'parentHash' +> & { hash: string } export class BlockchainClient { private readonly provider: RateLimitedProvider @@ -37,10 +47,6 @@ export class BlockchainClient { ): Promise<{ timestamp: number }> => { const block = await this.getBlock(number) - if (!block) { - throw new Error(`Block not found for given number: ${number}`) - } - return { timestamp: block.timestamp, } @@ -49,8 +55,16 @@ export class BlockchainClient { return getBlockNumberAtOrBefore(timestamp, start, end, getBlockTimestamp) } - async getBlock(blockNumber: number): Promise { - return this.provider.getBlock(blockNumber) + async getBlock(blockId: number | Hash256): Promise { + const block = await this.provider.getBlock(blockId) + assert(block, `Block not found for given number: ${blockId}`) + assert(block.hash, `Block hash not found for given number: ${blockId}`) + return { + timestamp: block.timestamp, + number: block.number, + parentHash: block.parentHash, + hash: block.hash, + } } async call(parameters: CallParameters, blockTag: BlockTag): Promise { diff --git a/packages/backend/src/peripherals/database/BlockNumberRepository.test.ts b/packages/backend/src/peripherals/database/BlockNumberRepository.test.ts index 7250b3a5..5d16bc0a 100644 --- a/packages/backend/src/peripherals/database/BlockNumberRepository.test.ts +++ b/packages/backend/src/peripherals/database/BlockNumberRepository.test.ts @@ -1,5 +1,5 @@ import { Logger } from '@l2beat/backend-tools' -import { UnixTime } from '@lz/libs' +import { Hash256, UnixTime } from '@lz/libs' import { expect } from 'earl' import { setupDatabaseTestSuite } from '../../test/database' @@ -16,10 +16,7 @@ describe(BlockNumberRepository.name, () => { afterEach(() => repository.deleteAll()) it('adds single record and queries it', async () => { - const record: BlockNumberRecord = { - blockNumber: 1, - timestamp: UnixTime.now(), - } + const record = mockRecord(1) await repository.addMany([record]) @@ -35,18 +32,9 @@ describe(BlockNumberRepository.name, () => { it('adds multiple records and queries them', async () => { const records: BlockNumberRecord[] = [ - { - blockNumber: 1, - timestamp: UnixTime.now().add(-2, 'hours'), - }, - { - blockNumber: 2, - timestamp: UnixTime.now().add(-1, 'hours'), - }, - { - blockNumber: 3, - timestamp: UnixTime.now(), - }, + mockRecord(1), + mockRecord(2), + mockRecord(3), ] await repository.addMany(records) @@ -56,16 +44,7 @@ describe(BlockNumberRepository.name, () => { }) it('deletes all records', async () => { - await repository.addMany([ - { - blockNumber: 1, - timestamp: UnixTime.now().add(-1, 'hours'), - }, - { - blockNumber: 2, - timestamp: UnixTime.now(), - }, - ]) + await repository.addMany([mockRecord(1), mockRecord(2)]) await repository.deleteAll() @@ -74,12 +53,8 @@ describe(BlockNumberRepository.name, () => { }) it('deletes all records after a block number', async () => { - const start = UnixTime.now() const range = new Array(10).fill(null).map((_, i) => i + 1) - const records: BlockNumberRecord[] = range.map((i) => ({ - timestamp: start.add(i - range.length, 'hours'), // ascending timestamps - blockNumber: i, - })) + const records = range.map(mockRecord) await repository.addMany(records) await repository.deleteAfter(5) @@ -89,10 +64,7 @@ describe(BlockNumberRepository.name, () => { }) it('gets by number', async () => { - const record: BlockNumberRecord = { - blockNumber: 1, - timestamp: UnixTime.now(), - } + const record = mockRecord(1) await repository.addMany([record]) @@ -102,41 +74,38 @@ describe(BlockNumberRepository.name, () => { }) it('gets last by number', async () => { - const start = UnixTime.now() expect(await repository.findLast()).toEqual(undefined) - const block = { blockNumber: 11813208, timestamp: start } + const block = mockRecord(69420) await repository.addMany([block]) expect(await repository.findLast()).toEqual(block) - const earlierBlock = { - blockNumber: 11813206, - timestamp: start.add(-1, 'hours'), - } - const laterBlock = { - blockNumber: 11813209, - timestamp: start.add(1, 'hours'), - } + const earlierBlock = mockRecord(69419) + const laterBlock = mockRecord(69421) await repository.addMany([laterBlock, earlierBlock]) expect(await repository.findLast()).toEqual(laterBlock) }) it('gets all blocks in range between given numbers (inclusive)', async () => { - const start = UnixTime.now() const range = new Array(20) .fill(null) .map((_, i) => i + 1) .slice(9) // 10 ... 20 inclusive - const blocks = range.map((i) => ({ - blockNumber: i, - timestamp: start.add(i - range.length, 'hours'), // ascending timestamps - })) + const blocks = range.map(mockRecord) await repository.addMany(blocks) expect(await repository.getAllInRange(13, 17)).toEqual(blocks.slice(3, 8)) }) }) + +function mockRecord(blockNumber: number): BlockNumberRecord { + return { + blockNumber, + blockHash: Hash256('0x' + blockNumber.toString(16).padStart(64, '0')), + timestamp: UnixTime.now().add(blockNumber, 'hours'), + } +} diff --git a/packages/backend/src/peripherals/database/BlockNumberRepository.ts b/packages/backend/src/peripherals/database/BlockNumberRepository.ts index 345d4398..3ecebe2e 100644 --- a/packages/backend/src/peripherals/database/BlockNumberRepository.ts +++ b/packages/backend/src/peripherals/database/BlockNumberRepository.ts @@ -1,5 +1,5 @@ import { Logger } from '@l2beat/backend-tools' -import { UnixTime } from '@lz/libs' +import { Hash256, UnixTime } from '@lz/libs' import type { BlockNumberRow } from 'knex/types/tables' import { BaseRepository, CheckConvention } from './shared/BaseRepository' @@ -8,6 +8,7 @@ import { Database } from './shared/Database' export interface BlockNumberRecord { timestamp: UnixTime blockNumber: number + blockHash: Hash256 } export class BlockNumberRepository extends BaseRepository { @@ -16,6 +17,13 @@ export class BlockNumberRepository extends BaseRepository { this.autoWrap>(this) } + async add(record: BlockNumberRecord): Promise { + const row = toRow(record) + const knex = await this.knex() + await knex('block_numbers').insert(row).returning('block_number') + return row.block_number + } + async addMany(records: BlockNumberRecord[]): Promise { const rows: BlockNumberRow[] = records.map(toRow) const knex = await this.knex() @@ -74,6 +82,7 @@ export class BlockNumberRepository extends BaseRepository { function toRow(record: BlockNumberRecord): BlockNumberRow { return { block_number: record.blockNumber, + block_hash: record.blockHash, unix_timestamp: record.timestamp.toDate(), } } @@ -81,6 +90,7 @@ function toRow(record: BlockNumberRecord): BlockNumberRow { function toRecord(row: BlockNumberRow): BlockNumberRecord { return { blockNumber: row.block_number, + blockHash: Hash256(row.block_hash), timestamp: UnixTime.fromDate(row.unix_timestamp), } } diff --git a/packages/backend/src/peripherals/database/migrations/003_block_hash.ts b/packages/backend/src/peripherals/database/migrations/003_block_hash.ts new file mode 100644 index 00000000..b9e3795f --- /dev/null +++ b/packages/backend/src/peripherals/database/migrations/003_block_hash.ts @@ -0,0 +1,26 @@ +/* + ====== IMPORTANT NOTICE ====== + +DO NOT EDIT OR RENAME THIS FILE + +This is a migration file. Once created the file should not be renamed or edited, +because migrations are only run once on the production server. + +If you find that something was incorrectly set up in the `up` function you +should create a new migration file that fixes the issue. + +*/ + +import { Knex } from 'knex' + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable('block_numbers', (table) => { + table.string('block_hash').notNullable() + }) +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable('block_numbers', (table) => { + table.dropColumn('block_hash') + }) +} diff --git a/packages/backend/src/peripherals/database/shared/types.ts b/packages/backend/src/peripherals/database/shared/types.ts index 91494a3a..193acda6 100644 --- a/packages/backend/src/peripherals/database/shared/types.ts +++ b/packages/backend/src/peripherals/database/shared/types.ts @@ -3,6 +3,7 @@ declare module 'knex/types/tables' { interface BlockNumberRow { unix_timestamp: Date block_number: number + block_hash: string } interface IndexerStateRow {