diff --git a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs index 5c6f617f..90231273 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs @@ -62,7 +62,13 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken if (this.Task.Definition.Foreach == null) { var context = await this.Task.CorrelateAsync(cancellationToken).ConfigureAwait(false); - await this.SetResultAsync(context, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + var events = this.Task.Definition.Listen.Read switch + { + EventReadMode.Data or EventReadMode.Raw => context.Events.Select(e => e.Value.Data), + EventReadMode.Envelope => context.Events.Select(e => e.Value.Data), + _ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported") + }; + await this.SetResultAsync(events, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } else { @@ -107,7 +113,12 @@ protected virtual async Task OnStreamingEventAsync(IStreamedCloudEvent e) ] }; var arguments = this.GetExpressionEvaluationArguments(); - var eventData = e.Event as object; + var eventData = this.Task.Definition.Listen.Read switch + { + EventReadMode.Data or EventReadMode.Raw => e.Event.Data, + EventReadMode.Envelope => e.Event, + _ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported") + }; if (this.Task.Definition.Foreach.Output?.As is string fromExpression) eventData = await this.Task.Workflow.Expressions.EvaluateAsync(fromExpression, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); else if (this.Task.Definition.Foreach.Output?.As != null) eventData = await this.Task.Workflow.Expressions.EvaluateAsync(this.Task.Definition.Foreach.Output.As!, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); if (this.Task.Definition.Foreach.Export?.As is string toExpression)