Skip to content

Commit

Permalink
Add PostIndex handler support and update CosmosEvent to handle the so…
Browse files Browse the repository at this point in the history
…urce of the event.
  • Loading branch information
jorgecuesta committed Jan 15, 2025
1 parent 1cf7758 commit 71f8f33
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 15 deletions.
13 changes: 13 additions & 0 deletions packages/common-cosmos/src/project/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
CosmosMessageHandler,
CustomModule,
CosmosTxFilter,
CosmosPostIndexBlockHandler,
} from '@subql/types-cosmos';
import {plainToClass, Transform, Type} from 'class-transformer';
import {
Expand Down Expand Up @@ -115,6 +116,16 @@ export class EventHandler implements CosmosEventHandler {
handler!: string;
}

export class PostIndexHandler implements CosmosPostIndexBlockHandler {
@IsEnum(CosmosHandlerKind, {groups: [CosmosHandlerKind.PostIndex]})
kind!: CosmosHandlerKind.PostIndex;
@IsString()
handler!: string;
@IsOptional()
@Type(() => BlockFilter)
filter?: CosmosBlockFilter;
}

export class CustomHandler implements CosmosCustomHandler {
@IsString()
kind!: string;
Expand All @@ -138,6 +149,8 @@ export class RuntimeMapping implements CosmosMapping {
return plainToClass(TransactionHandler, handler);
case CosmosHandlerKind.Block:
return plainToClass(BlockHandler, handler);
case CosmosHandlerKind.PostIndex:
return plainToClass(PostIndexHandler, handler);
default:
throw new Error(`handler ${(handler as any).kind} not supported`);
}
Expand Down
6 changes: 6 additions & 0 deletions packages/common-cosmos/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ export function isEventHandlerProcessor<E>(
return hp.baseHandlerKind === CosmosHandlerKind.Event;
}

export function isPostIndexHandlerProcessor<E>(
hp: SecondLayerHandlerProcessorArray<CosmosHandlerKind, DefaultFilter, unknown>
): hp is SecondLayerHandlerProcessor<CosmosHandlerKind.PostIndex, DefaultFilter, E> {
return hp.baseHandlerKind === CosmosHandlerKind.PostIndex;
}

export function isCosmosTemplates(
templatesData: any,
specVersion: string
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/indexer/dictionary/v1/dictionaryV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ function buildDictionaryQueryEntries(
if (!filterList.length) return [];
switch (baseHandlerKind) {
case CosmosHandlerKind.Block:
case CosmosHandlerKind.PostIndex:
for (const filter of filterList as CosmosBlockFilter[]) {
if (filter.modulo === undefined) {
return [];
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ export class DynamicDsService extends BaseDynamicDsService<
return handler;
case CosmosHandlerKind.Transaction:
case CosmosHandlerKind.Block:
case CosmosHandlerKind.PostIndex:
default:
return handler;
}
Expand Down
57 changes: 50 additions & 7 deletions packages/node/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { Injectable } from '@nestjs/common';
import {
isBlockHandlerProcessor,
isPostIndexHandlerProcessor,
isTransactionHandlerProcessor,
isMessageHandlerProcessor,
isEventHandlerProcessor,
Expand Down Expand Up @@ -111,22 +112,49 @@ export class IndexerManager extends BaseIndexerManager<
): Promise<void> {
await this.indexBlockContent(blockContent, dataSources, getVM);

const msgsTxMap = {};
const evntsTxMap = {};

blockContent.messages.forEach((msg) => {
if (!msgsTxMap[msg.tx.hash]) msgsTxMap[msg.tx.hash] = [];
msgsTxMap[msg.tx.hash].push(msg);
});

blockContent.events.forEach((event) => {
const key = `${event.tx?.hash}`;
if (!evntsTxMap[key]) {
evntsTxMap[key] = { msg: [], nonMsg: [] };
}
// if we get any number, is a right value.
if (typeof event.msg?.idx === 'number') evntsTxMap[key].msg.push(event);
else evntsTxMap[key].nonMsg.push(event);
});

for (const evt of blockContent.beginBlockEvents ?? []) {
await this.indexEvent(evt, dataSources, getVM);
}

for (const tx of blockContent.transactions) {
await this.indexTransaction(tx, dataSources, getVM);
const msgs = blockContent.messages.filter(
(msg) => msg.tx.hash === tx.hash,
);
// not efficient at all because for every transaction it iterates over the whole list of messages again.
// const msgs = blockContent.messages.filter(
// (msg) => msg.tx.hash === tx.hash,
// );
const msgs = msgsTxMap[tx.hash] ?? [];
for (const msg of msgs) {
await this.indexMessage(msg, dataSources, getVM);
const events = blockContent.events.filter(
(event) => event.tx.hash === tx.hash && event.msg?.idx === msg.idx,
);
// Not efficient at all because for every transaction it iterates over the whole list of events again.
// Also, miss the non-msg related events.
// const events = blockContent.events.filter(
// (event) => event.tx.hash === tx.hash && event.msg?.idx === msg.idx,
// );
const eventMap = evntsTxMap[tx.hash] ?? { msg: [], nonMsg: [] };

for (const evt of eventMap.msg) {
await this.indexEvent(evt, dataSources, getVM);
}

for (const evt of events) {
for (const evt of eventMap.nonMsg) {
await this.indexEvent(evt, dataSources, getVM);
}
}
Expand All @@ -139,6 +167,8 @@ export class IndexerManager extends BaseIndexerManager<
for (const evt of blockContent.finalizeBlockEvents ?? []) {
await this.indexEvent(evt, dataSources, getVM);
}

await this.postIndexHook(blockContent, dataSources, getVM);
}

private async indexBlockContent(
Expand Down Expand Up @@ -181,6 +211,16 @@ export class IndexerManager extends BaseIndexerManager<
}
}

private async postIndexHook(
block: BlockContent,
dataSources: CosmosDatasource[],
getVM: (d: CosmosDatasource) => Promise<IndexerSandbox>,
): Promise<void> {
for (const ds of dataSources) {
await this.indexData(CosmosHandlerKind.PostIndex, block.block, ds, getVM);
}
}

protected async prepareFilteredData<T = any>(
kind: CosmosHandlerKind,
data: T,
Expand Down Expand Up @@ -216,18 +256,21 @@ type ProcessorTypeMap = {
[CosmosHandlerKind.Event]: typeof isEventHandlerProcessor;
[CosmosHandlerKind.Transaction]: typeof isTransactionHandlerProcessor;
[CosmosHandlerKind.Message]: typeof isMessageHandlerProcessor;
[CosmosHandlerKind.PostIndex]: typeof isPostIndexHandlerProcessor;
};

const ProcessorTypeMap = {
[CosmosHandlerKind.Block]: isBlockHandlerProcessor,
[CosmosHandlerKind.Event]: isEventHandlerProcessor,
[CosmosHandlerKind.Transaction]: isTransactionHandlerProcessor,
[CosmosHandlerKind.Message]: isMessageHandlerProcessor,
[CosmosHandlerKind.PostIndex]: isPostIndexHandlerProcessor,
};

const FilterTypeMap = {
[CosmosHandlerKind.Block]: CosmosUtil.filterBlock,
[CosmosHandlerKind.Transaction]: CosmosUtil.filterTx,
[CosmosHandlerKind.Event]: CosmosUtil.filterEvent,
[CosmosHandlerKind.Message]: CosmosUtil.filterMessageData,
[CosmosHandlerKind.PostIndex]: CosmosUtil.filterBlock,
};
29 changes: 26 additions & 3 deletions packages/node/src/utils/cosmos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
CosmosMessage,
CosmosBlockFilter,
CosmosTxFilter,
CosmosEventKind,
} from '@subql/types-cosmos';
import { isObjectLike } from 'lodash';
import { isLong } from 'long';
Expand Down Expand Up @@ -342,6 +343,7 @@ export function wrapBlockBeginAndEndEvents(
block: CosmosBlock,
events: TxEvent[],
idxOffset: number,
kind: CosmosEventKind,
): CosmosEvent[] {
return events.map(
(event) =>
Expand All @@ -352,6 +354,7 @@ export function wrapBlockBeginAndEndEvents(
msg: null,
tx: null,
log: null,
kind: kind,
} as unknown as CosmosEvent),
);
}
Expand All @@ -371,14 +374,20 @@ export function wrapEvent(
): CosmosEvent[] {
const events: CosmosEvent[] = [];
for (const tx of txs) {
const appendEvent = (msg: CosmosMessage, event: TxEvent, log: Log) => {
const appendEvent = (
msg: CosmosMessage | undefined,
event: TxEvent,
log: Log,
kind: CosmosEventKind,
) => {
events.push({
idx: idxOffset++,
block,
tx,
msg,
event,
log,
kind,
});
};

Expand Down Expand Up @@ -410,7 +419,7 @@ export function wrapEvent(
continue;
}
for (let i = 0; i < log.events.length; i++) {
appendEvent(msg, log.events[i], log);
appendEvent(msg, log.events[i], log, CosmosEventKind.Message);
}
}
} else if (tx.tx?.events) {
Expand All @@ -424,6 +433,12 @@ export function wrapEvent(

// Event doesn't have a message
if (eventMsgIndex === undefined) {
appendEvent(
undefined,
txEvent,
{ events: [], log: '', msg_index: -1 },
CosmosEventKind.Transaction,
);
continue;
}

Expand All @@ -435,7 +450,12 @@ export function wrapEvent(
}

// TODO does a log still exist in Comet38?
appendEvent(msg, txEvent, { events: [], log: '', msg_index: -1 });
appendEvent(
msg,
txEvent,
{ events: [], log: '', msg_index: -1 },
CosmosEventKind.Message,
);
}
} else {
// For some tests that have invalid data
Expand Down Expand Up @@ -562,6 +582,7 @@ export class LazyBlockContent implements BlockContent {
this.block,
[...results.beginBlockEvents],
this._eventIdx,
CosmosEventKind.BeginBlock,
);
this._eventIdx += this._wrappedBeginBlockEvents.length;
}
Expand All @@ -582,6 +603,7 @@ export class LazyBlockContent implements BlockContent {
this.block,
[...results.endBlockEvents],
this._eventIdx,
CosmosEventKind.EndBlock,
);
this._eventIdx += this._wrappedEndBlockEvents.length;
}
Expand All @@ -600,6 +622,7 @@ export class LazyBlockContent implements BlockContent {
this.block,
[...results.finalizeBlockEvents],
this._eventIdx,
CosmosEventKind.FinalizeBlock,
);
this._eventIdx += this._wrappedFinalizedBlockEvents.length;
}
Expand Down
14 changes: 12 additions & 2 deletions packages/types/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,23 @@ export interface CosmosMessage<T = any> {
};
}

export enum CosmosEventKind {
BeginBlock = 'begin_block',
EndBlock = 'end_block',
FinalizeBlock = 'finalize_block',
Message = 'message',
Transaction = 'transaction',
}

export interface CosmosEvent {
idx: number;
block: CosmosBlock;
tx: CosmosTransaction;
msg: CosmosMessage;
// tx and msg are optional because this is a shared interface that is use with begin, end and finalize block events.
tx?: CosmosTransaction;
msg?: CosmosMessage;
log: Log;
event: TxEvent;
kind: CosmosEventKind;
}

export type DynamicDatasourceCreator = (name: string, args: Record<string, unknown>) => Promise<void>;
Expand Down
20 changes: 17 additions & 3 deletions packages/types/src/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,26 @@ export enum CosmosHandlerKind {
* Handler for Cosmos events.
*/
Event = 'cosmos/EventHandler',
/**
* Post-processing functions
*/
PostIndex = 'cosmos/PostIndexHandler',
}

export type CosmosRuntimeHandlerInputMap = {
[CosmosHandlerKind.Block]: CosmosBlock;
[CosmosHandlerKind.Transaction]: CosmosTransaction;
[CosmosHandlerKind.Message]: CosmosMessage;
[CosmosHandlerKind.Event]: CosmosEvent;
[CosmosHandlerKind.PostIndex]: CosmosBlock;
};

type CosmosRuntimeFilterMap = {
[CosmosHandlerKind.Block]: CosmosBlockFilter;
[CosmosHandlerKind.Transaction]: CosmosTxFilter;
[CosmosHandlerKind.Message]: CosmosMessageFilter;
[CosmosHandlerKind.Event]: CosmosEventFilter;
[CosmosHandlerKind.PostIndex]: CosmosBlockFilter;
};

/**
Expand Down Expand Up @@ -198,6 +204,12 @@ export type CosmosMessageHandler = CosmosCustomHandler<CosmosHandlerKind.Message
*/
export type CosmosEventHandler = CosmosCustomHandler<CosmosHandlerKind.Event, CosmosEventFilter>;

/**
* Represents a handler for Cosmos post-index blocks.
* @type {CosmosCustomHandler<CosmosHandlerKind.PostIndex, CosmosBlockFilter>}
*/
export type CosmosPostIndexBlockHandler = CosmosCustomHandler<CosmosHandlerKind.PostIndex, CosmosBlockFilter>;

/**
* Represents a generic custom handler for Cosmos.
* @interface
Expand All @@ -208,7 +220,7 @@ export interface CosmosCustomHandler<K extends string = string, F = Record<strin
/**
* The kind of handler. For `cosmos/Runtime` datasources this is either `Block`, `Transaction`, `Message` or `Event` kinds.
* The value of this will determine the filter options as well as the data provided to your handler function
* @type {CosmosHandlerKind.Block | CosmosHandlerKind.Transaction | CosmosHandlerKind.Message | CosmosHandlerKind.Event | string }
* @type {CosmosHandlerKind.Block | CosmosHandlerKind.Transaction | CosmosHandlerKind.Message | CosmosHandlerKind.Event | CosmosHandlerKind.PostIndex | string }
* @example
* kind: CosmosHandlerKind.Block // Defined with an enum, this is used for runtime datasources
* @example
Expand All @@ -231,7 +243,8 @@ export type CosmosRuntimeHandler =
| CosmosBlockHandler
| CosmosTransactionHandler
| CosmosMessageHandler
| CosmosEventHandler;
| CosmosEventHandler
| CosmosPostIndexBlockHandler;

export type CosmosHandler = CosmosRuntimeHandler | CosmosCustomHandler;

Expand Down Expand Up @@ -398,7 +411,8 @@ export type SecondLayerHandlerProcessorArray<
| SecondLayerHandlerProcessor<CosmosHandlerKind.Block, F, T, DS>
| SecondLayerHandlerProcessor<CosmosHandlerKind.Transaction, F, T, DS>
| SecondLayerHandlerProcessor<CosmosHandlerKind.Message, F, T, DS>
| SecondLayerHandlerProcessor<CosmosHandlerKind.Event, F, T, DS>;
| SecondLayerHandlerProcessor<CosmosHandlerKind.Event, F, T, DS>
| SecondLayerHandlerProcessor<CosmosHandlerKind.PostIndex, F, T, DS>;

export type CosmosDatasourceProcessor<
K extends string,
Expand Down

0 comments on commit 71f8f33

Please sign in to comment.