Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SignalsQueries sample #58

Merged
merged 9 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Prerequisites:
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries.
* [Timer](src/Timer) - Use a timer to implement a monthly subscription; handle workflow cancellation.
* [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker.
* [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code.
Expand Down
47 changes: 47 additions & 0 deletions src/SignalsQueries/LoyaltyProgram.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace TemporalioSamples.SignalsQueries;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

public record Purchase(string Id, int TotalCents);

[Workflow]
public class LoyaltyProgram
{
private readonly Queue<Purchase> toProcess = new();

[WorkflowQuery]
public int Points { get; private set; }

[WorkflowRun]
public async Task RunAsync(string userId)
{
while (true)
{
// Wait for purchase
await Workflow.WaitConditionAsync(() => toProcess.Count > 0);

// Process
var purchase = toProcess.Dequeue();
Points += purchase.TotalCents;
Workflow.Logger.LogInformation("Added {TotalCents} points, total: {Points}", purchase.TotalCents, Points);
if (Points >= 10_000)
{
await Workflow.ExecuteActivityAsync(
() => MyActivities.SendCoupon(userId),
new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
Points -= 10_000;
Workflow.Logger.LogInformation("Remaining points: {Points}", Points);
}
}
}

[WorkflowSignal]
public async Task NotifyPurchaseAsync(Purchase purchase)
{
if (!toProcess.Contains(purchase))
{
toProcess.Enqueue(purchase);
}
}
}
13 changes: 13 additions & 0 deletions src/SignalsQueries/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace TemporalioSamples.SignalsQueries;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public static class MyActivities
{
[Activity]
public static void SendCoupon(string? userId)
{
ActivityExecutionContext.Current.Logger.LogInformation("Sending coupon to user {UserId}", userId);
}
}
80 changes: 80 additions & 0 deletions src/SignalsQueries/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Worker;
using TemporalioSamples.SignalsQueries;

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "signals-queries-sample").
AddActivity(MyActivities.SendCoupon).
AddWorkflow<LoyaltyProgram>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync()
{
// If the workflow is already running from a previous run, terminate it
try
{
await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync();
}
catch (Temporalio.Exceptions.RpcException ex) when (ex.Code == Temporalio.Exceptions.RpcException.StatusCode.NotFound)
{
// Ignore
}

Console.WriteLine("Executing workflow");
var handle = await client.StartWorkflowAsync(
(LoyaltyProgram wf) => wf.RunAsync("user-id-123"),
new(id: "signals-queries-workflow-id", taskQueue: "signals-queries-sample"));

Console.WriteLine("Signal: Purchase made for $80");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 8_000)));
Console.WriteLine("Signal: Purchase made for $40");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 4_000)));

// Wait for workflow to process the signals
await Task.Delay(1000);
var points = await handle.QueryAsync(wf => wf.Points);
Console.WriteLine("Remaining points: {0}", points);
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
14 changes: 14 additions & 0 deletions src/SignalsQueries/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SignalsQueries

A loyalty program implemented with Signals and Queries.

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory
in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, run the workflow from this directory:

dotnet run workflow

The worker terminal will show logs from running the workflow.
7 changes: 7 additions & 0 deletions src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

</Project>
Loading