Skip to content

Commit

Permalink
feat: 🎸 add local MQ to provide an MQ like dev experience
Browse files Browse the repository at this point in the history
when artemis is not configured an in memory implementation will be used
so that AMQP process mode can be used in dev
  • Loading branch information
polymath-eric committed Jul 29, 2024
1 parent fbe0526 commit 056c997
Show file tree
Hide file tree
Showing 35 changed files with 424 additions and 151 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ AMQP is a form on offline processing where the payload will be published on an A
1. A signer process subscribes to `Requests`. For each message it generates a signature, and publishes a message on `Signatures`
1. A submitter process subscribes to `Signatures` and submits to the chain. It publishes to `Finalizations`, for consumer applications to subscribe to

To use AMQP mode a message broker must be configured. The implementation assumes [ArtemisMQ](https://activemq.apache.org/components/artemis/) is used, with an AMQP acceptor. In theory any AMQP 1.0 compliant broker should work though.
When using AMQP mode a message broker should be configured. The current implementation assumes [ArtemisMQ](https://activemq.apache.org/components/artemis/) is used, with an AMQP acceptor. Other message queues can be implemented as needed.

If using AMQP, it is strongly recommended to use a persistent data store (i.e postgres). There are two tables related to AMQP processing: `offline_tx` and `offline_event`:
- `offline_tx` is a table for the submitter process. This provides a convenient way to query submitted transactions, and to detect ones rejected by the chain for some reason
- `offline_event` is a table for the recorder process. This uses Artemis diverts to record every message exchanged in the process, serving as an audit log

If using the project's compose file, an Artemis console will be exposed on `:8181` with `artemis` being both username and password.

If artemis config values are not set, then an in memory implementation will be defaulted to. This mode is not recommended for use in production environments since the messages are ephemeral.

### Webhooks (alpha)

Normally the endpoints that create transactions wait for block finalization before returning a response, which normally takes around 15 seconds. When processMode `submitWithCallback` is used the `webhookUrl` param must also be provided. The server will respond after submitting the transaction to the mempool with 202 (Accepted) status code instead of the usual 201 (Created).
Expand Down
2 changes: 1 addition & 1 deletion src/accounts/accounts.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ export class AccountsController {
@ApiOperation({
summary: 'Get Account details',
description:
'This endpoint retrieves the Account details for the given Account address. This includes the associated Identity DID, primary account for that Identity and Secondary Accounts with the Permissions and the Subsidy details',
'This endpoint retrieves the Account details for the given Account address. This includes the associated Identity DID, primary account for that Identity and Secondary Accounts with the Permissions and the Subsidy details',
})
@ApiParam({
name: 'account',
Expand Down
3 changes: 2 additions & 1 deletion src/accounts/models/account-details.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export class AccountDetailsModel {
readonly identity?: IdentityModel;

@ApiPropertyOptional({
description: 'MultiSig Account details',
description:
'The MultiSig for which the account is a signer for. Will not be set if the account is not a MultiSig signer',
type: MultiSigDetailsModel,
})
@Type(() => MultiSigDetailsModel)
Expand Down
2 changes: 1 addition & 1 deletion src/accounts/models/multi-sig-details.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { SignerModel } from '~/identities/models/signer.model';

export class MultiSigDetailsModel {
@ApiProperty({
description: 'Secondary accounts with permissions',
description: 'Signing accounts for the multiSig',
isArray: true,
type: SignerModel,
})
Expand Down
14 changes: 4 additions & 10 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { ConfigModule } from '@nestjs/config';
import Joi from 'joi';

import { AccountsModule } from '~/accounts/accounts.module';
import { ArtemisModule } from '~/artemis/artemis.module';
import { AssetsModule } from '~/assets/assets.module';
import { AuthModule } from '~/auth/auth.module';
import { AuthStrategy } from '~/auth/strategies/strategies.consts';
Expand Down Expand Up @@ -102,15 +101,10 @@ import { UsersModule } from '~/users/users.module';
MetadataModule,
SubsidyModule,
NftsModule,
...(process.env.ARTEMIS_HOST
? [
ArtemisModule,
OfflineSignerModule,
OfflineSubmitterModule,
OfflineStarterModule,
OfflineRecorderModule,
]
: []),
OfflineSignerModule,
OfflineSubmitterModule,
OfflineStarterModule,
OfflineRecorderModule,
],
})
export class AppModule {}
13 changes: 0 additions & 13 deletions src/artemis/artemis.module.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/common/utils/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export enum QueueName {
EventsLog = 'EventsLog',

Requests = 'Requests',
SignerRequests = 'SignerRequests',
SubmitterRequests = 'SubmitterRequests',

Signatures = 'Signatures',

Finalizations = 'Finalizations',
}
10 changes: 0 additions & 10 deletions src/datastore/interfaces/postgres-config.interface.ts

This file was deleted.

14 changes: 14 additions & 0 deletions src/message/artemis/artemis.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* istanbul ignore file */

import { Module } from '@nestjs/common';

import { LoggerModule } from '~/logger/logger.module';
import { ArtemisService } from '~/message/artemis/artemis.service';
import { MessageService } from '~/message/common/message.service';

@Module({
imports: [LoggerModule],
providers: [{ provide: MessageService, useClass: ArtemisService }],
exports: [MessageService],
})
export class ArtemisModule {}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { Test, TestingModule } from '@nestjs/testing';
import { BigNumber } from '@polymeshassociation/polymesh-sdk';
import { IsString } from 'class-validator';
import { when } from 'jest-when';
import { EventContext } from 'rhea-promise';

import { ArtemisService } from '~/artemis/artemis.service';
import { AppInternalError } from '~/common/errors';
import { clearEventLoop } from '~/common/utils';
import { AddressName, QueueName } from '~/common/utils/amqp';
import { mockPolymeshLoggerProvider } from '~/logger/mock-polymesh-logger';
import { PolymeshLogger } from '~/logger/polymesh-logger.service';
import { ArtemisService } from '~/message/artemis/artemis.service';
import { ArtemisConfig } from '~/message/artemis/utils';
import { MessageService } from '~/message/common/message.service';

const mockSend = jest.fn();
const mockConnectionClose = jest.fn();
Expand Down Expand Up @@ -68,31 +72,60 @@ jest.mock('rhea-promise', () => {
};
});

const mockConfig: ArtemisConfig = {
type: 'artemis',
port: 1,
username: 'someUser',
password: 'somePassword',
host: 'http://example.com',
operationTimeoutInSeconds: 10,
transport: 'tcp',
configured: true,
};

const mockReceipt = { id: 1 };
const otherMockReceipt = { id: 2 };

describe('ArtemisService generic test suite', () => {
const logger = createMock<PolymeshLogger>();

const service = new ArtemisService(logger, mockConfig);

mockSend.mockResolvedValue(mockReceipt);

MessageService.test(service);
});

describe('ArtemisService', () => {
let service: ArtemisService;
let logger: DeepMocked<PolymeshLogger>;
let configSpy: jest.SpyInstance;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ArtemisService, mockPolymeshLoggerProvider],
providers: [
ArtemisService,
mockPolymeshLoggerProvider,
{ provide: 'ARTEMIS_CONFIG', useValue: mockConfig },
],
}).compile();

service = module.get<ArtemisService>(ArtemisService);
configSpy = jest.spyOn(service, 'isConfigured');
configSpy.mockReturnValue(true);
logger = module.get<typeof logger>(PolymeshLogger);
mockSend.mockResolvedValue(mockReceipt);
});

it('should be defined', () => {
expect(service).toBeDefined();
});

it('should throw an error if constructed with unconfigured config', () => {
expect(() => new ArtemisService(logger, { configured: false, type: 'artemis' })).toThrow(
AppInternalError
);
});

describe('sendMessage', () => {
it('should send a message', async () => {
const mockReceipt = 'mockReceipt';
const otherMockReceipt = 'otherMockReceipt';

const topicName = AddressName.Requests;
const body = { payload: 'some payload' };
const otherBody = { other: 'payload' };
Expand All @@ -103,10 +136,12 @@ describe('ArtemisService', () => {
.mockResolvedValue(otherMockReceipt);

const receipt = await service.sendMessage(topicName, body);
expect(receipt).toEqual(mockReceipt);
expect(receipt).toEqual(expect.objectContaining({ id: new BigNumber(mockReceipt.id) }));

const otherReceipt = await service.sendMessage(topicName, otherBody);
expect(otherReceipt).toEqual(otherMockReceipt);
expect(otherReceipt).toEqual(
expect.objectContaining({ id: new BigNumber(otherMockReceipt.id) })
);
});
});

Expand Down Expand Up @@ -169,14 +204,5 @@ describe('ArtemisService', () => {

expect(logger.error).toHaveBeenCalled();
});

it('should do no work if service is not configured', async () => {
configSpy.mockReturnValue(false);

const initialCallCount = mockConnectionClose.mock.calls.length;
await service.onApplicationShutdown();

expect(mockConnectionClose.mock.calls.length).toEqual(initialCallCount);
});
});
});
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { BigNumber } from '@polymeshassociation/polymesh-sdk';
import { validate } from 'class-validator';
import {
AwaitableSender,
AwaitableSendOptions,
Connection,
ConnectionOptions,
Container,
Delivery,
EventContext,
Receiver,
ReceiverEvents,
Expand All @@ -17,8 +17,9 @@ import {
import { AppInternalError } from '~/common/errors';
import { AddressName, QueueName } from '~/common/utils/amqp';
import { PolymeshLogger } from '~/logger/polymesh-logger.service';

type EventHandler<T> = (params: T) => Promise<void>;
import { ArtemisConfig } from '~/message/artemis/types';
import { MessageReceipt } from '~/message/common';
import { EventHandler, MessageService } from '~/message/common/message.service';

interface AddressEntry {
addressName: AddressName;
Expand All @@ -28,28 +29,31 @@ interface AddressEntry {
type AddressStore = Record<AddressName, AddressEntry>;

@Injectable()
export class ArtemisService implements OnApplicationShutdown {
export class ArtemisService extends MessageService implements OnApplicationShutdown {
private receivers: Receiver[] = [];
private addressStore: Partial<AddressStore> = {};
private connectionPromise?: Promise<Connection>;

constructor(private readonly logger: PolymeshLogger) {
constructor(
private readonly logger: PolymeshLogger,
@Inject('ARTEMIS_CONFIG') private readonly config: ArtemisConfig
) {
super();
this.logger.setContext(ArtemisService.name);
}
this.logger.debug('Artemis service initialized');

public isConfigured(): boolean {
return !!process.env.ARTEMIS_HOST;
if (!this.config.configured) {
throw new AppInternalError(
'Artemis service was constructed, but Artemis config values were not set'
);
}
}

public async onApplicationShutdown(signal?: string | undefined): Promise<void> {
this.logger.debug(
`artemis service received application shutdown request, sig: ${signal} - now closing connections`
);

if (!this.isConfigured()) {
return;
}

const closePromises = [
...this.receivers.map(receiver => receiver.close()),
...this.addressEntries().map(entry => entry.sender.close()),
Expand Down Expand Up @@ -88,16 +92,7 @@ export class ArtemisService implements OnApplicationShutdown {
}

private connectOptions(): ConnectionOptions {
const { ARTEMIS_HOST, ARTEMIS_USERNAME, ARTEMIS_PASSWORD, ARTEMIS_PORT } = process.env;

return {
port: Number(ARTEMIS_PORT),
host: ARTEMIS_HOST,
username: ARTEMIS_USERNAME,
password: ARTEMIS_PASSWORD,
operationTimeoutInSeconds: 10,
transport: 'tcp',
};
return this.config as ConnectionOptions;
}

private sendOptions(): AwaitableSendOptions {
Expand Down Expand Up @@ -139,14 +134,19 @@ export class ArtemisService implements OnApplicationShutdown {
};
}

public async sendMessage(publishOn: AddressName, body: unknown): Promise<Delivery> {
public async sendMessage(publishOn: AddressName, body: unknown): Promise<MessageReceipt> {
const { sender } = await this.getAddress(publishOn);

const message = { body };

const sendOptions = this.sendOptions();
this.logger.debug(`sending message on: ${publishOn}`);
return sender.send(message, sendOptions);
const receipt = await sender.send(message, sendOptions);

return {
id: new BigNumber(receipt.id),
topic: publishOn,
};
}

/**
Expand Down Expand Up @@ -206,6 +206,7 @@ export class ArtemisService implements OnApplicationShutdown {
});

receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
/* istanbul ignore next */
const receiverError = context?.receiver?.error;
this.logger.error(`an error occurred for receiver "${listenOn}": ${receiverError}`);
});
Expand Down
14 changes: 14 additions & 0 deletions src/message/artemis/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* istanbul ignore file */

export type ArtemisConfig =
| {
type: 'artemis';
port: number;
host: string;
username: string;
password: string;
operationTimeoutInSeconds: number;
transport: string;
configured: true;
}
| { type: 'artemis'; configured: false };
Loading

0 comments on commit 056c997

Please sign in to comment.