From fbb00f3863736220650df2d4bb4f628715704e5a Mon Sep 17 00:00:00 2001 From: Riqwan Thamir Date: Fri, 7 Jun 2024 15:31:16 +0200 Subject: [PATCH] chore: group & release events for local eventbus (#7649) * chore: stage & release events for local eventbus * chore: address review * chore: mock emitter correctly --- packages/core/utils/src/event-bus/index.ts | 10 ++ .../src/services/__tests__/event-bus-local.js | 70 -------- .../src/services/__tests__/event-bus-local.ts | 170 ++++++++++++++++++ .../src/services/event-bus-local.ts | 54 +++++- .../src/services/event-bus-redis.ts | 4 + 5 files changed, 237 insertions(+), 71 deletions(-) delete mode 100644 packages/modules/event-bus-local/src/services/__tests__/event-bus-local.js create mode 100644 packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts diff --git a/packages/core/utils/src/event-bus/index.ts b/packages/core/utils/src/event-bus/index.ts index 3bcb03dcdc5d8..c82217bb6efba 100644 --- a/packages/core/utils/src/event-bus/index.ts +++ b/packages/core/utils/src/event-bus/index.ts @@ -24,6 +24,16 @@ export abstract class AbstractEventBusModuleService abstract emit(data: EventBusTypes.EmitData[]): Promise abstract emit(data: EventBusTypes.Message[]): Promise + /* + Grouped events are useful when you have distributed transactions + where you need to explicitly group, release and clear events upon + lifecycle events of a transaction. + */ + // Given a eventGroupId, all the grouped events will be released + abstract releaseGroupedEvents(eventGroupId: string): Promise + // Given a eventGroupId, all the grouped events will be cleared + abstract clearGroupedEvents(eventGroupId: string): Promise + protected storeSubscribers({ event, subscriberId, diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.js b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.js deleted file mode 100644 index ccc695e6fc235..0000000000000 --- a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.js +++ /dev/null @@ -1,70 +0,0 @@ -import LocalEventBusService from "../event-bus-local" - -jest.genMockFromModule("events") -jest.mock("events") - -const loggerMock = { - info: jest.fn().mockReturnValue(console.log), - warn: jest.fn().mockReturnValue(console.log), - error: jest.fn().mockReturnValue(console.log), -} - -const moduleDeps = { - logger: loggerMock, -} - -describe("LocalEventBusService", () => { - let eventBus - - describe("emit", () => { - describe("Successfully emits events", () => { - beforeEach(() => { - jest.clearAllMocks() - }) - - it("Emits an event", () => { - eventBus = new LocalEventBusService( - moduleDeps, - {}, - { - resources: "shared", - } - ) - - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) - - eventBus.emit("eventName", { hi: "1234" }) - - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", { - hi: "1234", - }) - }) - - it("Emits multiple events", () => { - eventBus = new LocalEventBusService( - moduleDeps, - {}, - { - resources: "shared", - } - ) - - eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) - - eventBus.emit([ - { eventName: "event-1", data: { hi: "1234" } }, - { eventName: "event-2", data: { hi: "5678" } }, - ]) - - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { - hi: "1234", - }) - expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", { - hi: "5678", - }) - }) - }) - }) -}) diff --git a/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts new file mode 100644 index 0000000000000..f604f4d24110d --- /dev/null +++ b/packages/modules/event-bus-local/src/services/__tests__/event-bus-local.ts @@ -0,0 +1,170 @@ +import LocalEventBusService from "../event-bus-local" + +jest.genMockFromModule("events") +jest.mock("events") + +const loggerMock = { + info: jest.fn().mockReturnValue(console.log), + warn: jest.fn().mockReturnValue(console.log), + error: jest.fn().mockReturnValue(console.log), +} + +const moduleDeps = { + logger: loggerMock, +} + +describe("LocalEventBusService", () => { + let eventBus + + describe("emit", () => { + describe("Successfully emits events", () => { + beforeEach(() => { + jest.clearAllMocks() + + eventBus = new LocalEventBusService(moduleDeps as any) + }) + + it("should emit an event", async () => { + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + + await eventBus.emit("eventName", { hi: "1234" }) + + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", { + hi: "1234", + }) + }) + + it("should emit multiple events", async () => { + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + + await eventBus.emit([ + { eventName: "event-1", data: { hi: "1234" } }, + { eventName: "event-2", data: { hi: "5678" } }, + ]) + + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { + hi: "1234", + }) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", { + hi: "5678", + }) + }) + + it("should group an event if data consists of eventGroupId", async () => { + const groupEventFn = jest.spyOn(eventBus, "groupEvent") + + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + + await eventBus.emit("test-event", { + test: "1234", + eventGroupId: "test", + }) + + expect(eventBus.eventEmitter_.emit).not.toHaveBeenCalled() + expect(groupEventFn).toHaveBeenCalledTimes(1) + expect(groupEventFn).toHaveBeenCalledWith("test", "test-event", { + test: "1234", + }) + + jest.clearAllMocks() + + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventBus.emit("test-event", { test: "1234", eventGroupId: "test" }) + eventBus.emit("test-event", { test: "test-1" }) + + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) + expect(groupEventFn).toHaveBeenCalledTimes(1) + + expect(eventBus.groupedEventsMap_.get("test")).toEqual([ + expect.objectContaining({ eventName: "test-event" }), + expect.objectContaining({ eventName: "test-event" }), + ]) + + await eventBus.emit("test-event", { + test: "1234", + eventGroupId: "test-2", + }) + + expect(eventBus.groupedEventsMap_.get("test-2")).toEqual([ + expect.objectContaining({ eventName: "test-event" }), + ]) + }) + + it("should release events when requested with eventGroupId", async () => { + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + + await eventBus.emit([ + { + eventName: "event-1", + data: { test: "1", eventGroupId: "group-1" }, + }, + { + eventName: "event-2", + data: { test: "2", eventGroupId: "group-1" }, + }, + { + eventName: "event-1", + data: { test: "1", eventGroupId: "group-2" }, + }, + { + eventName: "event-2", + data: { test: "2", eventGroupId: "group-2" }, + }, + { eventName: "event-1", data: { test: "1" } }, + ]) + + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { + test: "1", + }) + + expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(2) + expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2) + + jest.clearAllMocks() + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + eventBus.releaseGroupedEvents("group-1") + + expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined() + expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2) + + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", { + test: "1", + }) + expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", { + test: "2", + }) + }) + + it("should clear events from grouped events when requested with eventGroupId", async () => { + eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data) + + await eventBus.emit([ + { + eventName: "event-1", + data: { test: "1", eventGroupId: "group-1" }, + }, + { + eventName: "event-1", + data: { test: "1", eventGroupId: "group-2" }, + }, + ]) + + expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(1) + expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1) + + eventBus.clearGroupedEvents("group-1") + + expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined() + expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1) + + eventBus.clearGroupedEvents("group-2") + + expect(eventBus.groupedEventsMap_.get("group-2")).not.toBeDefined() + }) + }) + }) +}) diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index 9a5e99afdbb25..ba11b5535102b 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -14,6 +14,8 @@ type InjectedDependencies = { logger: Logger } +type StagingQueueType = Map + const eventEmitter = new EventEmitter() eventEmitter.setMaxListeners(Infinity) @@ -21,6 +23,7 @@ eventEmitter.setMaxListeners(Infinity) export default class LocalEventBusService extends AbstractEventBusModuleService { protected readonly logger_?: Logger protected readonly eventEmitter_: EventEmitter + protected groupedEventsMap_: StagingQueueType constructor({ logger }: MedusaContainer & InjectedDependencies) { // @ts-ignore @@ -29,6 +32,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService this.logger_ = logger this.eventEmitter_ = eventEmitter + this.groupedEventsMap_ = new Map() } async emit( @@ -70,10 +74,58 @@ export default class LocalEventBusService extends AbstractEventBusModuleService } const data = (event as EmitData).data ?? (event as Message).body - this.eventEmitter_.emit(event.eventName, data) + + await this.groupOrEmitEvent(event.eventName, data) + } + } + + // If the data of the event consists of a eventGroupId, we don't emit the event, instead + // we add them to a queue grouped by the eventGroupId and release them when + // explicitly requested. + // This is useful in the event of a distributed transaction where you'd want to emit + // events only once the transaction ends. + private async groupOrEmitEvent( + eventName: string, + data: unknown & { eventGroupId?: string } + ) { + const { eventGroupId, ...eventData } = data + + if (eventGroupId) { + await this.groupEvent(eventGroupId, eventName, eventData) + } else { + this.eventEmitter_.emit(eventName, data) } } + // Groups an event to a queue to be emitted upon explicit release + private async groupEvent( + eventGroupId: string, + eventName: string, + data: unknown + ) { + const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] + + groupedEvents.push({ eventName, data }) + + this.groupedEventsMap_.set(eventGroupId, groupedEvents) + } + + async releaseGroupedEvents(eventGroupId: string) { + const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || [] + + for (const event of groupedEvents) { + const { eventName, data } = event + + this.eventEmitter_.emit(eventName, data) + } + + this.clearGroupedEvents(eventGroupId) + } + + async clearGroupedEvents(eventGroupId: string) { + this.groupedEventsMap_.delete(eventGroupId) + } + subscribe(event: string | symbol, subscriber: Subscriber): this { const randId = ulid() this.storeSubscribers({ event, subscriberId: randId, subscriber }) diff --git a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts index a7b92093ba8c9..6d76d628e1393 100644 --- a/packages/modules/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/modules/event-bus-redis/src/services/event-bus-redis.ts @@ -140,6 +140,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService await this.queue_.addBulk(events) } + // TODO: Implement redis based staging + release + async releaseGroupedEvents(eventGroupId: string) {} + async clearGroupedEvents(eventGroupId: string) {} + /** * Handles incoming jobs. * @param job The job object