Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 🎸 add local MQ to provide an MQ like dev experience #284

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ AMQP is a form on offline processing where the payload will be published on an A
1. A signer process subscribes to `Requests`. For each message it generates a signature, and publishes a message on `Signatures`
1. A submitter process subscribes to `Signatures` and submits to the chain. It publishes to `Finalizations`, for consumer applications to subscribe to

To use AMQP mode a message broker must be configured. The implementation assumes [ArtemisMQ](https://activemq.apache.org/components/artemis/) is used, with an AMQP acceptor. In theory any AMQP 1.0 compliant broker should work though.
When using AMQP mode a message broker should be configured. The current implementation assumes [ArtemisMQ](https://activemq.apache.org/components/artemis/) is used, with an AMQP acceptor. Other message queues can be implemented as needed.

If using AMQP, it is strongly recommended to use a persistent data store (i.e postgres). There are two tables related to AMQP processing: `offline_tx` and `offline_event`:
- `offline_tx` is a table for the submitter process. This provides a convenient way to query submitted transactions, and to detect ones rejected by the chain for some reason
- `offline_event` is a table for the recorder process. This uses Artemis diverts to record every message exchanged in the process, serving as an audit log

If using the project's compose file, an Artemis console will be exposed on `:8181` with `artemis` being both username and password.

If artemis config values are not set, then an in memory implementation will be defaulted to. This mode is not recommended for use in production environments since the messages are ephemeral.

### Webhooks (alpha)

Normally the endpoints that create transactions wait for block finalization before returning a response, which normally takes around 15 seconds. When processMode `submitWithCallback` is used the `webhookUrl` param must also be provided. The server will respond after submitting the transaction to the mempool with 202 (Accepted) status code instead of the usual 201 (Created).
Expand Down
2 changes: 1 addition & 1 deletion src/accounts/accounts.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ export class AccountsController {
@ApiOperation({
summary: 'Get Account details',
description:
'This endpoint retrieves the Account details for the given Account address. This includes the associated Identity DID, primary account for that Identity and Secondary Accounts with the Permissions and the Subsidy details',
'This endpoint retrieves the Account details for the given Account address. This includes the associated Identity DID, primary account for that Identity and Secondary Accounts with the Permissions and the Subsidy details',
})
@ApiParam({
name: 'account',
Expand Down
3 changes: 2 additions & 1 deletion src/accounts/models/account-details.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export class AccountDetailsModel {
readonly identity?: IdentityModel;

@ApiPropertyOptional({
description: 'MultiSig Account details',
description:
'The MultiSig for which the account is a signer for. Will not be set if the account is not a MultiSig signer',
type: MultiSigDetailsModel,
})
@Type(() => MultiSigDetailsModel)
Expand Down
2 changes: 1 addition & 1 deletion src/accounts/models/multi-sig-details.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { SignerModel } from '~/identities/models/signer.model';

export class MultiSigDetailsModel {
@ApiProperty({
description: 'Secondary accounts with permissions',
description: 'Signing accounts for the multiSig',
isArray: true,
type: SignerModel,
})
Expand Down
14 changes: 4 additions & 10 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { ConfigModule } from '@nestjs/config';
import Joi from 'joi';

import { AccountsModule } from '~/accounts/accounts.module';
import { ArtemisModule } from '~/artemis/artemis.module';
import { AssetsModule } from '~/assets/assets.module';
import { AuthModule } from '~/auth/auth.module';
import { AuthStrategy } from '~/auth/strategies/strategies.consts';
Expand Down Expand Up @@ -102,15 +101,10 @@ import { UsersModule } from '~/users/users.module';
MetadataModule,
SubsidyModule,
NftsModule,
...(process.env.ARTEMIS_HOST
? [
ArtemisModule,
OfflineSignerModule,
OfflineSubmitterModule,
OfflineStarterModule,
OfflineRecorderModule,
]
: []),
OfflineSignerModule,
OfflineSubmitterModule,
OfflineStarterModule,
OfflineRecorderModule,
],
})
export class AppModule {}
13 changes: 0 additions & 13 deletions src/artemis/artemis.module.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/common/utils/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export enum QueueName {
EventsLog = 'EventsLog',

Requests = 'Requests',
SignerRequests = 'SignerRequests',
SubmitterRequests = 'SubmitterRequests',

Signatures = 'Signatures',

Finalizations = 'Finalizations',
}
10 changes: 0 additions & 10 deletions src/datastore/interfaces/postgres-config.interface.ts

This file was deleted.

14 changes: 14 additions & 0 deletions src/message/artemis/artemis.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* istanbul ignore file */

import { Module } from '@nestjs/common';

import { LoggerModule } from '~/logger/logger.module';
import { ArtemisService } from '~/message/artemis/artemis.service';
import { MessageService } from '~/message/common/message.service';

@Module({
imports: [LoggerModule],
providers: [{ provide: MessageService, useClass: ArtemisService }],
exports: [MessageService],
})
export class ArtemisModule {}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { Test, TestingModule } from '@nestjs/testing';
import { BigNumber } from '@polymeshassociation/polymesh-sdk';
import { IsString } from 'class-validator';
import { when } from 'jest-when';
import { EventContext } from 'rhea-promise';

import { ArtemisService } from '~/artemis/artemis.service';
import { AppInternalError } from '~/common/errors';
import { clearEventLoop } from '~/common/utils';
import { AddressName, QueueName } from '~/common/utils/amqp';
import { mockPolymeshLoggerProvider } from '~/logger/mock-polymesh-logger';
import { PolymeshLogger } from '~/logger/polymesh-logger.service';
import { ArtemisService } from '~/message/artemis/artemis.service';
import { ArtemisConfig } from '~/message/artemis/utils';
import { MessageService } from '~/message/common/message.service';

const mockSend = jest.fn();
const mockConnectionClose = jest.fn();
Expand Down Expand Up @@ -68,31 +72,60 @@ jest.mock('rhea-promise', () => {
};
});

const mockConfig: ArtemisConfig = {
type: 'artemis',
port: 1,
username: 'someUser',
password: 'somePassword',
host: 'http://example.com',
operationTimeoutInSeconds: 10,
transport: 'tcp',
configured: true,
};

const mockReceipt = { id: 1 };
const otherMockReceipt = { id: 2 };

describe('ArtemisService generic test suite', () => {
const logger = createMock<PolymeshLogger>();

const service = new ArtemisService(logger, mockConfig);

mockSend.mockResolvedValue(mockReceipt);

MessageService.test(service);
});

describe('ArtemisService', () => {
let service: ArtemisService;
let logger: DeepMocked<PolymeshLogger>;
let configSpy: jest.SpyInstance;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ArtemisService, mockPolymeshLoggerProvider],
providers: [
ArtemisService,
mockPolymeshLoggerProvider,
{ provide: 'ARTEMIS_CONFIG', useValue: mockConfig },
],
}).compile();

service = module.get<ArtemisService>(ArtemisService);
configSpy = jest.spyOn(service, 'isConfigured');
configSpy.mockReturnValue(true);
logger = module.get<typeof logger>(PolymeshLogger);
mockSend.mockResolvedValue(mockReceipt);
});

it('should be defined', () => {
expect(service).toBeDefined();
});

it('should throw an error if constructed with unconfigured config', () => {
expect(() => new ArtemisService(logger, { configured: false, type: 'artemis' })).toThrow(
AppInternalError
);
});

describe('sendMessage', () => {
it('should send a message', async () => {
const mockReceipt = 'mockReceipt';
const otherMockReceipt = 'otherMockReceipt';

const topicName = AddressName.Requests;
const body = { payload: 'some payload' };
const otherBody = { other: 'payload' };
Expand All @@ -103,10 +136,12 @@ describe('ArtemisService', () => {
.mockResolvedValue(otherMockReceipt);

const receipt = await service.sendMessage(topicName, body);
expect(receipt).toEqual(mockReceipt);
expect(receipt).toEqual(expect.objectContaining({ id: new BigNumber(mockReceipt.id) }));

const otherReceipt = await service.sendMessage(topicName, otherBody);
expect(otherReceipt).toEqual(otherMockReceipt);
expect(otherReceipt).toEqual(
expect.objectContaining({ id: new BigNumber(otherMockReceipt.id) })
);
});
});

Expand Down Expand Up @@ -169,14 +204,5 @@ describe('ArtemisService', () => {

expect(logger.error).toHaveBeenCalled();
});

it('should do no work if service is not configured', async () => {
configSpy.mockReturnValue(false);

const initialCallCount = mockConnectionClose.mock.calls.length;
await service.onApplicationShutdown();

expect(mockConnectionClose.mock.calls.length).toEqual(initialCallCount);
});
});
});
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { BigNumber } from '@polymeshassociation/polymesh-sdk';
import { validate } from 'class-validator';
import {
AwaitableSender,
AwaitableSendOptions,
Connection,
ConnectionOptions,
Container,
Delivery,
EventContext,
Receiver,
ReceiverEvents,
Expand All @@ -17,8 +17,9 @@ import {
import { AppInternalError } from '~/common/errors';
import { AddressName, QueueName } from '~/common/utils/amqp';
import { PolymeshLogger } from '~/logger/polymesh-logger.service';

type EventHandler<T> = (params: T) => Promise<void>;
import { ArtemisConfig } from '~/message/artemis/types';
import { MessageReceipt } from '~/message/common';
import { EventHandler, MessageService } from '~/message/common/message.service';

interface AddressEntry {
addressName: AddressName;
Expand All @@ -28,28 +29,31 @@ interface AddressEntry {
type AddressStore = Record<AddressName, AddressEntry>;

@Injectable()
export class ArtemisService implements OnApplicationShutdown {
export class ArtemisService extends MessageService implements OnApplicationShutdown {
private receivers: Receiver[] = [];
private addressStore: Partial<AddressStore> = {};
private connectionPromise?: Promise<Connection>;

constructor(private readonly logger: PolymeshLogger) {
constructor(
private readonly logger: PolymeshLogger,
@Inject('ARTEMIS_CONFIG') private readonly config: ArtemisConfig
) {
super();
this.logger.setContext(ArtemisService.name);
}
this.logger.debug('Artemis service initialized');

public isConfigured(): boolean {
return !!process.env.ARTEMIS_HOST;
if (!this.config.configured) {
throw new AppInternalError(
'Artemis service was constructed, but Artemis config values were not set'
);
}
}

public async onApplicationShutdown(signal?: string | undefined): Promise<void> {
this.logger.debug(
`artemis service received application shutdown request, sig: ${signal} - now closing connections`
);

if (!this.isConfigured()) {
return;
}

const closePromises = [
...this.receivers.map(receiver => receiver.close()),
...this.addressEntries().map(entry => entry.sender.close()),
Expand Down Expand Up @@ -88,16 +92,7 @@ export class ArtemisService implements OnApplicationShutdown {
}

private connectOptions(): ConnectionOptions {
const { ARTEMIS_HOST, ARTEMIS_USERNAME, ARTEMIS_PASSWORD, ARTEMIS_PORT } = process.env;

return {
port: Number(ARTEMIS_PORT),
host: ARTEMIS_HOST,
username: ARTEMIS_USERNAME,
password: ARTEMIS_PASSWORD,
operationTimeoutInSeconds: 10,
transport: 'tcp',
};
return this.config as ConnectionOptions;
}

private sendOptions(): AwaitableSendOptions {
Expand Down Expand Up @@ -139,14 +134,19 @@ export class ArtemisService implements OnApplicationShutdown {
};
}

public async sendMessage(publishOn: AddressName, body: unknown): Promise<Delivery> {
public async sendMessage(publishOn: AddressName, body: unknown): Promise<MessageReceipt> {
const { sender } = await this.getAddress(publishOn);

const message = { body };

const sendOptions = this.sendOptions();
this.logger.debug(`sending message on: ${publishOn}`);
return sender.send(message, sendOptions);
const receipt = await sender.send(message, sendOptions);

return {
id: new BigNumber(receipt.id),
topic: publishOn,
};
}

/**
Expand Down Expand Up @@ -206,6 +206,7 @@ export class ArtemisService implements OnApplicationShutdown {
});

receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
/* istanbul ignore next */
const receiverError = context?.receiver?.error;
this.logger.error(`an error occurred for receiver "${listenOn}": ${receiverError}`);
});
Expand Down
14 changes: 14 additions & 0 deletions src/message/artemis/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* istanbul ignore file */

export type ArtemisConfig =
| {
type: 'artemis';
port: number;
host: string;
username: string;
password: string;
operationTimeoutInSeconds: number;
transport: string;
configured: true;
}
| { type: 'artemis'; configured: false };
Loading
Loading