feat: adds jobs queue (#8228)
Adds a jobs queue to Payload.

- [x] Docs, w/ examples for Vercel Cron, additional services
- [x] Type the `job` using GeneratedTypes in `JobRunnerArgs`
(@AlessioGr)
- [x] Write the `runJobs` function 
- [x] Allow for some type of `payload.runTask` 
- [x] Open up a new bin script for running jobs
- [x] Determine strategy for runner endpoint to either await jobs
successfully or return early and stay open until job work completes
(serverless ramifications here)
- [x] Allow for job runner to accept how many jobs to run in one
invocation
- [x] Make a Payload local API method for creating a new job easily
(payload.createJob) or similar which is strongly typed (@AlessioGr)
- [x] Make `payload.runJobs` or similar  (@AlessioGr)
- [x] Write tests for retrying up to max retries for a given step
- [x] Write tests for dynamic import of a runner

The shape of the config should permit the definition of steps separate
from the job workflows themselves.

```js
const config = {
  // Not sure if we need this property anymore
  queues: {
  },
  // A job is an instance of a workflow, stored in DB
  // and triggered by something at some point
  jobs: {
    // Be able to override the jobs collection
    collectionOverrides: () => {},

    // Workflows are groups of tasks that handle
    // the flow from task to task.
    // When defined on the config, they are considered as predefined workflows
    // BUT - in the future, we'll allow for UI-based workflow definition as well.
    workflows: [
      {
        slug: 'job-name',
        // Temporary name for this
        // should be able to pass function 
        // or path to it for Node to dynamically import
        controlFlowInJS: '/my-runner.js',

        // Temporary name as well
        // should be able to eventually define workflows
        // in UI (meaning they need to be serialized in JSON)
        // Should not be able to define both control flows
        controlFlowInJSON: [
          {
            task: 'myTask',
            next: {
              // etc
            }
          }
        ],

        // Workflows take input
        // which are a group of fields
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
      },
    ],

    // Tasks are defined separately as isolated functions
    // that can be retried on fail
    tasks: [
      {
        slug: 'myTask',
        retries: 2,
        // Each task takes input
        // Used to auto-type the task func args
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        // Each task takes output
        // Used to auto-type the function signature
        output: [
          {
            name: 'success',
            type: 'checkbox',
          }
        ],
        onSuccess: () => {},
        onFail: () => {},
        run: myRunner,
      },
    ]
  }
}
```

### `payload.createJob`

This function should allow for the creation of jobs based on either a
workflow (group of tasks) or an individual task.

To create a job using a workflow:

```js
const job = await payload.createJob({
  // Accept the `name` of a workflow so we can match to either a 
  // code-based workflow OR a workflow defined in the DB
  // Should auto-type the input
  workflowName: 'myWorkflow',
  input: {
    // typed to the args of the workflow by name
  }
})
```

To create a job using a task:

```js
const job = await payload.createJob({
  // Accept the `name` of a task
  task: 'myTask',
  input: {
    // typed to the args of the task by name
  }
})
```

---------

Co-authored-by: Alessio Gravili <[email protected]>
Co-authored-by: Dan Ribbens <[email protected]>
3 people authored Oct 30, 2024
1 parent 0574155 commit 8970c6b
Showing 49 changed files with 6,357 additions and 125 deletions.
2 changes: 1 addition & 1 deletion docs/database/transactions.mdx
@@ -126,6 +126,6 @@ await payload.update({
   where: {
     slug: { equals: 'my-slug' }
   },
-  req: { disableTransaction: true },
+  disableTransaction: true,
 })
 ```
382 changes: 382 additions & 0 deletions docs/jobs-queue/overview.mdx
@@ -0,0 +1,382 @@
---
title: Jobs Queue
label: Jobs Queue
order: 10
desc: Payload provides all you need to run job queues, which are helpful to offload long-running processes into separate workers.
keywords: jobs queue, application framework, typescript, node, react, nextjs
---

## Defining tasks

A task is a simple function that can be executed directly or within a workflow. The difference between tasks and functions is that tasks can be run in the background, and can be retried if they fail.

Tasks can either be defined within the `jobs.tasks` array in your Payload config, or they can be run inline within a workflow.

### Defining tasks in the config

Simply add a task to the `jobs.tasks` array in your Payload config. A task consists of the following fields:

| Option | Description |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug` | Define a slug-based name for this task. This slug needs to be unique among both tasks and workflows.|
| `handler` | The function that should be responsible for running the job. You can either pass a string-based path to the job function file, or the job function itself. If you are using large dependencies within your job, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema` | Define the input field schema. Payload will generate a type for this schema. |
| `interfaceName` | You can use interfaceName to change the name of the interface that is generated for this task. By default, this is "Task" + the capitalized task slug. |
| `outputSchema` | Define the output field schema. Payload will generate a type for this schema. |
| `label` | Define a human-friendly label for this task. |
| `onFail` | Function to be executed if the task fails. |
| `onSuccess` | Function to be executed if the task succeeds. |
| `retries` | Specify the number of times that this task should be retried if it fails. |

The handler is the function, or a path to the function, that will run once the job picks up this task. The handler function should return an object with an `output` key, which should contain the output of the task.

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        retries: 2,
        slug: 'createPost',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ]
  }
})
```
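
The `retries` option can be pictured as a loop around the handler. The following is a minimal sketch of that idea, not Payload's implementation: `runWithRetries` and the `Handler` type are hypothetical names, and the sketch assumes that `retries: 2` means one initial attempt plus up to two further attempts.

```typescript
// Hypothetical helper: illustrates retry semantics only, not Payload's internals.
type Handler<I, O> = (args: { input: I }) => Promise<{ output: O }>

async function runWithRetries<I, O>(
  handler: Handler<I, O>,
  input: I,
  retries: number,
): Promise<{ output: O; attempts: number }> {
  let lastError: unknown = new Error('handler was never run')
  // Assumption: one initial attempt plus `retries` additional attempts.
  for (let attempt = 1; attempt <= retries + 1; attempt++) {
    try {
      const { output } = await handler({ input })
      return { output, attempts: attempt }
    } catch (err) {
      // Remember the failure and try again if attempts remain.
      lastError = err
    }
  }
  // Every attempt failed: surface the most recent error.
  throw lastError
}
```

With `retries: 2`, a handler that fails twice and then succeeds would complete on its third attempt, while a handler that always fails would surface its last error after three attempts.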

### Example: defining external tasks

payload.config.ts:

```ts
import { fileURLToPath } from 'node:url'
import path from 'path'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        retries: 2,
        slug: 'createPost',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],
        handler: path.resolve(dirname, 'src/tasks/createPost.ts') + '#createPostHandler',
      }
    ]
  }
})
```

src/tasks/createPost.ts:

```ts
import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job, req }) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}
```
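
The string handler above combines a file path and an export name, separated by `#`. As a rough sketch of how such a reference could be split and loaded lazily (so the task file is only imported when a job actually needs it), consider the following. `parseHandlerReference` and `loadHandler` are hypothetical helpers rather than Payload APIs, and the fallback to the `default` export when no `#` is present is an assumption made for illustration.

```typescript
// Hypothetical helpers, not part of Payload's public API.
interface HandlerReference {
  filePath: string
  exportName: string
}

function parseHandlerReference(reference: string): HandlerReference {
  const hashIndex = reference.lastIndexOf('#')
  if (hashIndex === -1) {
    // Assumption: with no `#`, fall back to the module's default export.
    return { filePath: reference, exportName: 'default' }
  }
  return {
    filePath: reference.slice(0, hashIndex),
    exportName: reference.slice(hashIndex + 1),
  }
}

// A runner could import the module only at the moment a job needs it,
// which keeps heavy task dependencies out of the main application bundle.
async function loadHandler(reference: string): Promise<unknown> {
  const { filePath, exportName } = parseHandlerReference(reference)
  const mod: Record<string, unknown> = await import(filePath)
  return mod[exportName]
}
```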


## Defining workflows

There are two types of workflows - JS-based workflows and JSON-based workflows.

### Defining JS-based workflows

A JS-based workflow is a function in which you decide yourself when the tasks should run, by simply calling the `runTask` function. If the job, or any task within the job, fails, the entire function will re-run.

Tasks that have successfully been completed will simply re-return the cached output without running again, and failed tasks will be re-run.
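
The re-run behaviour described above can be modelled with a small in-memory cache keyed by the task invocation `id`. This is only an illustrative sketch: `JobState`, `createRunTask` and `demo` are hypothetical names, and Payload keeps this state on the job document in the database rather than in a `Map`.

```typescript
// Hypothetical in-memory model of a job; not Payload's implementation.
type TaskFn = () => Promise<unknown>

interface JobState {
  // Outputs of completed tasks, keyed by the task invocation `id`.
  completed: Map<string, unknown>
}

function createRunTask(job: JobState) {
  return async function runTask(id: string, task: TaskFn): Promise<unknown> {
    if (job.completed.has(id)) {
      // Already succeeded in an earlier attempt: return the cached output.
      return job.completed.get(id)
    }
    const output = await task() // may throw, which fails this attempt
    job.completed.set(id, output)
    return output
  }
}

async function demo(): Promise<{ createCalls: number; updateCalls: number }> {
  const job: JobState = { completed: new Map() }
  const runTask = createRunTask(job)
  let createCalls = 0
  let updateCalls = 0

  const workflow = async (): Promise<void> => {
    await runTask('1', async () => {
      createCalls++
      return { postID: 'abc' }
    })
    await runTask('2', async () => {
      updateCalls++
      if (updateCalls === 1) throw new Error('transient failure')
      return { ok: true }
    })
  }

  await workflow().catch(() => undefined) // first attempt fails in task '2'
  await workflow() // re-run: task '1' is served from the cache
  return { createCalls, updateCalls }
}
```

In this model the first task executes once even though the workflow function runs twice, while the failed second task executes on both attempts.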

Simply add a workflow to the `jobs.workflows` array in your Payload config. A workflow consists of the following fields:

| Option | Description |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug` | Define a slug-based name for this workflow. This slug needs to be unique among both tasks and workflows.|
| `handler` | The function that should be responsible for running the workflow. You can either pass a string-based path to the workflow function file, or the workflow function itself. If you are using large dependencies within your workflow, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema` | Define the input field schema. Payload will generate a type for this schema. |
| `interfaceName` | You can use interfaceName to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. |
| `label` | Define a human-friendly label for this workflow. |
| `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, runTask }) => {
          const output = await runTask({
            task: 'createPost',
            id: '1',
            input: {
              title: job.input.title,
            },
          })

          await runTask({
            task: 'updatePost',
            id: '2',
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // or output.postID
              title: job.input.title + '2',
            },
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>
    ]
  }
})
```

#### Running tasks inline

In order to run tasks inline without predefining them, you can use the `runTaskInline` function.

The drawbacks of this approach are that tasks cannot be re-used as easily, and the **task data stored in the job** will not be typed. In the following example, the inline task data will be stored on the job under `job.taskStatus.inline['2']` but completely untyped, as types for dynamic tasks like these cannot be generated beforehand.

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, runTask, runTaskInline }) => {
          const output = await runTask({
            task: 'createPost',
            id: '1',
            input: {
              title: job.input.title,
            },
          })

          const { newPost } = await runTaskInline({
            task: async ({ req }) => {
              const newPost = await req.payload.update({
                collection: 'post',
                id: output.postID,
                req,
                data: {
                  title: 'updated!',
                },
              })
              return {
                output: {
                  newPost
                },
              }
            },
            id: '2',
            // `retries` belongs to the inline task, not to the `payload.update` call
            retries: 3,
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>
    ]
  }
})
```

### Defining JSON-based workflows

JSON-based workflows are a way to define the tasks the workflow should run in an array. The relationships between the tasks, their run order and their conditions are defined in the JSON object, which allows Payload to statically analyze the workflow and generate more helpful graphs.

This functionality is not available yet, but it will be available in the future.

## Queueing workflows and tasks

In order to queue a workflow or a task (that is, create a job for it and add that job to the queue), you can use the `payload.jobs.queue` function.

Example: queueing workflows:

```ts
const createdJob = await payload.jobs.queue({
  workflow: 'createPostAndUpdate',
  input: {
    title: 'my title',
  },
})
```

Example: queueing tasks:

```ts
const createdJob = await payload.jobs.queue({
  task: 'createPost',
  input: {
    title: 'my title',
  },
})
```
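
Because Payload generates types from each `inputSchema`, the `input` passed when queueing can be checked against the chosen slug. The sketch below shows one way such typing can be expressed with overloads and mapped types. Everything in it is hypothetical: `WorkflowInputs`, `TaskInputs` and `queueJob` stand in for generated types and for the real queue function, and the in-memory return value replaces the job document that would be stored in the database.

```typescript
// Hypothetical stand-ins for the types generated from `inputSchema`.
interface WorkflowInputs {
  createPostAndUpdate: { title: string }
}
interface TaskInputs {
  createPost: { title: string }
}

type QueueWorkflowArgs<K extends keyof WorkflowInputs> = {
  workflow: K
  input: WorkflowInputs[K]
}
type QueueTaskArgs<K extends keyof TaskInputs> = {
  task: K
  input: TaskInputs[K]
}

interface QueuedJobRecord {
  kind: 'workflow' | 'task'
  slug: string
  input: unknown
}

// Overloads tie the shape of `input` to the selected workflow or task slug.
function queueJob<K extends keyof WorkflowInputs>(args: QueueWorkflowArgs<K>): QueuedJobRecord
function queueJob<K extends keyof TaskInputs>(args: QueueTaskArgs<K>): QueuedJobRecord
function queueJob(args: { workflow?: string; task?: string; input: unknown }): QueuedJobRecord {
  if (args.workflow !== undefined) {
    return { kind: 'workflow', slug: args.workflow, input: args.input }
  }
  if (args.task !== undefined) {
    return { kind: 'task', slug: args.task, input: args.input }
  }
  throw new Error('Either `workflow` or `task` is required')
}
```

With these definitions, `queueJob({ task: 'createPost', input: { title: 'my title' } })` type-checks, while a misspelled slug or an `input` without `title` is rejected at compile time.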

## Running workflows and tasks

Workflows and tasks added to the queue will not run unless a worker picks them up and runs them. This can be done in several ways:

### Endpoint

Make a fetch request to the `/api/payload-jobs/run` endpoint:

```ts
await fetch('/api/payload-jobs/run', {
  method: 'GET',
  headers: {
    'Authorization': `JWT ${token}`,
  },
});
```

### Local API

Run the `payload.jobs.run` function:

```ts
const results = await payload.jobs.run()

// You can customize the queue name by passing it as an argument
await payload.jobs.run({ queue: 'posts' })
```

### Script

You can run the `jobs:run` script from the command line:

```sh
npx payload jobs:run --queue default --limit 10
```
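
Conceptually, the `--queue` and `--limit` flags narrow down which pending jobs a single invocation picks up. The following sketch models that selection over an in-memory list. `PendingJob` and `pickJobs` are hypothetical names, the real runner queries the jobs collection instead, and the default values here simply mirror the flags in the command above.

```typescript
// Hypothetical in-memory stand-in for documents in the jobs collection.
interface PendingJob {
  id: number
  queue: string
  completed: boolean
}

// Select at most `limit` unfinished jobs that belong to the given queue.
function pickJobs(jobs: PendingJob[], queue = 'default', limit = 10): PendingJob[] {
  return jobs
    .filter((job) => !job.completed && job.queue === queue)
    .slice(0, limit)
}
```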

#### Triggering jobs as cronjob

You can pass the `--cron` flag to the `jobs:run` script to run the jobs on a cron schedule:

```sh
npx payload jobs:run --cron "*/5 * * * *"
```

### Vercel Cron

Vercel Cron allows scheduled tasks to be executed automatically by triggering specific endpoints. Below is a step-by-step guide to configuring Vercel Cron for running queued jobs on apps hosted on Vercel:

1. Add Vercel Cron Configuration: Place a `vercel.json` file at the root of your project with the following content:

```json
{
  "crons": [
    {
      "path": "/api/payload-jobs/run",
      "schedule": "*/5 * * * *"
    }
  ]
}
```

This configuration schedules the endpoint `/api/payload-jobs/run` to be triggered every 5 minutes. This endpoint is added automatically by Payload and is responsible for running the queued jobs.

2. Environment Variable Setup: By default, the endpoint may require a JWT token for authorization. However, Vercel Cron jobs cannot pass JWT tokens. Instead, you can use an environment variable to secure the endpoint:

Add a new environment variable named `CRON_SECRET` to your Vercel project settings. This should be a random string, ideally 16 characters or longer.

3. Modify Authentication for Job Running: Adjust the job running authorization logic in your project to accept the `CRON_SECRET` as a valid token. Modify your `payload.config.ts` file as follows:

```ts
export default buildConfig({
  // Other configurations...
  jobs: {
    access: {
      run: ({ req }: { req: PayloadRequest }): boolean => {
        const authHeader = req.headers.get('authorization');
        return authHeader === `Bearer ${process.env.CRON_SECRET}`;
      },
    },
    // Other job configurations...
  }
})
```

This code snippet ensures that the jobs can only be triggered if the correct `CRON_SECRET` is provided in the authorization header.

Vercel automatically sends the value of the `CRON_SECRET` environment variable as a `Bearer` token in the `Authorization` header whenever Vercel Cron triggers the endpoint, so the jobs can be run securely.

After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint on the specified schedule, running the queued jobs in the background.
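
One detail worth guarding against in the `CRON_SECRET` check: if the environment variable is not set, a plain string comparison would accept the literal header `Bearer undefined`. A slightly more defensive variant is sketched below. `isAuthorizedCronRequest` and `HeaderReader` are hypothetical names, and the helper only relies on a `get` method like the one on the standard `Headers` class.

```typescript
// Minimal structural type so the helper works with `Headers` or any lookalike.
interface HeaderReader {
  get(name: string): string | null
}

// Hypothetical helper mirroring the `access.run` comparison from the config above.
function isAuthorizedCronRequest(headers: HeaderReader, secret: string | undefined): boolean {
  // Refuse everything when no secret is configured; otherwise the string
  // "Bearer undefined" would be treated as a valid token.
  if (!secret) {
    return false
  }
  return headers.get('authorization') === `Bearer ${secret}`
}
```

Inside `jobs.access.run`, this could be called as `isAuthorizedCronRequest(req.headers, process.env.CRON_SECRET)`.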
1 change: 1 addition & 0 deletions packages/payload/package.json
@@ -92,6 +92,7 @@
     "bson-objectid": "2.0.4",
     "ci-info": "^4.0.0",
     "console-table-printer": "2.11.2",
+    "croner": "8.1.2",
     "dataloader": "2.2.2",
     "deepmerge": "4.3.1",
     "file-type": "19.3.0",