Skip to content

Commit

Permalink
Merge pull request #1053 from neuroglia-io/feat-asyncapi-refactor
Browse files Browse the repository at this point in the history
Refactor the `AsyncAPI` call
  • Loading branch information
cdavernas authored Jan 8, 2025
2 parents 52cb747 + d1bd6ce commit dab7039
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 46 deletions.
200 changes: 184 additions & 16 deletions dsl-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
+ [HTTP Request](#http-request)
+ [URI Template](#uri-template)
+ [Process Result](#process-result)
+ [AsyncAPI Server](#asyncapi-server)
+ [AsyncAPI Message](#asyncapi-message)
+ [AsyncAPI Subscription](#asyncapi-subscription)


## Abstract

Expand Down Expand Up @@ -300,15 +304,16 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external

###### Properties

| Name | Type | Required | Description|
|:--|:---:|:---:|:---|
| document | [`externalResource`](#external-resource) | `yes` | The AsyncAPI document that defines the operation to call. |
| operationRef | `string` | `yes` | A reference to the AsyncAPI operation to call. |
| server | `string` | `no` | A reference to the server to call the specified AsyncAPI operation on.<br>If not set, default to the first server matching the operation's channel. |
| message | `string` | `no` | The name of the message to use. <br>If not set, defaults to the first message defined by the operation. |
| binding | `string` | `no` | The name of the binding to use. <br>If not set, defaults to the first binding defined by the operation |
| payload | `any` | `no` | The operation's payload, as defined by the configured message |
| authentication | `string`<br>[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| document | [`externalResource`](#external-resource) | `yes` | The AsyncAPI document that defines the [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call. |
| channel | `string` | `yes` | The name of the channel on which to perform the operation. The operation to perform is defined by declaring either `message`, in which case the [channel](https://v2.asyncapi.com/docs/reference/specification/v2.6.0#channelItemObject)'s `publish` operation will be executed, or `subscription`, in which case the [channel](https://v2.asyncapi.com/docs/reference/specification/v2.6.0#channelItemObject)'s `subscribe` operation will be executed.<br>*Used only in case the referenced document uses AsyncAPI `v2.6.0`.* |
| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.<br>*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.<br>If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject). <br>Ignored if `server` has been set.<br>*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.<br>*Required if `message` has not been set.* |
| authentication | `string`<br>[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |

###### Examples

Expand All @@ -319,17 +324,35 @@ document:
name: asyncapi-example
version: '0.1.0'
do:
- findPet:
- publishGreetings:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: greet
server:
name: greetingsServer
variables:
environment: dev
message:
payload:
greetings: Hello, World!
headers:
foo: bar
bar: baz
- subscribeToChatInbox:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operationRef: findPetsByStatus
server: staging
message: getPetByStatusQuery
binding: http
payload:
petId: ${ .pet.id }
operation: chat-inbox
protocol: http
subscription:
filter: ${ . == $workflow.input.chat.roomId }
consume:
amount: 5
for:
seconds: 10
```

##### gRPC Call
Expand Down Expand Up @@ -1940,4 +1963,149 @@ do:
version: '0.1.0'
input: {}
return: none
```

### AsyncAPI Server

Configures the target server of an AsyncAPI operation.

#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| name | `string` | `yes` | The name of the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI operation on. |
| variables | `object` | `no` | The target [server's variables](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverVariableObject), if any. |

#### Examples

```yaml
document:
dsl: '1.0.0-alpha5'
namespace: test
name: asyncapi-example
version: '0.1.0'
do:
- publishGreetings:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: greet
server:
name: greetingsServer
variables:
environment: dev
message:
payload:
greetings: Hello, World!
headers:
foo: bar
bar: baz
```

### AsyncAPI Message

Configures an AsyncAPI message to publish.

#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| payload | `object` | `no` | The message's payload, if any. |
| headers | `object` | `no` | The message's headers, if any. |

#### Examples

```yaml
document:
dsl: '1.0.0-alpha5'
namespace: test
name: asyncapi-example
version: '0.1.0'
do:
- publishGreetings:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: greet
protocol: http
message:
payload:
greetings: Hello, World!
headers:
foo: bar
bar: baz
```

### AsyncAPI Subscription

Configures a subscription to an AsyncAPI operation.

#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |

#### Examples

```yaml
document:
dsl: '1.0.0-alpha5'
namespace: test
name: asyncapi-example
version: '0.1.0'
do:
- subscribeToChatInboxForAmount:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: chat-inbox
protocol: http
subscription:
filter: ${ . == $workflow.input.chat.roomId }
consume:
amount: 5
for:
seconds: 10
```

### AsyncAPI Subscription Lifetime

Configures the lifetime of an AsyncAPI subscription

#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| amount | `integer` | `no` | The amount of messages to consume.<br>*Required if `while` and `until` have not been set.* |
| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.<br>*Required if `amount` and `until` have not been set.* |
| until | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine until when to consume messages.<br>*Required if `amount` and `while` have not been set.* |

#### Examples

```yaml
document:
dsl: '1.0.0-alpha5'
namespace: test
name: asyncapi-example
version: '0.1.0'
do:
- subscribeToChatInboxUntil:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: chat-inbox
protocol: http
subscription:
filter: ${ . == $workflow.input.chat.roomId }
consume:
until: '${ ($context.messages | length) == 5 }'
for:
seconds: 10
```
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ do:
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operationRef: findPetsByStatus
server: staging
message: getPetByStatusQuery
binding: http
payload:
petId: ${ .pet.id }
operation: findPetsByStatus
server:
name: staging
message:
payload:
petId: ${ .pet.id }
authentication:
bearer:
token: ${ .token }
17 changes: 17 additions & 0 deletions examples/call-asyncapi-subscribe-consume-amount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
document:
dsl: '1.0.0-alpha5'
namespace: examples
name: bearer-auth
version: '0.1.0'
do:
- getNotifications:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: getNotifications
protocol: ws
subscription:
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
consume:
amount: 5
18 changes: 18 additions & 0 deletions examples/call-asyncapi-subscribe-consume-until.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
document:
dsl: '1.0.0-alpha5'
namespace: examples
name: bearer-auth
version: '0.1.0'
do:
- getNotifications:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
channel: /notifications
subscription:
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
consume:
for:
minutes: 30
until: '${ ($context.consumedMessages | length) == 5 }'
16 changes: 16 additions & 0 deletions examples/call-asyncapi-subscribe-consume-while.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
document:
dsl: '1.0.0-alpha5'
namespace: examples
name: bearer-auth
version: '0.1.0'
do:
- getNotifications:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: getNotifications
subscription:
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
consume:
while: '${ ($context.consumedMessages | length) < 5 }'
Loading

0 comments on commit dab7039

Please sign in to comment.