Skip to content

Commit

Permalink
chore: group & release events for local eventbus (medusajs#7649)
Browse files Browse the repository at this point in the history
* chore: stage & release events for local eventbus

* chore: address review

* chore: mock emitter correctly
  • Loading branch information
riqwan authored Jun 7, 2024
1 parent c9c2b6c commit fbb00f3
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 71 deletions.
10 changes: 10 additions & 0 deletions packages/core/utils/src/event-bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ export abstract class AbstractEventBusModuleService
abstract emit<T>(data: EventBusTypes.EmitData<T>[]): Promise<void>
abstract emit<T>(data: EventBusTypes.Message<T>[]): Promise<void>

/*
Grouped events are useful when you have distributed transactions
where you need to explicitly group, release and clear events upon
lifecycle events of a transaction.
*/
// Given a eventGroupId, all the grouped events will be released
abstract releaseGroupedEvents(eventGroupId: string): Promise<void>
// Given a eventGroupId, all the grouped events will be cleared
abstract clearGroupedEvents(eventGroupId: string): Promise<void>

protected storeSubscribers({
event,
subscriberId,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import LocalEventBusService from "../event-bus-local"

jest.genMockFromModule("events")
jest.mock("events")

const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
}

const moduleDeps = {
logger: loggerMock,
}

describe("LocalEventBusService", () => {
let eventBus

describe("emit", () => {
describe("Successfully emits events", () => {
beforeEach(() => {
jest.clearAllMocks()

eventBus = new LocalEventBusService(moduleDeps as any)
})

it("should emit an event", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)

await eventBus.emit("eventName", { hi: "1234" })

expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", {
hi: "1234",
})
})

it("should emit multiple events", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)

await eventBus.emit([
{ eventName: "event-1", data: { hi: "1234" } },
{ eventName: "event-2", data: { hi: "5678" } },
])

expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
hi: "1234",
})
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", {
hi: "5678",
})
})

it("should group an event if data consists of eventGroupId", async () => {
const groupEventFn = jest.spyOn(eventBus, "groupEvent")

eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)

await eventBus.emit("test-event", {
test: "1234",
eventGroupId: "test",
})

expect(eventBus.eventEmitter_.emit).not.toHaveBeenCalled()
expect(groupEventFn).toHaveBeenCalledTimes(1)
expect(groupEventFn).toHaveBeenCalledWith("test", "test-event", {
test: "1234",
})

jest.clearAllMocks()

eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.emit("test-event", { test: "1234", eventGroupId: "test" })
eventBus.emit("test-event", { test: "test-1" })

expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(groupEventFn).toHaveBeenCalledTimes(1)

expect(eventBus.groupedEventsMap_.get("test")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
expect.objectContaining({ eventName: "test-event" }),
])

await eventBus.emit("test-event", {
test: "1234",
eventGroupId: "test-2",
})

expect(eventBus.groupedEventsMap_.get("test-2")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
])
})

it("should release events when requested with eventGroupId", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)

await eventBus.emit([
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-1" },
},
{
eventName: "event-2",
data: { test: "2", eventGroupId: "group-1" },
},
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-2" },
},
{
eventName: "event-2",
data: { test: "2", eventGroupId: "group-2" },
},
{ eventName: "event-1", data: { test: "1" } },
])

expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
test: "1",
})

expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(2)
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2)

jest.clearAllMocks()
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.releaseGroupedEvents("group-1")

expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined()
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2)

expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
test: "1",
})
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", {
test: "2",
})
})

it("should clear events from grouped events when requested with eventGroupId", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)

await eventBus.emit([
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-1" },
},
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-2" },
},
])

expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(1)
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1)

eventBus.clearGroupedEvents("group-1")

expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined()
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1)

eventBus.clearGroupedEvents("group-2")

expect(eventBus.groupedEventsMap_.get("group-2")).not.toBeDefined()
})
})
})
})
54 changes: 53 additions & 1 deletion packages/modules/event-bus-local/src/services/event-bus-local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ type InjectedDependencies = {
logger: Logger
}

type StagingQueueType = Map<string, { eventName: string; data?: unknown }[]>

const eventEmitter = new EventEmitter()
eventEmitter.setMaxListeners(Infinity)

// eslint-disable-next-line max-len
export default class LocalEventBusService extends AbstractEventBusModuleService {
protected readonly logger_?: Logger
protected readonly eventEmitter_: EventEmitter
protected groupedEventsMap_: StagingQueueType

constructor({ logger }: MedusaContainer & InjectedDependencies) {
// @ts-ignore
Expand All @@ -29,6 +32,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService

this.logger_ = logger
this.eventEmitter_ = eventEmitter
this.groupedEventsMap_ = new Map()
}

async emit<T>(
Expand Down Expand Up @@ -70,10 +74,58 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
}

const data = (event as EmitData).data ?? (event as Message<T>).body
this.eventEmitter_.emit(event.eventName, data)

await this.groupOrEmitEvent(event.eventName, data)
}
}

// If the data of the event consists of a eventGroupId, we don't emit the event, instead
// we add them to a queue grouped by the eventGroupId and release them when
// explicitly requested.
// This is useful in the event of a distributed transaction where you'd want to emit
// events only once the transaction ends.
private async groupOrEmitEvent(
eventName: string,
data: unknown & { eventGroupId?: string }
) {
const { eventGroupId, ...eventData } = data

if (eventGroupId) {
await this.groupEvent(eventGroupId, eventName, eventData)
} else {
this.eventEmitter_.emit(eventName, data)
}
}

// Groups an event to a queue to be emitted upon explicit release
private async groupEvent(
eventGroupId: string,
eventName: string,
data: unknown
) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []

groupedEvents.push({ eventName, data })

this.groupedEventsMap_.set(eventGroupId, groupedEvents)
}

async releaseGroupedEvents(eventGroupId: string) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []

for (const event of groupedEvents) {
const { eventName, data } = event

this.eventEmitter_.emit(eventName, data)
}

this.clearGroupedEvents(eventGroupId)
}

async clearGroupedEvents(eventGroupId: string) {
this.groupedEventsMap_.delete(eventGroupId)
}

subscribe(event: string | symbol, subscriber: Subscriber): this {
const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
await this.queue_.addBulk(events)
}

// TODO: Implement redis based staging + release
async releaseGroupedEvents(eventGroupId: string) {}
async clearGroupedEvents(eventGroupId: string) {}

/**
* Handles incoming jobs.
* @param job The job object
Expand Down

0 comments on commit fbb00f3

Please sign in to comment.