diff --git a/dsl-reference.md b/dsl-reference.md
index faaa776d..78e9f8c2 100644
--- a/dsl-reference.md
+++ b/dsl-reference.md
@@ -56,6 +56,10 @@
+ [HTTP Request](#http-request)
+ [URI Template](#uri-template)
+ [Process Result](#process-result)
+ + [AsyncAPI Server](#asyncapi-server)
+ + [AsyncAPI Message](#asyncapi-message)
+ + [AsyncAPI Subscription](#asyncapi-subscription)
+
## Abstract
@@ -300,15 +304,16 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external
###### Properties
-| Name | Type | Required | Description|
-|:--|:---:|:---:|:---|
-| document | [`externalResource`](#external-resource) | `yes` | The AsyncAPI document that defines the operation to call. |
-| operationRef | `string` | `yes` | A reference to the AsyncAPI operation to call. |
-| server | `string` | `no` | A reference to the server to call the specified AsyncAPI operation on.
If not set, default to the first server matching the operation's channel. |
-| message | `string` | `no` | The name of the message to use.
If not set, defaults to the first message defined by the operation. |
-| binding | `string` | `no` | The name of the binding to use.
If not set, defaults to the first binding defined by the operation |
-| payload | `any` | `no` | The operation's payload, as defined by the configured message |
-| authentication | `string`
[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| document | [`externalResource`](#external-resource) | `yes` | The AsyncAPI document that defines the [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call. |
+| channel | `string` | `yes` | The name of the channel on which to perform the operation. The operation to perform is defined by declaring either `message`, in which case the [channel](https://v2.asyncapi.com/docs/reference/specification/v2.6.0#channelItemObject)'s `publish` operation will be executed, or `subscription`, in which case the [channel](https://v2.asyncapi.com/docs/reference/specification/v2.6.0#channelItemObject)'s `subscribe` operation will be executed.
*Used only in case the referenced document uses AsyncAPI `v2.6.0`.* |
+| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.
*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
+| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.
If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
+| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject).
Ignored if `server` has been set.
*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
+| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.
*Required if `subscription` has not been set.* |
+| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.
*Required if `message` has not been set.* |
+| authentication | `string`
[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |
###### Examples
@@ -319,17 +324,35 @@ document:
name: asyncapi-example
version: '0.1.0'
do:
- - findPet:
+ - publishGreetings:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: greet
+ server:
+ name: greetingsServer
+ variables:
+ environment: dev
+ message:
+ payload:
+ greetings: Hello, World!
+ headers:
+ foo: bar
+ bar: baz
+ - subscribeToChatInbox:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
- operationRef: findPetsByStatus
- server: staging
- message: getPetByStatusQuery
- binding: http
- payload:
- petId: ${ .pet.id }
+ operation: chat-inbox
+ protocol: http
+ subscription:
+ filter: ${ . == $workflow.input.chat.roomId }
+ consume:
+ amount: 5
+ for:
+ seconds: 10
```
##### gRPC Call
@@ -1940,4 +1963,149 @@ do:
version: '0.1.0'
input: {}
return: none
+```
+
+### AsyncAPI Server
+
+Configures the target server of an AsyncAPI operation.
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| name | `string` | `yes` | The name of the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI operation on. |
+| variables | `object` | `no` | The target [server's variables](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverVariableObject), if any. |
+
+#### Examples
+
+```yaml
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: asyncapi-example
+ version: '0.1.0'
+do:
+ - publishGreetings:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: greet
+ server:
+ name: greetingsServer
+ variables:
+ environment: dev
+ message:
+ payload:
+ greetings: Hello, World!
+ headers:
+ foo: bar
+ bar: baz
+```
+
+### AsyncAPI Message
+
+Configures an AsyncAPI message to publish.
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| payload | `object` | `no` | The message's payload, if any. |
+| headers | `object` | `no` | The message's headers, if any. |
+
+#### Examples
+
+```yaml
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: asyncapi-example
+ version: '0.1.0'
+do:
+ - publishGreetings:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: greet
+ protocol: http
+ message:
+ payload:
+ greetings: Hello, World!
+ headers:
+ foo: bar
+ bar: baz
+```
+
+### AsyncAPI Subscription
+
+Configures a subscription to an AsyncAPI operation.
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
+| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |
+
+#### Examples
+
+```yaml
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: asyncapi-example
+ version: '0.1.0'
+do:
+ - subscribeToChatInboxForAmount:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: chat-inbox
+ protocol: http
+ subscription:
+ filter: ${ . == $workflow.input.chat.roomId }
+ consume:
+ amount: 5
+ for:
+ seconds: 10
+```
+
+### AsyncAPI Subscription Lifetime
+
+Configures the lifetime of an AsyncAPI subscription
+
+#### Properties
+
+| Name | Type | Required | Description |
+|:-------|:------:|:----------:|:--------------|
+| amount | `integer` | `no` | The amount of messages to consume.
*Required if `while` and `until` have not been set.* |
+| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
+| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.
*Required if `amount` and `until` have not been set.* |
+| until | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine until when to consume messages.
*Required if `amount` and `while` have not been set.* |
+
+#### Examples
+
+```yaml
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: test
+ name: asyncapi-example
+ version: '0.1.0'
+do:
+ - subscribeToChatInboxUntil:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: chat-inbox
+ protocol: http
+ subscription:
+ filter: ${ . == $workflow.input.chat.roomId }
+ consume:
+ until: '${ ($context.messages | length) == 5 }'
+ for:
+ seconds: 10
```
\ No newline at end of file
diff --git a/examples/call-asyncapi.yaml b/examples/call-asyncapi-publish.yaml
similarity index 63%
rename from examples/call-asyncapi.yaml
rename to examples/call-asyncapi-publish.yaml
index d90b088d..93438242 100644
--- a/examples/call-asyncapi.yaml
+++ b/examples/call-asyncapi-publish.yaml
@@ -9,12 +9,12 @@ do:
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
- operationRef: findPetsByStatus
- server: staging
- message: getPetByStatusQuery
- binding: http
- payload:
- petId: ${ .pet.id }
+ operation: findPetsByStatus
+ server:
+ name: staging
+ message:
+ payload:
+ petId: ${ .pet.id }
authentication:
bearer:
token: ${ .token }
diff --git a/examples/call-asyncapi-subscribe-consume-amount.yaml b/examples/call-asyncapi-subscribe-consume-amount.yaml
new file mode 100644
index 00000000..29eb1cc3
--- /dev/null
+++ b/examples/call-asyncapi-subscribe-consume-amount.yaml
@@ -0,0 +1,17 @@
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: examples
+ name: bearer-auth
+ version: '0.1.0'
+do:
+ - getNotifications:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: getNotifications
+ protocol: ws
+ subscription:
+ filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
+ consume:
+ amount: 5
diff --git a/examples/call-asyncapi-subscribe-consume-until.yaml b/examples/call-asyncapi-subscribe-consume-until.yaml
new file mode 100644
index 00000000..bec5a4f0
--- /dev/null
+++ b/examples/call-asyncapi-subscribe-consume-until.yaml
@@ -0,0 +1,18 @@
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: examples
+ name: bearer-auth
+ version: '0.1.0'
+do:
+ - getNotifications:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ channel: /notifications
+ subscription:
+ filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
+ consume:
+ for:
+ minutes: 30
+ until: '${ ($context.consumedMessages | length) == 5 }'
diff --git a/examples/call-asyncapi-subscribe-consume-while.yaml b/examples/call-asyncapi-subscribe-consume-while.yaml
new file mode 100644
index 00000000..58c2e1ed
--- /dev/null
+++ b/examples/call-asyncapi-subscribe-consume-while.yaml
@@ -0,0 +1,16 @@
+document:
+ dsl: '1.0.0-alpha5'
+ namespace: examples
+ name: bearer-auth
+ version: '0.1.0'
+do:
+ - getNotifications:
+ call: asyncapi
+ with:
+ document:
+ endpoint: https://fake.com/docs/asyncapi.json
+ operation: getNotifications
+ subscription:
+ filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
+ consume:
+ while: '${ ($context.consumedMessages | length) < 5 }'
diff --git a/schema/workflow.yaml b/schema/workflow.yaml
index d35ab774..4c83a175 100644
--- a/schema/workflow.yaml
+++ b/schema/workflow.yaml
@@ -241,33 +241,42 @@ $defs:
properties:
document:
$ref: '#/$defs/externalResource'
- title: WithAsyncAPIDocument
+ title: AsyncAPIDocument
description: The document that defines the AsyncAPI operation to call.
- operationRef:
+ channel:
type: string
- title: WithAsyncAPIOperation
+ title: With
+ description: The name of the channel on which to perform the operation. Used only in case the referenced document uses AsyncAPI v2.6.0.
+ operation:
+ type: string
+ title: AsyncAPIOperation
description: A reference to the AsyncAPI operation to call.
server:
+ $ref: '#/$defs/asyncApiServer'
+ title: AsyncAPIServer
+ description: An object used to configure to the server to call the specified AsyncAPI operation on.
+ protocol:
type: string
- title: WithAsyncAPIServer
- description: A a reference to the server to call the specified AsyncAPI operation on. If not set, default to the first server matching the operation's channel.
+ title: AsyncApiProtocol
+ description: The protocol to use to select the target server.
+ enum: [ amqp, amqp1, anypointmq, googlepubsub, http, ibmmq, jms, kafka, mercure, mqtt, mqtt5, nats, pulsar, redis, sns, solace, sqs, stomp, ws ]
message:
- type: string
- title: WithAsyncAPIMessage
- description: The name of the message to use. If not set, defaults to the first message defined by the operation.
- binding:
- type: string
- title: WithAsyncAPIBinding
- description: The name of the binding to use. If not set, defaults to the first binding defined by the operation.
- payload:
- type: object
- title: WithAsyncAPIPayload
- description: The payload to call the AsyncAPI operation with, if any.
+ $ref: '#/$defs/asyncApiOutboundMessage'
+ title: AsyncApiMessage
+ description: An object used to configure the message to publish using the target operation.
+ subscription:
+ $ref: '#/$defs/asyncApiSubscription'
+ title: AsyncApiSubscription
+ description: An object used to configure the subscription to messages consumed using the target operation.
authentication:
$ref: '#/$defs/referenceableAuthenticationPolicy'
- title: WithAsyncAPIAuthentication
+ title: AsyncAPIAuthentication
description: The authentication policy, if any, to use when calling the AsyncAPI operation.
- required: [ document, operationRef ]
+ oneOf:
+ - required: [ document, operation, message ]
+ - required: [ document, operation, subscription ]
+ - required: [ document, channel, message ]
+ - required: [ document, channel, subscription ]
unevaluatedProperties: false
- title: CallGRPC
description: Defines the GRPC call to perform.
@@ -1528,13 +1537,13 @@ $defs:
catalog:
type: object
title: Catalog
- description: The definition of a resource catalog
+ description: The definition of a resource catalog.
unevaluatedProperties: false
properties:
endpoint:
$ref: '#/$defs/endpoint'
title: CatalogEndpoint
- description: The root URL where the catalog is hosted
+ description: The root URL where the catalog is hosted.
required: [ endpoint ]
runtimeExpression:
type: string
@@ -1544,7 +1553,8 @@ $defs:
processResult:
type: object
title: ProcessResult
- description: The object returned by a run task when its return type has been set 'all'
+ description: The object returned by a run task when its return type has been set 'all'.
+ unevaluatedProperties: false
properties:
code:
type: integer
@@ -1553,9 +1563,95 @@ $defs:
stdout:
type: string
title: ProcessStandardOutput
- description: The content of the process's STDOUT
+ description: The content of the process's STDOUT.
stderr:
type: string
title: ProcessStandardError
- description: The content of the process's STDERR
- required: [ code, stdout, stderr ]
\ No newline at end of file
+ description: The content of the process's STDERR.
+ required: [ code, stdout, stderr ]
+ asyncApiServer:
+ type: object
+ title: AsyncApiServer
+ description: Configures the target server of an AsyncAPI operation.
+ unevaluatedProperties: false
+ properties:
+ name:
+ type: string
+ title: AsyncApiServerName
+ description: The target server's name.
+ variables:
+ type: object
+ title: AsyncApiServerVariables
+ description: The target server's variables, if any.
+ required: [ name ]
+ asyncApiOutboundMessage:
+ type: object
+ title: AsyncApiOutboundMessage
+ description: An object used to configure the message to publish using the target operation.
+ unevaluatedProperties: false
+ properties:
+ payload:
+ type: object
+ title: AsyncApiMessagePayload
+ description: The message's payload, if any.
+ additionalProperties: true
+ headers:
+ type: object
+ title: AsyncApiMessageHeaders
+ description: The message's headers, if any.
+ additionalProperties: true
+ asyncApiInboundMessage:
+ type: object
+ title: AsyncApiInboundMessage
+ description: Represents a message counsumed by an AsyncAPI subscription.
+ allOf:
+ - $ref: '#/$defs/asyncApiOutboundMessage'
+ properties:
+ correlationId:
+ type: string
+ title: AsyncApiMessageCorrelationId
+ description: The message's correlation id, if any.
+ asyncApiSubscription:
+ type: object
+ title: AsyncApiSubscription
+ description: An object used to configure the subscription to messages consumed using the target operation.
+ unevaluatedProperties: false
+ properties:
+ filter:
+ $ref: '#/$defs/runtimeExpression'
+ title: AsyncApiSubscriptionCorrelation
+ description: A runtime expression, if any, used to filter consumed messages.
+ consume:
+ $ref: '#/$defs/asyncApiMessageConsumptionPolicy'
+ title: AsyncApiMessageConsumptionPolicy
+ description: An object used to configure the subscription's message consumption policy.
+ required: [ consume ]
+ asyncApiMessageConsumptionPolicy:
+ type: object
+ title: AsyncApiMessageConsumptionPolicy
+ description: An object used to configure a subscription's message consumption policy.
+ unevaluatedProperties: false
+ properties:
+ for:
+ $ref: '#/$defs/duration'
+ title: AsyncApiMessageConsumptionPolicyFor
+ description: Specifies the time period over which messages will be consumed.
+ oneOf:
+ - properties:
+ amount:
+ type: integer
+ title: AsyncApiMessageConsumptionPolicyAmount
+ description: The amount of (filtered) messages to consume before disposing of the subscription.
+ required: [ amount ]
+ - properties:
+ while:
+ $ref: '#/$defs/runtimeExpression'
+ title: AsyncApiMessageConsumptionPolicyWhile
+ description: A runtime expression evaluated after each consumed (filtered) message to decide if message consumption should continue.
+ required: [ while ]
+ - properties:
+ until:
+ $ref: '#/$defs/runtimeExpression'
+ title: AsyncApiMessageConsumptionPolicyUntil
+ description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
+ required: [ until ]
\ No newline at end of file