From 59d838072881541b0b5065a48f2f1e31df476eca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Thu, 28 Mar 2024 00:34:57 -0400 Subject: [PATCH 1/6] Add Timer --- README.md | 1 + src/Timer/MyActivities.cs | 23 +++++++ src/Timer/MyWorkflow.workflow.cs | 30 +++++++++ src/Timer/Program.cs | 64 +++++++++++++++++++ src/Timer/README.md | 14 ++++ .../TemporalioSamples.ActivitySimple.csproj | 7 ++ 6 files changed, 139 insertions(+) create mode 100644 src/Timer/MyActivities.cs create mode 100644 src/Timer/MyWorkflow.workflow.cs create mode 100644 src/Timer/Program.cs create mode 100644 src/Timer/README.md create mode 100644 src/Timer/TemporalioSamples.ActivitySimple.csproj diff --git a/README.md b/README.md index 08b98c9..059e499 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Prerequisites: * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [Saga](src/Saga) - Demonstrates how to implement a saga pattern. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. +* [Timer](src/Timer) - Use a timer to implement a monthly subscription. * [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. * [WorkflowUpdate](src/WorkflowUpdate) - How to use the Workflow Update feature while blocking in update method for concurrent updates. diff --git a/src/Timer/MyActivities.cs b/src/Timer/MyActivities.cs new file mode 100644 index 0000000..c7ec236 --- /dev/null +++ b/src/Timer/MyActivities.cs @@ -0,0 +1,23 @@ +namespace TemporalioSamples.ActivitySimple; + +using Temporalio.Activities; + +public class MyActivities +{ + private readonly MyDatabaseClient dbClient = new(); + + // Activities can be static and/or sync + [Activity] + public static string DoStaticThing() => "some-static-value"; + + // Activities can be methods that can access state + [Activity] + public Task SelectFromDatabaseAsync(string table) => + dbClient.SelectValueAsync(table); + + public class MyDatabaseClient + { + public Task SelectValueAsync(string table) => + Task.FromResult($"some-db-value from table {table}"); + } +} \ No newline at end of file diff --git a/src/Timer/MyWorkflow.workflow.cs b/src/Timer/MyWorkflow.workflow.cs new file mode 100644 index 0000000..a61c593 --- /dev/null +++ b/src/Timer/MyWorkflow.workflow.cs @@ -0,0 +1,30 @@ +namespace TemporalioSamples.ActivitySimple; + +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +[Workflow] +public class MyWorkflow +{ + [WorkflowRun] + public async Task RunAsync() + { + // Run an async instance method activity. + var result1 = await Workflow.ExecuteActivityAsync( + (MyActivities act) => act.SelectFromDatabaseAsync("some-db-table"), + new() + { + StartToCloseTimeout = TimeSpan.FromMinutes(5), + }); + Workflow.Logger.LogInformation("Activity instance method result: {Result}", result1); + + // Run a sync static method activity. + var result2 = await Workflow.ExecuteActivityAsync( + () => MyActivities.DoStaticThing(), + new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); + Workflow.Logger.LogInformation("Activity static method result: {Result}", result2); + + // We'll go ahead and return this result + return result2; + } +} \ No newline at end of file diff --git a/src/Timer/Program.cs b/src/Timer/Program.cs new file mode 100644 index 0000000..061456e --- /dev/null +++ b/src/Timer/Program.cs @@ -0,0 +1,64 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.ActivitySimple; + +// Create a client to localhost on default namespace +var client = await TemporalClient.ConnectAsync(new("localhost:7233") +{ + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), +}); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Create an activity instance with some state + var activities = new MyActivities(); + + // Run worker until cancelled + Console.WriteLine("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "activity-simple-sample"). + AddActivity(activities.SelectFromDatabaseAsync). + AddActivity(MyActivities.DoStaticThing). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync() +{ + Console.WriteLine("Executing workflow"); + await client.ExecuteWorkflowAsync( + (MyWorkflow wf) => wf.RunAsync(), + new(id: "activity-simple-workflow-id", taskQueue: "activity-simple-sample")); +} + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "workflow": + await ExecuteWorkflowAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/Timer/README.md b/src/Timer/README.md new file mode 100644 index 0000000..cc9d330 --- /dev/null +++ b/src/Timer/README.md @@ -0,0 +1,14 @@ +# Timer + +Use a timer (`Workflow.DelayAsync`) to implement a monthly subscription. + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run the workflow from this directory: + + dotnet run workflow + +The worker terminal will show logs from the workflow running. diff --git a/src/Timer/TemporalioSamples.ActivitySimple.csproj b/src/Timer/TemporalioSamples.ActivitySimple.csproj new file mode 100644 index 0000000..e3b6154 --- /dev/null +++ b/src/Timer/TemporalioSamples.ActivitySimple.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + \ No newline at end of file From 7a08784d620cf4018ade51ba006511b5c39b395b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Thu, 28 Mar 2024 01:58:52 -0400 Subject: [PATCH 2/6] working --- src/Timer/MyActivities.cs | 18 ++---------------- src/Timer/MyWorkflow.workflow.cs | 28 ++++++++++------------------ src/Timer/Program.cs | 11 +++++------ 3 files changed, 17 insertions(+), 40 deletions(-) diff --git a/src/Timer/MyActivities.cs b/src/Timer/MyActivities.cs index c7ec236..9b56571 100644 --- a/src/Timer/MyActivities.cs +++ b/src/Timer/MyActivities.cs @@ -1,23 +1,9 @@ -namespace TemporalioSamples.ActivitySimple; +namespace TemporalioSamples.Timer; using Temporalio.Activities; public class MyActivities { - private readonly MyDatabaseClient dbClient = new(); - - // Activities can be static and/or sync - [Activity] - public static string DoStaticThing() => "some-static-value"; - - // Activities can be methods that can access state [Activity] - public Task SelectFromDatabaseAsync(string table) => - dbClient.SelectValueAsync(table); - - public class MyDatabaseClient - { - public Task SelectValueAsync(string table) => - Task.FromResult($"some-db-value from table {table}"); - } + public static string Charge(string userId) => "charge successful"; } \ No newline at end of file diff --git a/src/Timer/MyWorkflow.workflow.cs b/src/Timer/MyWorkflow.workflow.cs index a61c593..f3b69fb 100644 --- a/src/Timer/MyWorkflow.workflow.cs +++ b/src/Timer/MyWorkflow.workflow.cs @@ -1,4 +1,4 @@ -namespace TemporalioSamples.ActivitySimple; +namespace TemporalioSamples.Timer; using Microsoft.Extensions.Logging; using Temporalio.Workflows; @@ -7,24 +7,16 @@ namespace TemporalioSamples.ActivitySimple; public class MyWorkflow { [WorkflowRun] - public async Task RunAsync() + public async Task RunAsync(string userId) { - // Run an async instance method activity. - var result1 = await Workflow.ExecuteActivityAsync( - (MyActivities act) => act.SelectFromDatabaseAsync("some-db-table"), - new() - { - StartToCloseTimeout = TimeSpan.FromMinutes(5), - }); - Workflow.Logger.LogInformation("Activity instance method result: {Result}", result1); + while (true) + { + await Workflow.DelayAsync(TimeSpan.FromDays(30)); - // Run a sync static method activity. - var result2 = await Workflow.ExecuteActivityAsync( - () => MyActivities.DoStaticThing(), - new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); - Workflow.Logger.LogInformation("Activity static method result: {Result}", result2); - - // We'll go ahead and return this result - return result2; + var result = await Workflow.ExecuteActivityAsync( + () => MyActivities.Charge(userId), + new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); + Workflow.Logger.LogInformation("Activity result: {Result}", result); + } } } \ No newline at end of file diff --git a/src/Timer/Program.cs b/src/Timer/Program.cs index 061456e..5212804 100644 --- a/src/Timer/Program.cs +++ b/src/Timer/Program.cs @@ -1,7 +1,7 @@ using Microsoft.Extensions.Logging; using Temporalio.Client; using Temporalio.Worker; -using TemporalioSamples.ActivitySimple; +using TemporalioSamples.Timer; // Create a client to localhost on default namespace var client = await TemporalClient.ConnectAsync(new("localhost:7233") @@ -29,9 +29,8 @@ async Task RunWorkerAsync() Console.WriteLine("Running worker"); using var worker = new TemporalWorker( client, - new TemporalWorkerOptions(taskQueue: "activity-simple-sample"). - AddActivity(activities.SelectFromDatabaseAsync). - AddActivity(MyActivities.DoStaticThing). + new TemporalWorkerOptions(taskQueue: "timer-sample"). + AddActivity(MyActivities.Charge). AddWorkflow()); try { @@ -47,8 +46,8 @@ async Task ExecuteWorkflowAsync() { Console.WriteLine("Executing workflow"); await client.ExecuteWorkflowAsync( - (MyWorkflow wf) => wf.RunAsync(), - new(id: "activity-simple-workflow-id", taskQueue: "activity-simple-sample")); + (MyWorkflow wf) => wf.RunAsync("user-id-123"), + new(id: "timer-workflow-id", taskQueue: "timer-sample")); } switch (args.ElementAtOrDefault(0)) From d96d94d733ebb2fff398cd720417778f8fb0680a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Thu, 28 Mar 2024 02:03:17 -0400 Subject: [PATCH 3/6] wording --- src/Timer/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Timer/README.md b/src/Timer/README.md index cc9d330..afd51a8 100644 --- a/src/Timer/README.md +++ b/src/Timer/README.md @@ -11,4 +11,4 @@ Then in another terminal, run the workflow from this directory: dotnet run workflow -The worker terminal will show logs from the workflow running. +The worker terminal will show logs from running the workflow. From 33c379ee0114a12eb208ed8027413b52a56f57e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Thu, 28 Mar 2024 02:09:30 -0400 Subject: [PATCH 4/6] update workflow name --- src/Timer/Program.cs | 4 ++-- .../{MyWorkflow.workflow.cs => Subscription.workflow.cs} | 2 +- ...s.ActivitySimple.csproj => TemporalioSamples.Timer.csproj} | 0 3 files changed, 3 insertions(+), 3 deletions(-) rename src/Timer/{MyWorkflow.workflow.cs => Subscription.workflow.cs} (95%) rename src/Timer/{TemporalioSamples.ActivitySimple.csproj => TemporalioSamples.Timer.csproj} (100%) diff --git a/src/Timer/Program.cs b/src/Timer/Program.cs index 5212804..231efdd 100644 --- a/src/Timer/Program.cs +++ b/src/Timer/Program.cs @@ -31,7 +31,7 @@ async Task RunWorkerAsync() client, new TemporalWorkerOptions(taskQueue: "timer-sample"). AddActivity(MyActivities.Charge). - AddWorkflow()); + AddWorkflow()); try { await worker.ExecuteAsync(tokenSource.Token); @@ -46,7 +46,7 @@ async Task ExecuteWorkflowAsync() { Console.WriteLine("Executing workflow"); await client.ExecuteWorkflowAsync( - (MyWorkflow wf) => wf.RunAsync("user-id-123"), + (Subscription wf) => wf.RunAsync("user-id-123"), new(id: "timer-workflow-id", taskQueue: "timer-sample")); } diff --git a/src/Timer/MyWorkflow.workflow.cs b/src/Timer/Subscription.workflow.cs similarity index 95% rename from src/Timer/MyWorkflow.workflow.cs rename to src/Timer/Subscription.workflow.cs index f3b69fb..2f621ba 100644 --- a/src/Timer/MyWorkflow.workflow.cs +++ b/src/Timer/Subscription.workflow.cs @@ -4,7 +4,7 @@ namespace TemporalioSamples.Timer; using Temporalio.Workflows; [Workflow] -public class MyWorkflow +public class Subscription { [WorkflowRun] public async Task RunAsync(string userId) diff --git a/src/Timer/TemporalioSamples.ActivitySimple.csproj b/src/Timer/TemporalioSamples.Timer.csproj similarity index 100% rename from src/Timer/TemporalioSamples.ActivitySimple.csproj rename to src/Timer/TemporalioSamples.Timer.csproj From eb623e34608766bd95949583db9e848585a60a6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Thu, 28 Mar 2024 19:16:14 -0400 Subject: [PATCH 5/6] handle cancellation --- README.md | 2 +- src/Timer/README.md | 2 +- src/Timer/Subscription.workflow.cs | 22 ++++++++++++++++------ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 059e499..2949982 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Prerequisites: * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [Saga](src/Saga) - Demonstrates how to implement a saga pattern. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. -* [Timer](src/Timer) - Use a timer to implement a monthly subscription. +* [Timer](src/Timer) - Use a timer to implement a monthly subscription; handle workflow cancellation. * [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. * [WorkflowUpdate](src/WorkflowUpdate) - How to use the Workflow Update feature while blocking in update method for concurrent updates. diff --git a/src/Timer/README.md b/src/Timer/README.md index afd51a8..d2cb03f 100644 --- a/src/Timer/README.md +++ b/src/Timer/README.md @@ -1,6 +1,6 @@ # Timer -Use a timer (`Workflow.DelayAsync`) to implement a monthly subscription. +Use a timer (`Workflow.DelayAsync`) to implement a monthly subscription. Also, handle workflow cancellation. To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a separate terminal to start the worker: diff --git a/src/Timer/Subscription.workflow.cs b/src/Timer/Subscription.workflow.cs index 2f621ba..9d6bf24 100644 --- a/src/Timer/Subscription.workflow.cs +++ b/src/Timer/Subscription.workflow.cs @@ -9,14 +9,24 @@ public class Subscription [WorkflowRun] public async Task RunAsync(string userId) { - while (true) + try { - await Workflow.DelayAsync(TimeSpan.FromDays(30)); + while (true) + { + await Workflow.DelayAsync(TimeSpan.FromDays(30)); - var result = await Workflow.ExecuteActivityAsync( - () => MyActivities.Charge(userId), - new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); - Workflow.Logger.LogInformation("Activity result: {Result}", result); + var result = await Workflow.ExecuteActivityAsync( + () => MyActivities.Charge(userId), + new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) }); + Workflow.Logger.LogInformation("Activity result: {Result}", result); + } + } + catch (OperationCanceledException) + { + Workflow.Logger.LogInformation("Workflow cancelled, cleaning up..."); + // Handle any cleanup here + // Re-throw to close the workflow as Cancelled. Otherwise, it will be closed as Completed. + throw; } } } \ No newline at end of file From 711d3509870cbfc13d85fd91e0775ccde2b00b0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=E2=98=BA=EF=B8=8F?= <251288+lorensr@users.noreply.github.com> Date: Fri, 29 Mar 2024 23:23:41 -0400 Subject: [PATCH 6/6] Update src/Timer/Subscription.workflow.cs Co-authored-by: Chad Retz --- src/Timer/Subscription.workflow.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Timer/Subscription.workflow.cs b/src/Timer/Subscription.workflow.cs index 9d6bf24..2d8bc55 100644 --- a/src/Timer/Subscription.workflow.cs +++ b/src/Timer/Subscription.workflow.cs @@ -21,7 +21,7 @@ public async Task RunAsync(string userId) Workflow.Logger.LogInformation("Activity result: {Result}", result); } } - catch (OperationCanceledException) + catch (Exception e) when (TemporalException.IsCanceledException(e)) { Workflow.Logger.LogInformation("Workflow cancelled, cleaning up..."); // Handle any cleanup here