Skip to content

Commit

Permalink
fix(order): add processing payment child workflow for creating orders
Browse files Browse the repository at this point in the history
  • Loading branch information
jdnichollsc committed Nov 12, 2024
1 parent a487bc6 commit 2a79bb5
Show file tree
Hide file tree
Showing 26 changed files with 569 additions and 43 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ npx nx show projects
npx nx graph
```

View the Database diagram [here](./libs/backend/db/README.md).
> [!TIP]
> View the Database diagram [here](./libs/backend/db/README.md).
Do you want to change the path of a project to decide your own organization? No problem:
```sh
Expand Down
7 changes: 4 additions & 3 deletions apps/auth/src/app/user/user.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import {
ApiOperation,
ApiTags,
} from '@nestjs/swagger';
import { UserService } from './user.service';
import { AuthUser, JwtAuthGuard, User } from '@projectx/core';
import { AuthUser, JwtAuthGuard, AuthenticatedUser } from '@projectx/core';
import { UserDto, UserStatus } from '@projectx/models';

import { UserService } from './user.service';

@ApiBearerAuth()
@ApiTags('User')
@UseGuards(JwtAuthGuard)
Expand All @@ -38,7 +39,7 @@ export class UserController {
})
@Get()
@HttpCode(HttpStatus.OK)
async getProfile(@User() userDto: AuthUser) {
async getProfile(@AuthenticatedUser() userDto: AuthUser) {
const user = await this.userService.findOne(userDto);
if (!user) {
throw new NotFoundException('User not found');
Expand Down
27 changes: 19 additions & 8 deletions apps/auth/src/workflows/login.workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
isCancellation,
CancellationScope,
allHandlersFinished,
ApplicationFailure,
} from '@temporalio/workflow';

// eslint-disable-next-line @nx/enforce-module-boundaries
Expand All @@ -17,11 +18,11 @@ import {
LoginWorkflowState,
LoginWorkflowStatus,
verifyLoginCodeUpdate,
} from '../../../../libs/backend/core/src/lib/user/user.workflow';
} from '../../../../libs/backend/core/src/lib/user/workflow.utils';
import type { ActivitiesService } from '../main';

const { sendLoginEmail } = proxyActivities<ActivitiesService>({
startToCloseTimeout: '5m',
startToCloseTimeout: '5 seconds',
retry: {
initialInterval: '2s',
maximumInterval: '10s',
Expand All @@ -31,7 +32,7 @@ const { sendLoginEmail } = proxyActivities<ActivitiesService>({
});

const { verifyLoginCode } = proxyActivities<ActivitiesService>({
startToCloseTimeout: '5m',
startToCloseTimeout: '5 seconds',
retry: {
initialInterval: '2s',
maximumInterval: '10s',
Expand Down Expand Up @@ -69,14 +70,27 @@ export async function loginUserWorkflow(
state.codeStatus = LoginWorkflowCodeStatus.SENT;

// Wait for user to verify code (human interaction)
if (await condition(() => !!state.user, '10m')) {
await condition(() => !!state.user, '10m');
// Wait for all handlers to finish before checking the state
await condition(allHandlersFinished);
if (state.user) {
state.status = LoginWorkflowStatus.SUCCESS;
log.info(`User logged in, user: ${state.user}`);
} else {
state.status = LoginWorkflowStatus.FAILED;
log.error(`User login failed, email: ${data.email}`);
log.error(`User login code expired, email: ${data.email}`);
throw ApplicationFailure.nonRetryable(
'User login code expired',
LoginWorkflowNonRetryableErrors.LOGIN_CODE_EXPIRED,
);
}
return;
} catch (error) {
// If the error is an application failure, throw it
if (error instanceof ApplicationFailure) {
throw error;
}
// Otherwise, update the state and log the error
state.status = LoginWorkflowStatus.FAILED;
if (!isCancellation(error)) {
log.error(`Login workflow failed, email: ${data.email}, error: ${error}`);
Expand All @@ -86,8 +100,5 @@ export async function loginUserWorkflow(
// TODO: Handle workflow cancellation
});
}
} finally {
// Wait for all handlers to finish before completing the workflow
await condition(allHandlersFinished);
}
}
2 changes: 2 additions & 0 deletions apps/order/src/app/app.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';

import { AppService } from './app.service';

@ApiTags('Order')
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
Expand Down
12 changes: 12 additions & 0 deletions apps/order/src/workflows/long-running.workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { continueAsNew, workflowInfo } from "@temporalio/workflow";

const MAX_NUMBER_OF_EVENTS = 10000;

export async function longRunningWorkflow(n: number): Promise<void> {
// Long-duration workflow
while (workflowInfo().historyLength < MAX_NUMBER_OF_EVENTS) {
//...
}

await continueAsNew<typeof longRunningWorkflow>(n + 1);
}
65 changes: 55 additions & 10 deletions apps/order/src/workflows/order.workflow.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,58 @@
import { continueAsNew, sleep, workflowInfo } from "@temporalio/workflow";
/* eslint-disable @nx/enforce-module-boundaries */
import {
allHandlersFinished,
ChildWorkflowHandle,
condition,
setHandler,
startChild,
} from '@temporalio/workflow';

const MAX_NUMBER_OF_EVENTS = 10000;
import {
OrderProcessPaymentStatus,
OrderWorkflowData,
OrderWorkflowState,
OrderWorkflowStatus,
getOrderStateQuery,
getWorkflowIdByPaymentOrder,
} from '../../../../libs/backend/core/src/lib/order/workflow.utils';
import {
cancelWorkflowSignal,
} from '../../../../libs/backend/core/src/lib/workflows';
import { processPayment } from './process-payment.workflow';

export async function createOrder(email?: string): Promise<void> {

// Long-duration workflow
while (workflowInfo().historyLength < MAX_NUMBER_OF_EVENTS) {
await sleep(1000);
}
const initialState: OrderWorkflowState = {
status: OrderWorkflowStatus.PENDING,
orderId: 0,
};

export async function createOrder(
data: OrderWorkflowData,
state = initialState
): Promise<void> {
let processPaymentWorkflow: ChildWorkflowHandle<typeof processPayment>;
// Attach queries, signals and updates
setHandler(getOrderStateQuery, () => state);
setHandler(
cancelWorkflowSignal,
() => processPaymentWorkflow?.signal(cancelWorkflowSignal)
);
// TODO: Create the order in the database

await continueAsNew<typeof createOrder>(email);
}
state.status = OrderWorkflowStatus.PROCESSING_PAYMENT;
processPaymentWorkflow = await startChild(processPayment, {
args: [data],
workflowId: getWorkflowIdByPaymentOrder(state.orderId),
});
const processPaymentResult = await processPaymentWorkflow.result();
if (processPaymentResult.status === OrderProcessPaymentStatus.SUCCESS) {
state.status = OrderWorkflowStatus.PAYMENT_COMPLETED;
} else {
state.status = OrderWorkflowStatus.FAILED;
return;
}
processPaymentWorkflow = undefined;
//...
state.status = OrderWorkflowStatus.COMPLETED;
// Wait for all handlers to finish before workflow completion
await condition(allHandlersFinished);
}
82 changes: 82 additions & 0 deletions apps/order/src/workflows/process-payment.workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* eslint-disable @nx/enforce-module-boundaries */
import {
log,
condition,
setHandler,
allHandlersFinished,
} from '@temporalio/workflow';

import {
OrderWorkflowData,
PROCESS_PAYMENT_TIMEOUT,
OrderProcessPaymentState,
OrderProcessPaymentStatus,
paymentWebHookEventSignal,
} from '../../../../libs/backend/core/src/lib/order';
import {
cancelWorkflowSignal,
} from '../../../../libs/backend/core/src/lib/workflows';

export const finalPaymentStatuses = [
OrderProcessPaymentStatus.SUCCESS,
OrderProcessPaymentStatus.FAILURE,
OrderProcessPaymentStatus.DECLINED,
OrderProcessPaymentStatus.CANCELLED,
];

const initiatedWebhookEvents = [
// Stripe
'payment_intent.created',
'payment_intent.processing',
'payment_method.attached',
]
const confirmedWebhookEvents = [
// Stripe
'checkout.session.completed',
'checkout.session.async_payment_succeeded',
'payment_intent.succeeded',
];
const failedWebhookEvents = [
// Stripe
'payment_intent.payment_failed',
];

export async function processPayment(
data: OrderWorkflowData,
): Promise<OrderProcessPaymentState> {
const state: OrderProcessPaymentState = {
status: OrderProcessPaymentStatus.PENDING,
};
log.info('Processing payment', { data });

// Attach queries, signals and updates
setHandler(cancelWorkflowSignal, async () => {
if (finalPaymentStatuses.includes(state.status)) {
log.warn('Payment already completed, cannot cancel');
return;
}
log.warn('Cancelling payment');
state.status = OrderProcessPaymentStatus.CANCELLED;
});
setHandler(
paymentWebHookEventSignal,
async (webhookEvent) => {
if (initiatedWebhookEvents.includes(webhookEvent.type)) {
state.status = OrderProcessPaymentStatus.INITIATED;
} else if (confirmedWebhookEvents.includes(webhookEvent.type)) {
state.status = OrderProcessPaymentStatus.SUCCESS;
} else if (failedWebhookEvents.includes(webhookEvent.type)) {
state.status = OrderProcessPaymentStatus.FAILURE;
}
},
);

await condition(
() => finalPaymentStatuses.includes(state.status),
PROCESS_PAYMENT_TIMEOUT
);
// Wait for all handlers to finish before workflow completion
await condition(allHandlersFinished);

return state;
}
1 change: 1 addition & 0 deletions libs/backend/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './lib/configuration';
export * from './lib/nest';
export * from './lib/user';
export * from './lib/auth';
export * from './lib/workflows';
1 change: 1 addition & 0 deletions libs/backend/core/src/lib/order/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './workflow.utils';
12 changes: 12 additions & 0 deletions libs/backend/core/src/lib/order/providers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Enum representing supported payment providers.
*
* @enum {string}
*/
export enum PaymentProvider {
Stripe = 'Stripe',
MercadoPago = 'MercadoPago',
PayU = 'PayU',
Wompi = 'Wompi',
// Add more providers as needed
}
67 changes: 67 additions & 0 deletions libs/backend/core/src/lib/order/workflow.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { CreateOrderDto } from '@projectx/models';
import { defineQuery, defineSignal } from '@temporalio/workflow';

export type OrderWorkflowData = {
email: string;
order: CreateOrderDto;
};

export const getWorkflowIdByPaymentOrder = (orderId: number) => {
return `payment-${orderId}`;
};

export enum OrderWorkflowStatus {
PENDING = 'Pending',
PROCESSING_PAYMENT = 'ProcessingPayment',
PAYMENT_COMPLETED = 'PaymentCompleted',
COMPLETED = 'Completed',
FAILED = 'Failed',
}
export type OrderWorkflowState = {
status: OrderWorkflowStatus;
orderId?: number;
};

export enum OrderProcessPaymentStatus {
PENDING = 'Pending',
INITIATED = 'Initiated',
SUCCESS = 'Success',
DECLINED = 'Declined',
CANCELLED = 'Cancelled',
FAILURE = 'Failure',
}
export type OrderProcessPaymentState = {
status: OrderProcessPaymentStatus;
};

export const PROCESS_PAYMENT_TIMEOUT = '20 minutes';

// DEFINE QUERIES
export const getOrderStateQuery =
defineQuery<OrderWorkflowState>('getOrderStateQuery');

/**
* Represents a payment webhook event received from third-party payment providers
* such as Stripe, MercadoPago, PayU, Wompi, etc.
*
* @property {string} id - Unique identifier for the webhook event.
* @property {string} type - Type of the event (e.g., 'payment_intent.succeeded').
* @property {string} provider - Payment provider sending the webhook (e.g., 'Stripe', 'PayU').
* @property {Object} data - Payload containing event-specific data.
*/
export type PaymentWebhookEvent = {
id?: string;
type: string;
provider: 'Stripe' | 'MercadoPago' | 'PayU' | 'Wompi';
data: Record<string, unknown>;
};

// DEFINE SIGNALS
/**
* Receive a payment webhook event, webhook events is particularly useful for listening to asynchronous events
* such as when a customer’s bank confirms a payment, a customer disputes a charge,
* a recurring payment succeeds, or when collecting subscription payments.
*/
export const paymentWebHookEventSignal = defineSignal<[PaymentWebhookEvent]>(
'paymentWebHookSignal'
);
2 changes: 1 addition & 1 deletion libs/backend/core/src/lib/user/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './user.decorator';
export * from './user.interface';
export * from './user.workflow';
export * from './workflow.utils';
2 changes: 1 addition & 1 deletion libs/backend/core/src/lib/user/user.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { createParamDecorator, ExecutionContext, UnauthorizedException } from '@

import { AuthUser } from './user.interface';

export const User = createParamDecorator<keyof AuthUser, ExecutionContext>(
export const AuthenticatedUser = createParamDecorator<keyof AuthUser, ExecutionContext>(
(data: keyof AuthUser, ctx: ExecutionContext) => {
const request = ctx.switchToHttp().getRequest();
const user = request.user as AuthUser;
Expand Down
Loading

0 comments on commit 2a79bb5

Please sign in to comment.