Skip to content

Commit

Permalink
Add support for batch processing of Cosmos transactions, messages, an…
Browse files Browse the repository at this point in the history
…d events

Introduces new handlers and utilities for batch processing, enhancing efficiency when handling multiple Cosmos entities at once. Updated relevant models, filters, and indexer logic to accommodate these batch handlers. This improves the flexibility and performance of Cosmos data processing.
  • Loading branch information
jorgecuesta committed Jan 17, 2025
1 parent 71f8f33 commit 9e71130
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 41 deletions.
38 changes: 38 additions & 0 deletions packages/common-cosmos/src/project/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import {
CustomModule,
CosmosTxFilter,
CosmosPostIndexBlockHandler,
BatchCosmosTransactionHandler,
BatchCosmosMessageHandler,
BatchCosmosEventHandler,
} from '@subql/types-cosmos';
import {plainToClass, Transform, Type} from 'class-transformer';
import {
Expand Down Expand Up @@ -116,6 +119,35 @@ export class EventHandler implements CosmosEventHandler {
handler!: string;
}

export class BatchTransactionHandler implements BatchCosmosTransactionHandler {
@IsEnum(CosmosHandlerKind, {groups: [CosmosHandlerKind.BatchTransaction]})
kind!: CosmosHandlerKind.BatchTransaction;
@IsString()
handler!: string;
}

export class BatchMessageHandler implements BatchCosmosMessageHandler {
@IsEnum(CosmosHandlerKind, {groups: [CosmosHandlerKind.BatchMessage]})
kind!: CosmosHandlerKind.BatchMessage;
@IsString()
handler!: string;
@IsOptional()
@ValidateNested()
@Type(() => MessageFilter)
filter?: CosmosMessageFilter;
}

export class BatchEventHandler implements BatchCosmosEventHandler {
@IsOptional()
@ValidateNested()
@Type(() => EventFilter)
filter?: CosmosEventFilter;
@IsEnum(CosmosHandlerKind, {groups: [CosmosHandlerKind.BatchEvent]})
kind!: CosmosHandlerKind.BatchEvent;
@IsString()
handler!: string;
}

export class PostIndexHandler implements CosmosPostIndexBlockHandler {
@IsEnum(CosmosHandlerKind, {groups: [CosmosHandlerKind.PostIndex]})
kind!: CosmosHandlerKind.PostIndex;
Expand Down Expand Up @@ -151,6 +183,12 @@ export class RuntimeMapping implements CosmosMapping {
return plainToClass(BlockHandler, handler);
case CosmosHandlerKind.PostIndex:
return plainToClass(PostIndexHandler, handler);
case CosmosHandlerKind.BatchTransaction:
return plainToClass(BatchTransactionHandler, handler);
case CosmosHandlerKind.BatchEvent:
return plainToClass(BatchEventHandler, handler);
case CosmosHandlerKind.BatchMessage:
return plainToClass(BatchMessageHandler, handler);
default:
throw new Error(`handler ${(handler as any).kind} not supported`);
}
Expand Down
18 changes: 18 additions & 0 deletions packages/common-cosmos/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ export function isPostIndexHandlerProcessor<E>(
return hp.baseHandlerKind === CosmosHandlerKind.PostIndex;
}

export function isBatchEventHandlerProcessor<E>(
hp: SecondLayerHandlerProcessorArray<CosmosHandlerKind, DefaultFilter, unknown>
): hp is SecondLayerHandlerProcessor<CosmosHandlerKind.BatchEvent, DefaultFilter, E> {
return hp.baseHandlerKind === CosmosHandlerKind.BatchEvent;
}

export function isBatchTransactionHandlerProcessor<E>(
hp: SecondLayerHandlerProcessorArray<CosmosHandlerKind, DefaultFilter, unknown>
): hp is SecondLayerHandlerProcessor<CosmosHandlerKind.BatchTransaction, DefaultFilter, E> {
return hp.baseHandlerKind === CosmosHandlerKind.BatchTransaction;
}

export function isBatchMessageHandlerProcessor<E>(
hp: SecondLayerHandlerProcessorArray<CosmosHandlerKind, DefaultFilter, unknown>
): hp is SecondLayerHandlerProcessor<CosmosHandlerKind.BatchMessage, DefaultFilter, E> {
return hp.baseHandlerKind === CosmosHandlerKind.BatchMessage;
}

export function isCosmosTemplates(
templatesData: any,
specVersion: string
Expand Down
10 changes: 6 additions & 4 deletions packages/node/src/indexer/dictionary/v1/dictionaryV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ function getBaseHandlerFilters<T extends CosmosHandlerFilter>(
}
}

// Adding batches move complexity over 22 and the eslint rule is 20
// eslint-disable-next-line complexity
function buildDictionaryQueryEntries(
dataSources: CosmosDatasource[],
getDsProcessor: GetDsProcessor,
Expand Down Expand Up @@ -139,7 +141,8 @@ function buildDictionaryQueryEntries(
}
}
break;
case CosmosHandlerKind.Message: {
case CosmosHandlerKind.Message:
case CosmosHandlerKind.BatchMessage:
for (const filter of filterList as CosmosMessageFilter[]) {
if (filter.type !== undefined) {
queryEntries.push(messageFilterToQueryEntry(filter));
Expand All @@ -148,8 +151,8 @@ function buildDictionaryQueryEntries(
}
}
break;
}
case CosmosHandlerKind.Event: {
case CosmosHandlerKind.Event:
case CosmosHandlerKind.BatchEvent:
for (const filter of filterList as CosmosEventFilter[]) {
if (filter.type !== undefined) {
queryEntries.push(eventFilterToQueryEntry(filter));
Expand All @@ -158,7 +161,6 @@ function buildDictionaryQueryEntries(
}
}
break;
}
default:
}
}
Expand Down
7 changes: 5 additions & 2 deletions packages/node/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {
DynamicDsService as BaseDynamicDsService,
} from '@subql/node-core';
import { CosmosDatasource, CosmosHandlerKind } from '@subql/types-cosmos';
import { plainToClass, ClassConstructor } from 'class-transformer';
import { validateSync, IsOptional, IsObject } from 'class-validator';
import { ClassConstructor, plainToClass } from 'class-transformer';
import { IsObject, IsOptional, validateSync } from 'class-validator';
import { SubqueryProject } from '../configure/SubqueryProject';
import { DsProcessorService } from './ds-processor.service';

Expand Down Expand Up @@ -83,6 +83,7 @@ export class DynamicDsService extends BaseDynamicDsService<
dsObj.mapping.handlers = dsObj.mapping.handlers.map((handler) => {
switch (handler.kind) {
case CosmosHandlerKind.Event:
case CosmosHandlerKind.BatchEvent:
assert(
handler.filter,
'Dynamic datasources must have some predfined filter',
Expand All @@ -106,6 +107,7 @@ export class DynamicDsService extends BaseDynamicDsService<
}
return handler;
case CosmosHandlerKind.Message:
case CosmosHandlerKind.BatchMessage:
assert(
handler.filter,
'Dynamic datasources must have some predfined filter',
Expand All @@ -123,6 +125,7 @@ export class DynamicDsService extends BaseDynamicDsService<
}
return handler;
case CosmosHandlerKind.Transaction:
case CosmosHandlerKind.BatchTransaction:
case CosmosHandlerKind.Block:
case CosmosHandlerKind.PostIndex:
default:
Expand Down
Loading

0 comments on commit 9e71130

Please sign in to comment.