From 3c0b66b92c8772682d4a2f0bd26a22806aaad75d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20T=C3=B3rz?= Date: Mon, 17 Jul 2023 15:49:45 +0200 Subject: [PATCH] improve LiveL2TransactionDownloader --- packages/backend/src/Application.ts | 26 +++ .../src/config/starkex/apex-mainnet.ts | 5 +- packages/backend/src/core/IDataSyncService.ts | 5 +- .../src/core/PerpetualValidiumSyncService.ts | 10 +- .../src/core/sync/L2TransactionDownloader.ts | 104 ------------ .../core/sync/LiveL2TransactionDownloader.ts | 154 ++++++++++++++++++ .../src/core/sync/SyncScheduler.test.ts | 120 +++++++++++--- .../backend/src/core/sync/SyncScheduler.ts | 8 +- .../src/peripherals/database/KeyValueStore.ts | 1 + .../database/L2TransactionRepository.ts | 1 + .../peripherals/starkware/schema/regexes.ts | 2 +- 11 files changed, 306 insertions(+), 130 deletions(-) delete mode 100644 packages/backend/src/core/sync/L2TransactionDownloader.ts create mode 100644 packages/backend/src/core/sync/LiveL2TransactionDownloader.ts diff --git a/packages/backend/src/Application.ts b/packages/backend/src/Application.ts index 5d7bee43b..619c61daa 100644 --- a/packages/backend/src/Application.ts +++ b/packages/backend/src/Application.ts @@ -55,6 +55,8 @@ import { SpotValidiumSyncService } from './core/SpotValidiumSyncService' import { SpotValidiumUpdater } from './core/SpotValidiumUpdater' import { StatusService } from './core/StatusService' import { BlockDownloader } from './core/sync/BlockDownloader' +import { Clock } from './core/sync/Clock' +import { LiveL2TransactionDownloader } from './core/sync/LiveL2TransactionDownloader' import { SyncScheduler } from './core/sync/SyncScheduler' import { TransactionStatusService } from './core/TransactionStatusService' import { UserService } from './core/UserService' @@ -85,6 +87,7 @@ import { TokenInspector } from './peripherals/ethereum/TokenInspector' import { AvailabilityGatewayClient } from './peripherals/starkware/AvailabilityGatewayClient' import { FeederGatewayClient } from './peripherals/starkware/FeederGatewayClient' import { FetchClient } from './peripherals/starkware/FetchClient' +import { L2TransactionClient } from './peripherals/starkware/L2TransactionClient' import { handleServerError, reportError } from './tools/ErrorReporter' import { Logger } from './tools/Logger' import { shouldShowL2Transactions } from './utils/shouldShowL2Transactions' @@ -100,6 +103,8 @@ export class Application { reportError, }) + const clock = new Clock() + // #endregion tools // #region peripherals @@ -219,6 +224,7 @@ export class Application { let stateTransitionCollector: IStateTransitionCollector let feederGatewayCollector: FeederGatewayCollector | undefined + let l2TransactionDownloader: LiveL2TransactionDownloader | undefined if (config.starkex.dataAvailabilityMode === 'validium') { const availabilityGatewayClient = new AvailabilityGatewayClient( @@ -235,6 +241,24 @@ export class Application { ) stateTransitionCollector = perpetualValidiumStateTransitionCollector + const l2TransactionClient = config.starkex.l2TransactionApi + ? new L2TransactionClient( + config.starkex.l2TransactionApi, + fetchClient + ) + : undefined + + l2TransactionDownloader = l2TransactionClient + ? new LiveL2TransactionDownloader( + l2TransactionClient, + l2TransactionRepository, + stateUpdateRepository, + kvStore, + clock, + logger + ) + : undefined + const feederGatewayClient = config.starkex.feederGateway ? new FeederGatewayClient( config.starkex.feederGateway, @@ -278,6 +302,7 @@ export class Application { perpetualValidiumUpdater, withdrawalAllowedCollector, feederGatewayCollector, + l2TransactionDownloader, logger ) } else { @@ -644,6 +669,7 @@ export class Application { if (config.enableSync) { transactionStatusService.start() await syncScheduler.start() + await l2TransactionDownloader?.start() await blockDownloader.start() } diff --git a/packages/backend/src/config/starkex/apex-mainnet.ts b/packages/backend/src/config/starkex/apex-mainnet.ts index 881ddffa4..573f1bb7a 100644 --- a/packages/backend/src/config/starkex/apex-mainnet.ts +++ b/packages/backend/src/config/starkex/apex-mainnet.ts @@ -37,11 +37,14 @@ export function getApexMainnetConfig(): StarkexConfig { auth: clientAuth, }, l2TransactionApi: { - getUrl: (startId, expectCount) => { + getTransactionsUrl: (startId, expectCount) => { return `${getEnv( 'APEX_TRANSACTION_API_URL' )}?startApexId=${startId}&expectCount=${expectCount}` }, + getThirdPartyIdByTransactionIdUrl: (transactionId) => { + return `${getEnv('APEX_THIRD_PARTY_ID_API_URL')}?txId=${transactionId}` + }, auth: clientAuth, }, collateralAsset: { diff --git a/packages/backend/src/core/IDataSyncService.ts b/packages/backend/src/core/IDataSyncService.ts index 0fccc2f94..6ce7f7d59 100644 --- a/packages/backend/src/core/IDataSyncService.ts +++ b/packages/backend/src/core/IDataSyncService.ts @@ -4,11 +4,12 @@ import { PerpetualRollupStateTransition } from './PerpetualRollupUpdater' import { ValidiumStateTransition } from './PerpetualValidiumUpdater' export interface IDataSyncService { - sync(blockRange: BlockRange): Promise + sync(blockRange: BlockRange, isTip?: boolean): Promise + // I made isTip optional but as soon as we will support other types like PerpetualRollup etc. we will need to make it required. processStateTransitions( stateTransitions: | ValidiumStateTransition[] | PerpetualRollupStateTransition[] - ): Promise + ): Promise discardAfter(blockNumber: BlockNumber): Promise } diff --git a/packages/backend/src/core/PerpetualValidiumSyncService.ts b/packages/backend/src/core/PerpetualValidiumSyncService.ts index 82381e364..49cec13b3 100644 --- a/packages/backend/src/core/PerpetualValidiumSyncService.ts +++ b/packages/backend/src/core/PerpetualValidiumSyncService.ts @@ -13,6 +13,7 @@ import { PerpetualValidiumUpdater, ValidiumStateTransition, } from './PerpetualValidiumUpdater' +import { LiveL2TransactionDownloader } from './sync/LiveL2TransactionDownloader' export class PerpetualValidiumSyncService implements IDataSyncService { constructor( @@ -24,12 +25,15 @@ export class PerpetualValidiumSyncService implements IDataSyncService { private readonly perpetualValidiumUpdater: PerpetualValidiumUpdater, private readonly withdrawalAllowedCollector: WithdrawalAllowedCollector, private readonly feederGatewayCollector: FeederGatewayCollector | undefined, + private readonly L2TransactionDownloader: + | LiveL2TransactionDownloader + | undefined, private readonly logger: Logger ) { this.logger = logger.for(this) } - async sync(blockRange: BlockRange) { + async sync(blockRange: BlockRange, isTip: boolean) { const userRegistrations = await this.userRegistrationCollector.collect( blockRange ) @@ -49,6 +53,10 @@ export class PerpetualValidiumSyncService implements IDataSyncService { await this.processStateTransitions(stateTransitions) await this.feederGatewayCollector?.collect() + + if (isTip) { + await this.L2TransactionDownloader?.enableSync() + } } async processStateTransitions(stateTransitions: ValidiumStateTransition[]) { diff --git a/packages/backend/src/core/sync/L2TransactionDownloader.ts b/packages/backend/src/core/sync/L2TransactionDownloader.ts deleted file mode 100644 index e797bdb3e..000000000 --- a/packages/backend/src/core/sync/L2TransactionDownloader.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { KeyValueStore } from '../../peripherals/database/KeyValueStore' -import { L2TransactionRepository } from '../../peripherals/database/L2TransactionRepository' -import { L2TransactionClient } from '../../peripherals/starkware/L2TransactionClient' -import { Logger } from '../../tools/Logger' -import { Clock } from './Clock' - -export class L2TransactionDownloader { - private clock = new Clock() - private PAGE_SIZE = 100 - private isRunning = false - private lastSyncedThirdPartyId: number | undefined - - constructor( - private readonly l2TransactionClient: L2TransactionClient, - private readonly l2TransactionRepository: L2TransactionRepository, - private readonly keyValueStore: KeyValueStore, - private readonly logger: Logger - ) { - this.logger = this.logger.for(this) - } - - start() { - this.clock.onEvery('15s', () => this.downloadNewTransactions()) - } - - async downloadNewTransactions() { - if (this.isRunning) { - return - } - this.isRunning = true - - this.logger.info('Starting L2 transaction downloader') - const lastIncluded = await this.l2TransactionRepository.findLatestIncluded() - if (!lastIncluded) { - this.isRunning = false - return - } - - this.lastSyncedThirdPartyId = Number( - await this.keyValueStore.findByKey('lastSyncedThirdPartyId') - ) - - let thirdPartyIdToSync = this.lastSyncedThirdPartyId - ? this.lastSyncedThirdPartyId - : await this.l2TransactionClient.getThirdPartyIdByTransactionId( - lastIncluded.transactionId - ) - - if (!thirdPartyIdToSync) { - this.isRunning = false - return - } - - while (true) { - this.logger.info(thirdPartyIdToSync.toString()) - const transactions = await this.addTransactions(thirdPartyIdToSync) - - if (!transactions) { - break - } - this.lastSyncedThirdPartyId = - transactions[transactions.length - 1]?.thirdPartyId - - thirdPartyIdToSync += this.PAGE_SIZE - } - - if (this.lastSyncedThirdPartyId) { - await this.keyValueStore.addOrUpdate({ - key: 'lastSyncedThirdPartyId', - value: this.lastSyncedThirdPartyId.toString(), - }) - } - this.isRunning = false - } - - private async addTransactions(thirdPartyId: number) { - this.logger.info(`Downloading transactions from ${thirdPartyId}`) - const transactions = - await this.l2TransactionClient.getPerpetualTransactions( - thirdPartyId, - this.PAGE_SIZE - ) - - if (!transactions) { - this.logger.info('No transactions found') - return - } - - await this.l2TransactionRepository.runInTransactionWithLockedTable( - async (trx) => { - for (const transaction of transactions) { - await this.l2TransactionRepository.add( - { - transactionId: transaction.transactionId, - data: transaction.transaction, - }, - trx - ) - } - } - ) - return transactions - } -} diff --git a/packages/backend/src/core/sync/LiveL2TransactionDownloader.ts b/packages/backend/src/core/sync/LiveL2TransactionDownloader.ts new file mode 100644 index 000000000..6fd1270b0 --- /dev/null +++ b/packages/backend/src/core/sync/LiveL2TransactionDownloader.ts @@ -0,0 +1,154 @@ +import { Knex } from 'knex' + +import { KeyValueStore } from '../../peripherals/database/KeyValueStore' +import { L2TransactionRepository } from '../../peripherals/database/L2TransactionRepository' +import { StateUpdateRepository } from '../../peripherals/database/StateUpdateRepository' +import { L2TransactionClient } from '../../peripherals/starkware/L2TransactionClient' +import { PerpetualL2Transaction } from '../../peripherals/starkware/toPerpetualTransactions' +import { Logger } from '../../tools/Logger' +import { Clock } from './Clock' + +export class LiveL2TransactionDownloader { + private PAGE_SIZE = 100 + private isRunning = false + private lastSyncedThirdPartyId: number | undefined + private enabled = false + + constructor( + private readonly l2TransactionClient: L2TransactionClient, + private readonly l2TransactionRepository: L2TransactionRepository, + private readonly stateUpdateRepository: StateUpdateRepository, + private readonly keyValueStore: KeyValueStore, + private readonly clock: Clock, + private readonly logger: Logger + ) { + this.logger = this.logger.for(this) + } + + async start() { + this.logger.info('Starting L2 transaction downloader') + + await this.initialize() + this.clock.onEvery('5s', () => this.sync()) + } + + private async initialize() { + const lastSyncedThirdPartyId = await this.keyValueStore.findByKey( + 'lastSyncedThirdPartyId' + ) + if (!lastSyncedThirdPartyId) { + return + } + + this.enabled = true + this.lastSyncedThirdPartyId = lastSyncedThirdPartyId + } + + async enableSync() { + if (this.enabled) { + return + } + + const lastStateUpdate = await this.stateUpdateRepository.findLast() + if (!lastStateUpdate) { + return + } + const lastIncluded = await this.l2TransactionRepository.findLatestIncluded() + if (!lastIncluded) { + return + } + + if (lastStateUpdate.id !== lastIncluded.stateUpdateId) { + return + } + + const lastSyncedThirdPartyId = + await this.l2TransactionClient.getThirdPartyIdByTransactionId( + lastIncluded.transactionId + ) + if (!lastSyncedThirdPartyId) { + return + } + this.logger.info('Enabling L2 transaction downloader') + await this.updateLastSyncedThirdPartyId(lastSyncedThirdPartyId) + this.enabled = true + } + + private async sync() { + if (this.isRunning) { + return + } + this.isRunning = true + + const lastSyncedThirdPartyId = this.lastSyncedThirdPartyId + if (!lastSyncedThirdPartyId) { + this.isRunning = false + return + } + + await this.l2TransactionRepository.runInTransactionWithLockedTable( + async (trx) => { + await this.downloadAndAddTransactions(lastSyncedThirdPartyId + 1, trx) + } + ) + } + + private async downloadAndAddTransactions( + thirdPartyId: number, + trx: Knex.Transaction + ) { + this.logger.info(`Downloading live transactions from ${thirdPartyId}`) + + const transactions = + await this.l2TransactionClient.getPerpetualTransactions( + thirdPartyId, + this.PAGE_SIZE + ) + + if (!transactions) { + this.logger.info('No transactions found') + this.isRunning = false + return + } + + await this.addTransactions(transactions, trx) + + const lastSyncedThirdPartyId = thirdPartyId + this.PAGE_SIZE + await this.updateLastSyncedThirdPartyId(lastSyncedThirdPartyId, trx) + + if (transactions.length === this.PAGE_SIZE) { + await this.downloadAndAddTransactions(lastSyncedThirdPartyId, trx) + } else { + this.isRunning = false + } + } + + private async addTransactions( + transactions: PerpetualL2Transaction[], + trx: Knex.Transaction + ) { + for (const transaction of transactions) { + await this.l2TransactionRepository.add( + { + transactionId: transaction.transactionId, + data: transaction.transaction, + }, + trx + ) + } + } + + private async updateLastSyncedThirdPartyId( + lastSyncedThirdPartyId: number, + trx?: Knex.Transaction + ) { + await this.keyValueStore.addOrUpdate( + { + key: 'lastSyncedThirdPartyId', + value: lastSyncedThirdPartyId, + }, + trx + ) + this.lastSyncedThirdPartyId = lastSyncedThirdPartyId + } +} diff --git a/packages/backend/src/core/sync/SyncScheduler.test.ts b/packages/backend/src/core/sync/SyncScheduler.test.ts index 336c39165..773b6cb8a 100644 --- a/packages/backend/src/core/sync/SyncScheduler.test.ts +++ b/packages/backend/src/core/sync/SyncScheduler.test.ts @@ -6,12 +6,13 @@ import { BlockRange } from '../../model' import { KeyValueStore } from '../../peripherals/database/KeyValueStore' import { Logger } from '../../tools/Logger' import { PerpetualRollupSyncService } from '../PerpetualRollupSyncService' +import { PerpetualValidiumSyncService } from '../PerpetualValidiumSyncService' import { Preprocessor } from '../preprocessing/Preprocessor' import { BlockDownloader } from './BlockDownloader' import { SyncScheduler } from './SyncScheduler' import { Block } from './syncSchedulerReducer' -describe(SyncScheduler.name, () => { +describe.only(SyncScheduler.name, () => { const block = (number: number): Block => ({ number, hash: Hash256.fake(number.toString()), @@ -94,11 +95,12 @@ describe(SyncScheduler.name, () => { describe(SyncScheduler.prototype.dispatch.name, () => { it('handles a successful sync', async () => { + const isTip = true const mockKeyValueStore = mockObject({ addOrUpdate: mockFn().resolvesTo('lastBlockNumberSynced'), }) const blockDownloader = mockObject() - const dataSyncService = mockObject({ + const dataSyncService = mockObject({ sync: async () => {}, discardAfter: async () => {}, }) @@ -113,6 +115,8 @@ describe(SyncScheduler.name, () => { Logger.SILENT, { earliestBlock: 1_000_000 } ) + const mockIsTipFn = mockFn().returns(isTip) + syncScheduler.isTip = mockIsTipFn syncScheduler.dispatch({ type: 'initialized', @@ -121,9 +125,11 @@ describe(SyncScheduler.name, () => { }) await waitForExpect(() => { + expect(mockIsTipFn).toHaveBeenOnlyCalledWith(1_000_003) expect(dataSyncService.discardAfter).toHaveBeenOnlyCalledWith(1_000_000) expect(dataSyncService.sync).toHaveBeenOnlyCalledWith( - new BlockRange([block(1_000_001), block(1_000_002)]) + new BlockRange([block(1_000_001), block(1_000_002)]), + isTip ) expect(mockKeyValueStore.addOrUpdate).toHaveBeenOnlyCalledWith({ key: 'lastBlockNumberSynced', @@ -134,11 +140,12 @@ describe(SyncScheduler.name, () => { }) it('handles a failing sync', async () => { + const isTip = false const mockKeyValueStore = mockObject({ addOrUpdate: mockFn().resolvesTo('lastBlockNumberSynced'), }) const blockDownloader = mockObject() - const dataSyncService = mockObject({ + const dataSyncService = mockObject({ sync: mockFn().rejectsWith(new Error('oops')), discardAfter: async () => {}, }) @@ -153,6 +160,8 @@ describe(SyncScheduler.name, () => { Logger.SILENT, { earliestBlock: 1_000_000 } ) + const mockIsTipFn = mockFn().returns(isTip) + syncScheduler.isTip = mockIsTipFn syncScheduler.dispatch({ type: 'initialized', @@ -161,8 +170,10 @@ describe(SyncScheduler.name, () => { }) await waitForExpect(() => { + expect(mockIsTipFn).toHaveBeenOnlyCalledWith(1_000_003) expect(dataSyncService.sync).toHaveBeenOnlyCalledWith( - new BlockRange([block(1_000_001), block(1_000_002)]) + new BlockRange([block(1_000_001), block(1_000_002)]), + isTip ) expect(mockKeyValueStore.addOrUpdate).not.toHaveBeenCalled() }) @@ -172,11 +183,12 @@ describe(SyncScheduler.name, () => { }) it('handles a successful discardAfter', async () => { + const isTip = true const mockKeyValueStore = mockObject({ addOrUpdate: mockFn().resolvesTo('lastBlockNumberSynced'), }) const blockDownloader = mockObject() - const dataSyncService = mockObject({ + const dataSyncService = mockObject({ sync: async () => {}, discardAfter: async () => {}, }) @@ -191,6 +203,8 @@ describe(SyncScheduler.name, () => { Logger.SILENT, { earliestBlock: 1_000_000 } ) + const mockIsTipFn = mockFn().returns(isTip) + syncScheduler.isTip = mockIsTipFn syncScheduler.dispatch({ type: 'initialized', @@ -207,8 +221,10 @@ describe(SyncScheduler.name, () => { expect(dataSyncService.discardAfter).toHaveBeenNthCalledWith(1, 999_999) expect(dataSyncService.discardAfter).toHaveBeenNthCalledWith(2, 999_999) + expect(mockIsTipFn).toHaveBeenOnlyCalledWith(1_000_002) expect(dataSyncService.sync).toHaveBeenOnlyCalledWith( - new BlockRange([block(1_000_000), block(1_000_001)]) + new BlockRange([block(1_000_000), block(1_000_001)]), + isTip ) expect(mockKeyValueStore.addOrUpdate).toHaveBeenCalledTimes(2) @@ -271,9 +287,11 @@ describe(SyncScheduler.name, () => { }) describe(SyncScheduler.prototype.handleSync.name, () => { + const maxBlockNumber = 10 + it('triggers data sync only if block range is inside the limit', async () => { - const maxBlockNumber = 10 - const dataSyncService = mockObject({ + const isTip = true + const dataSyncService = mockObject({ discardAfter: async () => {}, sync: async () => {}, }) @@ -291,28 +309,92 @@ describe(SyncScheduler.name, () => { Logger.SILENT, { earliestBlock: 1, maxBlockNumber } ) + const mockIsTipFn = mockFn().returns(isTip) + syncScheduler.isTip = mockIsTipFn - await syncScheduler.handleSync( - new BlockRange([block(maxBlockNumber - 2), block(maxBlockNumber - 1)]) - ) + const blocks = new BlockRange([ + block(maxBlockNumber - 2), + block(maxBlockNumber - 1), + ]) + + await syncScheduler.handleSync(blocks) await waitForExpect(() => { + expect(mockIsTipFn).toHaveBeenOnlyCalledWith(blocks.end) expect(dataSyncService.discardAfter).toHaveBeenCalledTimes(1) - expect(dataSyncService.sync).toHaveBeenCalledTimes(1) + expect(dataSyncService.sync).toHaveBeenOnlyCalledWith( + expect.a(BlockRange), + isTip + ) expect(mockKeyValueStore.addOrUpdate).toHaveBeenCalledTimes(1) expect(preprocessor.sync).toHaveBeenCalled() }) + }) - await syncScheduler.handleSync( - new BlockRange([block(maxBlockNumber), block(maxBlockNumber + 1)]) + it('skips data sync if block range is outside the limit', async () => { + const dataSyncService = mockObject({ + discardAfter: mockFn(), + sync: mockFn(), + }) + const mockKeyValueStore = mockObject({ + addOrUpdate: mockFn(), + }) + const preprocessor = mockObject>({ + sync: mockFn(), + }) + + const syncScheduler = new SyncScheduler( + mockKeyValueStore, + mockObject(), + dataSyncService, + preprocessor, + Logger.SILENT, + { earliestBlock: 1, maxBlockNumber } ) + const mockIsTipFn = mockFn().returns(false) + syncScheduler.isTip = mockIsTipFn + + const blocks = new BlockRange([ + block(maxBlockNumber), + block(maxBlockNumber + 1), + ]) + + await syncScheduler.handleSync(blocks) await waitForExpect(() => { - expect(dataSyncService.discardAfter).toHaveBeenCalledTimes(1) - expect(dataSyncService.sync).toHaveBeenCalledTimes(1) - expect(mockKeyValueStore.addOrUpdate).toHaveBeenCalledTimes(1) - expect(preprocessor.sync).toHaveBeenCalled() + expect(mockIsTipFn).not.toHaveBeenCalled() + expect(dataSyncService.discardAfter).not.toHaveBeenCalled() + expect(dataSyncService.sync).not.toHaveBeenCalled() + expect(mockKeyValueStore.addOrUpdate).not.toHaveBeenCalled() + expect(preprocessor.sync).not.toHaveBeenCalled() }) }) }) + + describe(SyncScheduler.prototype.isTip.name, () => { + const syncScheduler = new SyncScheduler( + mockObject(), + mockObject(), + mockObject(), + mockObject>(), + Logger.SILENT, + { earliestBlock: 1, maxBlockNumber: 10 } + ) + + beforeEach(() => { + syncScheduler.dispatch({ + type: 'initialized', + lastSynced: 1_000_000, + knownBlocks: [block(1_000_001), block(1_000_002)], + }) + }) + + it('returns false if the block range is not the tip', () => { + expect(syncScheduler.isTip(1_000_001)).toEqual(false) + }) + + it('returns true if the block range is the tip', () => { + expect(syncScheduler.isTip(1_000_003)).toEqual(true) + }) + }) }) diff --git a/packages/backend/src/core/sync/SyncScheduler.ts b/packages/backend/src/core/sync/SyncScheduler.ts index 6dec08320..18d194d8d 100644 --- a/packages/backend/src/core/sync/SyncScheduler.ts +++ b/packages/backend/src/core/sync/SyncScheduler.ts @@ -75,7 +75,6 @@ export class SyncScheduler { name: 'action', execute: async () => { this.logger.debug({ method: 'effect', effect: effect.type }) - if (effect.type === 'sync') { await this.handleSync(effect.blocks) } else { @@ -86,6 +85,10 @@ export class SyncScheduler { } } + isTip(syncedBlockNumber: number) { + return this.state.remaining.end === syncedBlockNumber + } + async handleSync(blocks: BlockRange) { if (blocks.end > this.maxBlockNumber) { this.logger.info( @@ -102,8 +105,9 @@ export class SyncScheduler { return } try { + const isTip = this.isTip(blocks.end) await this.dataSyncService.discardAfter(blocks.start - 1) - await this.dataSyncService.sync(blocks) + await this.dataSyncService.sync(blocks, isTip) await this.kvStore.addOrUpdate({ key: 'lastBlockNumberSynced', value: blocks.end - 1, diff --git a/packages/backend/src/peripherals/database/KeyValueStore.ts b/packages/backend/src/peripherals/database/KeyValueStore.ts index 9d81b7566..db4db8183 100644 --- a/packages/backend/src/peripherals/database/KeyValueStore.ts +++ b/packages/backend/src/peripherals/database/KeyValueStore.ts @@ -11,6 +11,7 @@ export type KeyValueRecord = z.infer export const KeyValueRecord = z.union([ z.object({ key: z.literal('softwareMigrationNumber'), value: stringAsInt() }), z.object({ key: z.literal('lastBlockNumberSynced'), value: stringAsInt() }), + z.object({ key: z.literal('lastSyncedThirdPartyId'), value: stringAsInt() }), z.object({ key: z.literal('userStatisticsPreprocessorCaughtUp'), value: stringAsBoolean(), diff --git a/packages/backend/src/peripherals/database/L2TransactionRepository.ts b/packages/backend/src/peripherals/database/L2TransactionRepository.ts index 99afe1362..74b14ae2e 100644 --- a/packages/backend/src/peripherals/database/L2TransactionRepository.ts +++ b/packages/backend/src/peripherals/database/L2TransactionRepository.ts @@ -295,6 +295,7 @@ export class L2TransactionRepository extends BaseRepository { const knex = await this.knex() const results = await knex('l2_transactions') .select('state_update_id') + .whereNotNull('state_update_id') .orderBy('state_update_id', 'desc') .limit(1) .first() diff --git a/packages/backend/src/peripherals/starkware/schema/regexes.ts b/packages/backend/src/peripherals/starkware/schema/regexes.ts index c1fc64167..cdb47bdb4 100644 --- a/packages/backend/src/peripherals/starkware/schema/regexes.ts +++ b/packages/backend/src/peripherals/starkware/schema/regexes.ts @@ -6,6 +6,6 @@ export const PedersenHash = z.string().regex(/^0[a-f\d]{63}$/) export const Hash256_0x = z.string().regex(/^0x[a-f\d]{1,64}$/) export const Hash256 = z.string().regex(/^[a-f\d]{1,64}$/) export const StarkKey0x = z.string().regex(/^0x[a-f\d]{1,64}$/) -export const AssetHash0x = z.string().regex(/^0x[a-f\d]{0,63}$/) +export const AssetHash0x = z.string().regex(/^0x[a-f\d]{1,64}$/) export const AssetId = z.string().regex(/^0x[a-f\d]{30}$/) export const EthereumAddress = z.string().regex(/^0x[a-fA-F0-9]{40}$/)