Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenTelemetry Sample #78

Merged
merged 7 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.4.33" PrivateAssets="all" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
robcao marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.10.48" PrivateAssets="all" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435" PrivateAssets="all" />
</ItemGroup>
</Project>
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Prerequisites:
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [OpenTelemetry](src/OpenTelemetry) - Demonstrates how to set up OpenTelemetry tracing and metrics for both the client and worker, using both the .NET metrics API and internal forwarding from the Core SDK.
cretz marked this conversation as resolved.
Show resolved Hide resolved
* [Patching](src/Patching) - Alter workflows safely with Patch and DeprecatePatch.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [SafeMessageHandlers](src/SafeMessageHandlers) - Use `Semaphore` to ensure operations are atomically processed in a workflow.
Expand Down
20 changes: 18 additions & 2 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.Mutex", "
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.Saga", "src\Saga\TemporalioSamples.Saga.csproj", "{B79F07F7-3429-4C58-84C3-08587F748B2D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowUpdate", "src\WorkflowUpdate\TemporalioSamples.WorkflowUpdate.csproj", "{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.WorkflowUpdate", "src\WorkflowUpdate\TemporalioSamples.WorkflowUpdate.csproj", "{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.OpenTelemetry.DotNetMetrics", "src\OpenTelemetry\DotNetMetrics\TemporalioSamples.OpenTelemetry.DotNetMetrics.csproj", "{6E32CC8F-7008-42A5-96A7-F182B9A6DC03}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.OpenTelemetry.CoreSdkForwarding", "src\OpenTelemetry\CoreSdkForwarding\TemporalioSamples.OpenTelemetry.CoreSdkForwarding.csproj", "{785AA9A8-C440-4A62-B778-FD52423A6EDF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.SafeMessageHandlers", "src\SafeMessageHandlers\TemporalioSamples.SafeMessageHandlers.csproj", "{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}"
EndProject
Expand Down Expand Up @@ -165,6 +169,18 @@ Global
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
{CDFAD7B0-FC43-4ECD-9E36-1023322294C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CDFAD7B0-FC43-4ECD-9E36-1023322294C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CDFAD7B0-FC43-4ECD-9E36-1023322294C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CDFAD7B0-FC43-4ECD-9E36-1023322294C9}.Release|Any CPU.Build.0 = Release|Any CPU
{6E32CC8F-7008-42A5-96A7-F182B9A6DC03}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E32CC8F-7008-42A5-96A7-F182B9A6DC03}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E32CC8F-7008-42A5-96A7-F182B9A6DC03}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E32CC8F-7008-42A5-96A7-F182B9A6DC03}.Release|Any CPU.Build.0 = Release|Any CPU
{785AA9A8-C440-4A62-B778-FD52423A6EDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{785AA9A8-C440-4A62-B778-FD52423A6EDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{785AA9A8-C440-4A62-B778-FD52423A6EDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{785AA9A8-C440-4A62-B778-FD52423A6EDF}.Release|Any CPU.Build.0 = Release|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAF5984A-5B1C-4686-B056-A4F2AC0E8EB7}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down
8 changes: 4 additions & 4 deletions src/Mutex/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ async Task RunWorkerAsync()
}
}

async Task ExecuteWorkflowsWithMutex(string resourceId)
async Task ExecuteWorkflowsWithMutexAsync(string resourceId)
{
await Task.WhenAll(Execute(), Execute());
await Task.WhenAll(ExecuteAsync(), ExecuteAsync());

return;

async Task Execute()
async Task ExecuteAsync()
{
var workflowId = "test-" + Guid.NewGuid();
Console.WriteLine($"Starting test workflow with id '{workflowId}'. Connecting to lock workflow '{resourceId}'");
Expand All @@ -68,7 +68,7 @@ async Task Execute()
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowsWithMutex(args.ElementAtOrDefault(1) ?? "locked-resource-id");
await ExecuteWorkflowsWithMutexAsync(args.ElementAtOrDefault(1) ?? "locked-resource-id");
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the first argument");
Expand Down
15 changes: 15 additions & 0 deletions src/OpenTelemetry/Common/Activities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Microsoft.Extensions.Logging;
using Temporalio.Activities;

namespace TemporalioSamples.OpenTelemetry.Common;

public static class Activities
{
[Activity]
public static void MyActivity(string input)
{
ActivityExecutionContext.Current.Logger.LogInformation("Executing activity for OpenTelemetry sample.");

ActivityExecutionContext.Current.MetricMeter.CreateCounter<int>("my-activity-counter", description: "Counter used to instrument an activity.").Add(123);
}
}
24 changes: 24 additions & 0 deletions src/OpenTelemetry/Common/MyWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace TemporalioSamples.OpenTelemetry.Common;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

[Workflow]
public class MyWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync()
{
Workflow.Logger.LogInformation("Running workflow {WorkflowId}.", Workflow.Info.WorkflowId);

Workflow.MetricMeter.CreateCounter<int>("my-workflow-counter", description: "Replay-safe counter for instrumentation inside a workflow.").Add(123);
await Workflow.ExecuteActivityAsync(
() => Activities.MyActivity("input"),
new()
{
StartToCloseTimeout = TimeSpan.FromMinutes(5),
});

return "complete!";
}
}
95 changes: 95 additions & 0 deletions src/OpenTelemetry/CoreSdkForwarding/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Temporalio.Client;
using Temporalio.Extensions.OpenTelemetry;
using Temporalio.Runtime;
using Temporalio.Worker;
using TemporalioSamples.OpenTelemetry.Common;

var assemblyName = typeof(TemporalClient).Assembly.GetName();

var instanceId = args.ElementAtOrDefault(0) ?? throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");

var resourceBuilder = ResourceBuilder.
CreateDefault().
AddService("TemporalioSamples.OpenTelemetry", serviceInstanceId: instanceId);

using var tracerProvider = Sdk.
CreateTracerProviderBuilder().
SetResourceBuilder(resourceBuilder).
AddSource(TracingInterceptor.ClientSource.Name, TracingInterceptor.WorkflowsSource.Name, TracingInterceptor.ActivitiesSource.Name).
AddOtlpExporter().
Build();

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
Interceptors = new[] { new TracingInterceptor() },
Runtime = new TemporalRuntime(new TemporalRuntimeOptions()
{
Telemetry = new TelemetryOptions()
{
Metrics = new MetricsOptions()
{
OpenTelemetry = new OpenTelemetryOptions()
{
Url = new Uri("http://localhost:4317"),
},
},
},
}),
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "opentelemetry-sample-core-sdk-forwarding").
AddWorkflow<MyWorkflow>().
AddActivity(Activities.MyActivity));
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync()
{
Console.WriteLine("Executing workflow");
await client.ExecuteWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: "opentelemetry-sample-core-sdk-workflow-id", taskQueue: "opentelemetry-sample-core-sdk-forwarding"));
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
41 changes: 41 additions & 0 deletions src/OpenTelemetry/CoreSdkForwarding/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# OpenTelemetry - .Core SDK Forwarding

This sample shows how to configure the SDK to forward metrics from the Core SDK.

The main advantage over using .NET metrics is simplicity.

This sample also shows how to configure custom metrics from both an activity and a workflow in a replay-safe manner.

To run, first see [README.md](../../../README.md) for prerequisites.

Then, run the following from [one directory up ](../docker-compose.yaml) to start the .NET Aspire Dashboard which will collect telemetry. The dashboard UI is available at http://localhost:18888.

docker compose up

Then, run the following from this directory in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, run the workflow from this directory:

dotnet run workflow

The workflow will complete.

## Traces

Traces can be viewed at http://localhost:18888/traces.

You can select either `worker` or `workflow` for traces; both should show the same trace. The workflow should appear and when clicked, may look something like:

![Tracing Screenshot](tracing-screenshot.png)

## Metrics

Metrics can be viewed by clicking the metrics tab on the dashboard.

Select `temporal-core-sdk`.

All metrics emitted by the Core SDK will be shown. It may look something like:

![Metrics Screenshot](metrics-screenshot.png)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Common\**\*.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
</ItemGroup>

</Project>
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
103 changes: 103 additions & 0 deletions src/OpenTelemetry/DotNetMetrics/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System.Diagnostics.Metrics;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Temporalio.Client;
using Temporalio.Extensions.DiagnosticSource;
using Temporalio.Extensions.OpenTelemetry;
using Temporalio.Runtime;
using Temporalio.Worker;
using TemporalioSamples.OpenTelemetry.Common;

var assemblyName = typeof(TemporalClient).Assembly.GetName();

using var meter = new Meter(assemblyName.Name!, assemblyName.Version!.ToString());

var instanceId = args.ElementAtOrDefault(0) ?? throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");

var resourceBuilder = ResourceBuilder.
CreateDefault().
AddService("TemporalioSamples.OpenTelemetry", serviceInstanceId: instanceId);

using var tracerProvider = Sdk.
CreateTracerProviderBuilder().
SetResourceBuilder(resourceBuilder).
AddSource(TracingInterceptor.ClientSource.Name, TracingInterceptor.WorkflowsSource.Name, TracingInterceptor.ActivitiesSource.Name).
AddOtlpExporter().
Build();

using var meterProvider = Sdk.
CreateMeterProviderBuilder().
SetResourceBuilder(resourceBuilder).
AddMeter(assemblyName.Name!).
AddOtlpExporter().
Build();

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
Interceptors = new[] { new TracingInterceptor() },
Runtime = new TemporalRuntime(new TemporalRuntimeOptions()
{
Telemetry = new TelemetryOptions()
{
Metrics = new MetricsOptions()
{
CustomMetricMeter = new CustomMetricMeter(meter),
},
},
}),
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "opentelemetry-sample-dotnet-metrics").
AddWorkflow<MyWorkflow>().
AddActivity(Activities.MyActivity));
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync()
{
Console.WriteLine("Executing workflow");
await client.ExecuteWorkflowAsync(
(MyWorkflow wf) => wf.RunAsync(),
new(id: "opentelemetry-sample-dotnet-workflow-id", taskQueue: "opentelemetry-sample-dotnet-metrics"));
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
Loading