diff --git a/EventStore.Client.sln b/EventStore.Client.sln
index 51229f72c..84919f42b 100644
--- a/EventStore.Client.sln
+++ b/EventStore.Client.sln
@@ -33,6 +33,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.UserManag
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests.Common", "test\EventStore.Client.Tests.Common\EventStore.Client.Tests.Common.csproj", "{E326832D-DE52-4DE4-9E54-C800908B75F3}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Extensions.OpenTelemetry", "src\EventStore.Client.Extensions.OpenTelemetry\EventStore.Client.Extensions.OpenTelemetry.csproj", "{3723933C-585A-49BE-98E8-52D3FAD904CE}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
@@ -94,6 +96,10 @@ Global
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.Build.0 = Debug|Any CPU
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.ActiveCfg = Release|Any CPU
{E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.Build.0 = Release|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Debug|x64.Build.0 = Debug|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.ActiveCfg = Release|Any CPU
+ {3723933C-585A-49BE-98E8-52D3FAD904CE}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{D3744A86-DD35-4104-AAEE-84B79062C4A2} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
@@ -109,5 +115,6 @@ Global
{6CEB731F-72E1-461F-A6B3-54DBF3FD786C} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{22634CEE-4F7B-4679-A48D-38A2A8580ECA} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{E326832D-DE52-4DE4-9E54-C800908B75F3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
+ {3723933C-585A-49BE-98E8-52D3FAD904CE} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
EndGlobalSection
EndGlobal
diff --git a/README.md b/README.md
index 37657a3df..27e3feb40 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,24 @@ Reference the nuget package(s) for the API that you would like to call
[User Management](https://www.nuget.org/packages/EventStore.Client.Grpc.UserManagement)
+## Open Telemetry
+
+Telemetry instrumentation can be enabled by installing the [Open Telemetry Extensions](https://www.nuget.org/packages/EventStore.Client.Extensions.OpenTelemetry) package.
+
+Once installed you can configure instrumentation using the `AddEventStoreClientInstrumentation` extension method on a `TracerProviderBuilder`.
+
+```csharp
+using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ ...
+ .AddEventStoreClientInstrumentation()
+ ...
+ .Build();
+```
+
+Tracing is the only telemetry currently exported, specifically for the `Append` and `Subscribe` (Catchup and Persistent) operations.
+
+For more information about Open Telemetry, refer to the [official documentation](https://opentelemetry.io/docs/what-is-opentelemetry/).
+
## Support
Information on support and commercial tools such as LDAP authentication can be found here: [Event Store Support](https://eventstore.com/support/).
diff --git a/gencert.ps1 b/gencert.ps1
index 3908f57e8..593e9c386 100644
--- a/gencert.ps1
+++ b/gencert.ps1
@@ -7,13 +7,13 @@ New-Item -ItemType Directory -Path .\certs -Force
icacls .\certs /grant:r "$($env:UserName):(OI)(CI)RX"
# Pull the Docker image
-docker pull eventstore/es-gencert-cli:1.0.2
+docker pull ghcr.io/eventstore/es-gencert-cli:1.3.0
# Create CA certificate
-docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId eventstore/es-gencert-cli:1.0.2 create-ca -out /tmp/ca
+docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId ghcr.io/eventstore/es-gencert-cli:1.3.0 create-ca -out /tmp/ca
# Create node certificate
-docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId eventstore/es-gencert-cli:1.0.2 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost
+docker run --rm --volume ${PWD}\certs:/tmp --user (Get-Process -Id $PID).SessionId ghcr.io/eventstore/es-gencert-cli:1.3.0 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost
# Set permissions recursively for the directory
icacls .\certs /grant:r "$($env:UserName):(OI)(CI)RX"
diff --git a/gencert.sh b/gencert.sh
index fa640f624..2d8d5e346 100755
--- a/gencert.sh
+++ b/gencert.sh
@@ -13,11 +13,11 @@ mkdir -p certs
chmod 0755 ./certs
-docker pull eventstore/es-gencert-cli:1.0.2
+docker pull ghcr.io/eventstore/es-gencert-cli:1.3.0
-docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) eventstore/es-gencert-cli:1.0.2 create-ca -out /tmp/ca
+docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) ghcr.io/eventstore/es-gencert-cli:1.3.0 create-ca -out /tmp/ca
-docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) eventstore/es-gencert-cli:1.0.2 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost
+docker run --rm --volume $PWD/certs:/tmp --user $(id -u):$(id -g) ghcr.io/eventstore/es-gencert-cli:1.3.0 create-node -ca-certificate /tmp/ca/ca.crt -ca-key /tmp/ca/ca.key -out /tmp/node -ip-addresses 127.0.0.1 -dns-names localhost
chmod -R 0755 ./certs
diff --git a/samples/secure-with-tls/docker-compose.certs.yml b/samples/secure-with-tls/docker-compose.certs.yml
index 56c6278dc..03d2e225f 100644
--- a/samples/secure-with-tls/docker-compose.certs.yml
+++ b/samples/secure-with-tls/docker-compose.certs.yml
@@ -16,7 +16,7 @@ services:
network_mode: none
cert-gen:
- image: eventstore/es-gencert-cli:1.0.2
+ image: ghcr.io/eventstore/es-gencert-cli:1.3.0
container_name: cert-gen
user: "1000:1000"
entrypoint: [ "/bin/sh","-c" ]
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index c12d9449c..609de534e 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -4,7 +4,7 @@
EventStore.Client
-
+
$(MSBuildProjectName.Remove(0,18))
$(ESPackageIdSuffix.ToLower()).proto
../EventStore.Client.Common/protos/$(ESProto)
@@ -50,6 +50,10 @@
+
+
+
+
<_Parameter1>$(ProjectName).Tests
diff --git a/src/EventStore.Client.Common/Constants.cs b/src/EventStore.Client.Common/Constants.cs
index 3e0279e6b..a088bcab2 100644
--- a/src/EventStore.Client.Common/Constants.cs
+++ b/src/EventStore.Client.Common/Constants.cs
@@ -39,10 +39,10 @@ public static class Exceptions {
}
public static class Metadata {
- public const string Type = "type";
- public const string Created = "created";
- public const string ContentType = "content-type";
- public static readonly string[] RequiredMetadata = { Type, ContentType };
+ public const string Type = "type";
+ public const string Created = "created";
+ public const string ContentType = "content-type";
+ public static readonly string[] RequiredMetadata = { Type, ContentType };
public static class ContentTypes {
public const string ApplicationJson = "application/json";
@@ -58,4 +58,4 @@ public static class Headers {
public const string ConnectionName = "connection-name";
public const string RequiresLeader = "requires-leader";
}
-}
\ No newline at end of file
+}
diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj
new file mode 100644
index 000000000..2783f0adc
--- /dev/null
+++ b/src/EventStore.Client.Extensions.OpenTelemetry/EventStore.Client.Extensions.OpenTelemetry.csproj
@@ -0,0 +1,20 @@
+
+
+
+ EventStore.Client.Extensions.OpenTelemetry
+
+
+
+ EventStore.Client.Extensions.OpenTelemetry
+ Extensions used to facilitate instrumentation of the EventStore Client.
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs
new file mode 100644
index 000000000..30e1de843
--- /dev/null
+++ b/src/EventStore.Client.Extensions.OpenTelemetry/TracerProviderBuilderExtensions.cs
@@ -0,0 +1,19 @@
+using EventStore.Client.Diagnostics;
+using JetBrains.Annotations;
+using OpenTelemetry.Trace;
+
+namespace EventStore.Client.Extensions.OpenTelemetry;
+
+///
+/// Extension methods used to facilitate tracing instrumentation of the EventStore Client.
+///
+[PublicAPI]
+public static class TracerProviderBuilderExtensions {
+ ///
+ /// Adds the EventStore client ActivitySource name to the list of subscribed sources on the
+ ///
+ /// being configured.
+ /// The instance of to chain configuration.
+ public static TracerProviderBuilder AddEventStoreClientInstrumentation(this TracerProviderBuilder builder)
+ => builder.AddSource(EventStoreClientInstrumentation.ActivitySourceName);
+}
diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs
index 148ebcf8d..7f671dcfa 100644
--- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs
+++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs
@@ -1,4 +1,5 @@
using System.Threading.Channels;
+using EventStore.Client.Diagnostics;
using EventStore.Client.PersistentSubscriptions;
using Grpc.Core;
using static EventStore.Client.PersistentSubscriptions.ReadResp.ContentOneofCase;
@@ -12,18 +13,28 @@ partial class EventStorePersistentSubscriptionsClient {
///
///
[Obsolete("SubscribeAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)]
- public async Task SubscribeAsync(string streamName, string groupName,
+ public async Task SubscribeAsync(
+ string streamName, string groupName,
Func eventAppeared,
Action? subscriptionDropped = null,
UserCredentials? userCredentials = null, int bufferSize = 10, bool autoAck = true,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
if (autoAck) {
throw new InvalidOperationException(
- $"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead.");
+ $"AutoAck is no longer supported. Please use {nameof(SubscribeToStreamAsync)} with manual acks instead."
+ );
}
- return await SubscribeToStreamAsync(streamName, groupName, eventAppeared, subscriptionDropped,
- userCredentials, bufferSize, cancellationToken).ConfigureAwait(false);
+ return await SubscribeToStreamAsync(
+ streamName,
+ groupName,
+ eventAppeared,
+ subscriptionDropped,
+ userCredentials,
+ bufferSize,
+ cancellationToken
+ ).ConfigureAwait(false);
}
///
@@ -32,15 +43,26 @@ public async Task SubscribeAsync(string streamName, stri
///
///
///
- [Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream with manual acks instead.", false)]
- public async Task SubscribeToStreamAsync(string streamName, string groupName,
- Func eventAppeared,
- Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null, int bufferSize = 10,
- CancellationToken cancellationToken = default) {
+ [Obsolete(
+ "SubscribeToStreamAsync is no longer supported. Use SubscribeToStream with manual acks instead.",
+ false
+ )]
+ public async Task SubscribeToStreamAsync(
+ string streamName, string groupName,
+ Func eventAppeared,
+ Action? subscriptionDropped = null,
+ UserCredentials? userCredentials = null, int bufferSize = 10,
+ CancellationToken cancellationToken = default
+ ) {
return await PersistentSubscription
- .Confirm(SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
- eventAppeared, subscriptionDropped ?? delegate { }, _log, userCredentials, cancellationToken)
+ .Confirm(
+ SubscribeToStream(streamName, groupName, bufferSize, userCredentials, cancellationToken),
+ eventAppeared,
+ subscriptionDropped ?? delegate { },
+ _log,
+ userCredentials,
+ cancellationToken
+ )
.ConfigureAwait(false);
}
@@ -53,8 +75,10 @@ public async Task SubscribeToStreamAsync(string streamNa
/// The optional user credentials to perform operation with.
/// The optional .
///
- public PersistentSubscriptionResult SubscribeToStream(string streamName, string groupName, int bufferSize = 10,
- UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
+ public PersistentSubscriptionResult SubscribeToStream(
+ string streamName, string groupName, int bufferSize = 10,
+ UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
+ ) {
if (streamName == null) {
throw new ArgumentNullException(nameof(streamName));
}
@@ -77,7 +101,7 @@ public PersistentSubscriptionResult SubscribeToStream(string streamName, string
var readOptions = new ReadReq.Types.Options {
BufferSize = bufferSize,
- GroupName = groupName,
+ GroupName = groupName,
UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() }
};
@@ -87,29 +111,48 @@ public PersistentSubscriptionResult SubscribeToStream(string streamName, string
readOptions.StreamIdentifier = streamName;
}
- return new PersistentSubscriptionResult(streamName, groupName, async ct => {
- var channelInfo = await GetChannelInfo(ct).ConfigureAwait(false);
-
- if (streamName == SystemStreams.AllStream &&
- !channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
- throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
- }
+ return new PersistentSubscriptionResult(
+ streamName,
+ groupName,
+ async ct => {
+ var channelInfo = await GetChannelInfo(ct).ConfigureAwait(false);
+
+ if (streamName == SystemStreams.AllStream &&
+ !channelInfo.ServerCapabilities.SupportsPersistentSubscriptionsToAll) {
+ throw new NotSupportedException(
+ "The server does not support persistent subscriptions to $all."
+ );
+ }
- return channelInfo.CallInvoker;
- }, new() { Options = readOptions }, Settings, userCredentials, cancellationToken);
+ return channelInfo;
+ },
+ new() { Options = readOptions },
+ Settings,
+ userCredentials,
+ cancellationToken
+ );
}
///
/// Subscribes to a persistent subscription to $all. Messages must be manually acknowledged
///
[Obsolete("SubscribeToAllAsync is no longer supported. Use SubscribeToAll with manual acks instead.", false)]
- public async Task SubscribeToAllAsync(string groupName,
- Func eventAppeared,
- Action? subscriptionDropped = null,
- UserCredentials? userCredentials = null, int bufferSize = 10,
- CancellationToken cancellationToken = default) =>
- await SubscribeToStreamAsync(SystemStreams.AllStream, groupName, eventAppeared, subscriptionDropped,
- userCredentials, bufferSize, cancellationToken)
+ public async Task SubscribeToAllAsync(
+ string groupName,
+ Func eventAppeared,
+ Action? subscriptionDropped = null,
+ UserCredentials? userCredentials = null, int bufferSize = 10,
+ CancellationToken cancellationToken = default
+ ) =>
+ await SubscribeToStreamAsync(
+ SystemStreams.AllStream,
+ groupName,
+ eventAppeared,
+ subscriptionDropped,
+ userCredentials,
+ bufferSize,
+ cancellationToken
+ )
.ConfigureAwait(false);
///
@@ -120,31 +163,33 @@ await SubscribeToStreamAsync(SystemStreams.AllStream, groupName, eventAppeared,
/// The optional user credentials to perform operation with.
/// The optional .
///
- public PersistentSubscriptionResult SubscribeToAll(string groupName, int bufferSize = 10,
- UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) =>
+ public PersistentSubscriptionResult SubscribeToAll(
+ string groupName, int bufferSize = 10,
+ UserCredentials? userCredentials = null, CancellationToken cancellationToken = default
+ ) =>
SubscribeToStream(SystemStreams.AllStream, groupName, bufferSize, userCredentials, cancellationToken);
///
public class PersistentSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable {
- private const int MaxEventIdLength = 2000;
- private readonly ReadReq _request;
+ private const int MaxEventIdLength = 2000;
+ private readonly ReadReq _request;
private readonly Channel _channel;
- private readonly CancellationTokenSource _cts;
- private readonly CallOptions _callOptions;
+ private readonly CancellationTokenSource _cts;
+ private readonly CallOptions _callOptions;
private AsyncDuplexStreamingCall? _call;
- private int _messagesEnumerated;
+ private int _messagesEnumerated;
///
/// The server-generated unique identifier for the subscription.
///
public string? SubscriptionId { get; private set; }
-
+
///
/// The name of the stream to read events from.
///
public string StreamName { get; }
-
+
///
/// The name of the persistent subscription group.
///
@@ -178,17 +223,22 @@ async IAsyncEnumerable GetMessages() {
}
}
- internal PersistentSubscriptionResult(string streamName, string groupName,
- Func> selectCallInvoker,
+ internal PersistentSubscriptionResult(
+ string streamName, string groupName,
+ Func> selectChannelInfo,
ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials,
- CancellationToken cancellationToken) {
+ CancellationToken cancellationToken
+ ) {
StreamName = streamName;
- GroupName = groupName;
-
+ GroupName = groupName;
+
_request = request;
-
- _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
- cancellationToken: cancellationToken);
+
+ _callOptions = EventStoreCallOptions.CreateStreaming(
+ settings,
+ userCredentials: userCredentials,
+ cancellationToken: cancellationToken
+ );
_channel = Channel.CreateBounded(ReadBoundedChannelOptions);
@@ -200,25 +250,42 @@ internal PersistentSubscriptionResult(string streamName, string groupName,
async Task PumpMessages() {
try {
- var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
- var client = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
- callInvoker);
+ var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
+ var client =
+ new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
+ channelInfo.CallInvoker
+ );
+
_call = client.Read(_callOptions);
await _call.RequestStream.WriteAsync(_request).ConfigureAwait(false);
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
.ConfigureAwait(false)) {
- await _channel.Writer.WriteAsync(response.ContentCase switch {
+ PersistentSubscriptionMessage subscriptionMessage = response.ContentCase switch {
SubscriptionConfirmation => new PersistentSubscriptionMessage.SubscriptionConfirmation(
- response.SubscriptionConfirmation.SubscriptionId),
- Event => new PersistentSubscriptionMessage.Event(ConvertToResolvedEvent(response),
+ response.SubscriptionConfirmation.SubscriptionId
+ ),
+ Event => new PersistentSubscriptionMessage.Event(
+ ConvertToResolvedEvent(response),
response.Event.CountCase switch {
ReadResp.Types.ReadEvent.CountOneofCase.RetryCount => response.Event.RetryCount,
- _ => null
- }),
+ _ => null
+ }
+ ),
_ => PersistentSubscriptionMessage.Unknown.Instance
- }, _cts.Token).ConfigureAwait(false);
+ };
+
+ if (subscriptionMessage is PersistentSubscriptionMessage.Event evnt)
+ EventStoreClientDiagnostics.TraceSubscriptionEvent(
+ SubscriptionId,
+ evnt.ResolvedEvent,
+ channelInfo,
+ settings,
+ userCredentials
+ );
+
+ await _channel.Writer.WriteAsync(subscriptionMessage, _cts.Token).ConfigureAwait(false);
}
_channel.Writer.TryComplete();
@@ -240,11 +307,15 @@ when rex2.Status.Detail.Contains("No grpc-status found on response"):
}
#endif
if (ex is PersistentSubscriptionNotFoundException) {
- await _channel.Writer.WriteAsync(PersistentSubscriptionMessage.NotFound.Instance,
- cancellationToken).ConfigureAwait(false);
+ await _channel.Writer.WriteAsync(
+ PersistentSubscriptionMessage.NotFound.Instance,
+ cancellationToken
+ ).ConfigureAwait(false);
+
_channel.Writer.TryComplete();
return;
}
+
_channel.Writer.TryComplete(ex);
}
}
@@ -280,7 +351,6 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
public Task Ack(IEnumerable resolvedEvents) =>
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));
-
///
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
///
@@ -298,58 +368,72 @@ public Task Nack(PersistentSubscriptionNakEventAction action, string reason, par
/// A reason given.
/// The s to nak. There should not be more than 2000 to nak at a time.
/// The number of resolvedEvents exceeded the limit of 2000.
- public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
- params ResolvedEvent[] resolvedEvents) => Nack(action, reason,
- Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
+ public Task Nack(
+ PersistentSubscriptionNakEventAction action, string reason,
+ params ResolvedEvent[] resolvedEvents
+ ) => Nack(
+ action,
+ reason,
+ Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
+ );
private static ResolvedEvent ConvertToResolvedEvent(ReadResp response) => new(
ConvertToEventRecord(response.Event.Event)!,
ConvertToEventRecord(response.Event.Link),
response.Event.PositionCase switch {
ReadResp.Types.ReadEvent.PositionOneofCase.CommitPosition => response.Event.CommitPosition,
- _ => null
- });
+ _ => null
+ }
+ );
private Task AckInternal(params Uuid[] eventIds) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
- $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", nameof(eventIds));
+ $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
+ nameof(eventIds)
+ );
}
return _call is null
? throw new InvalidOperationException()
- : _call.RequestStream.WriteAsync(new ReadReq {
- Ack = new ReadReq.Types.Ack {
- Ids = {
- Array.ConvertAll(eventIds, id => id.ToDto())
+ : _call.RequestStream.WriteAsync(
+ new ReadReq {
+ Ack = new ReadReq.Types.Ack {
+ Ids = {
+ Array.ConvertAll(eventIds, id => id.ToDto())
+ }
}
}
- });
+ );
}
private Task NackInternal(Uuid[] eventIds, PersistentSubscriptionNakEventAction action, string reason) {
if (eventIds.Length > MaxEventIdLength) {
throw new ArgumentException(
- $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.", nameof(eventIds));
+ $"The number of eventIds exceeds the maximum length of {MaxEventIdLength}.",
+ nameof(eventIds)
+ );
}
return _call is null
? throw new InvalidOperationException()
- : _call.RequestStream.WriteAsync(new ReadReq {
- Nack = new ReadReq.Types.Nack {
- Ids = {
- Array.ConvertAll(eventIds, id => id.ToDto())
- },
- Action = action switch {
- PersistentSubscriptionNakEventAction.Park => ReadReq.Types.Nack.Types.Action.Park,
- PersistentSubscriptionNakEventAction.Retry => ReadReq.Types.Nack.Types.Action.Retry,
- PersistentSubscriptionNakEventAction.Skip => ReadReq.Types.Nack.Types.Action.Skip,
- PersistentSubscriptionNakEventAction.Stop => ReadReq.Types.Nack.Types.Action.Stop,
- _ => ReadReq.Types.Nack.Types.Action.Unknown
- },
- Reason = reason
+ : _call.RequestStream.WriteAsync(
+ new ReadReq {
+ Nack = new ReadReq.Types.Nack {
+ Ids = {
+ Array.ConvertAll(eventIds, id => id.ToDto())
+ },
+ Action = action switch {
+ PersistentSubscriptionNakEventAction.Park => ReadReq.Types.Nack.Types.Action.Park,
+ PersistentSubscriptionNakEventAction.Retry => ReadReq.Types.Nack.Types.Action.Retry,
+ PersistentSubscriptionNakEventAction.Skip => ReadReq.Types.Nack.Types.Action.Skip,
+ PersistentSubscriptionNakEventAction.Stop => ReadReq.Types.Nack.Types.Action.Stop,
+ _ => ReadReq.Types.Nack.Types.Action.Unknown
+ },
+ Reason = reason
+ }
}
- });
+ );
}
private static EventRecord? ConvertToEventRecord(ReadResp.Types.ReadEvent.Types.RecordedEvent? e) =>
@@ -362,7 +446,8 @@ e is null
new Position(e.CommitPosition, e.PreparePosition),
e.Metadata,
e.Data.ToByteArray(),
- e.CustomMetadata.ToByteArray());
+ e.CustomMetadata.ToByteArray()
+ );
///
public async ValueTask DisposeAsync() {
@@ -375,9 +460,11 @@ static async Task CastAndDispose(IDisposable? resource) {
switch (resource) {
case null:
return;
+
case IAsyncDisposable resourceAsyncDisposable:
await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false);
break;
+
default:
resource.Dispose();
break;
@@ -385,7 +472,6 @@ static async Task CastAndDispose(IDisposable? resource) {
}
}
-
///
public void Dispose() {
_cts.Dispose();
@@ -394,7 +480,8 @@ public void Dispose() {
///
public async IAsyncEnumerator GetAsyncEnumerator(
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
await foreach (var message in Messages.WithCancellation(cancellationToken)) {
if (message is not PersistentSubscriptionMessage.Event(var resolvedEvent, _)) {
continue;
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs
index 9d0842855..ae82c98f4 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs
@@ -1,14 +1,13 @@
-using System;
using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
+using System.Diagnostics;
using System.Threading.Channels;
using Google.Protobuf;
using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
-using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
namespace EventStore.Client {
public partial class EventStoreClient {
@@ -30,24 +29,30 @@ public async Task AppendToStreamAsync(
Action? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var options = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(options);
_log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision);
- var batchAppender = _streamAppender;
var task =
- userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
- ? batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
+ userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false)
+ ? _batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
- (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
+ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName,
Revision = expectedRevision
}
- }, eventData, options, deadline, userCredentials, cancellationToken);
+ },
+ eventData,
+ options,
+ deadline,
+ userCredentials,
+ cancellationToken
+ );
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options);
}
@@ -70,29 +75,35 @@ public async Task AppendToStreamAsync(
Action? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);
- _log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedState);
+ _log.LogDebug("Append to stream - {streamName}@{expectedState}.", streamName, expectedState);
- var batchAppender = _streamAppender;
var task =
- userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
- ? batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken)
+ userCredentials == null && await _batchAppender.IsUsable().ConfigureAwait(false)
+ ? _batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
- (await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
+ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName
}
- }.WithAnyStreamRevision(expectedState), eventData, operationOptions, deadline, userCredentials,
- cancellationToken);
+ }.WithAnyStreamRevision(expectedState),
+ eventData,
+ operationOptions,
+ deadline,
+ userCredentials,
+ cancellationToken
+ );
+
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions);
}
- private async ValueTask AppendToStreamInternal(
- CallInvoker callInvoker,
+ private ValueTask AppendToStreamInternal(
+ ChannelInfo channelInfo,
AppendReq header,
IEnumerable eventData,
EventStoreClientOperationOptions operationOptions,
@@ -100,39 +111,54 @@ private async ValueTask AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
- using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
- EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
+ return EventStoreClientDiagnostics.TraceOperation(
+ Operation,
+ TracingConstants.Operations.Append,
+ new ActivityTagsCollection {
+ {
+ TelemetryAttributes.EventStoreStream,
+ header.Options.StreamIdentifier.StreamName.ToStringUtf8()
+ }
+ }
+ .WithTagsFrom(channelInfo, Settings)
+ .WithTagsFrom(userCredentials)
);
- await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
+ async ValueTask Operation() {
+ using var call = new Streams.Streams.StreamsClient(channelInfo.CallInvoker).Append(
+ EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
+ );
+
+ await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
- foreach (var e in eventData) {
- await call.RequestStream.WriteAsync(
- new AppendReq {
+ foreach (var e in eventData) {
+ var appendReq = new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
- CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
+ CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingMetadata()),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
- },
- }
- ).ConfigureAwait(false);
- }
+ }
+ };
- await call.RequestStream.CompleteAsync().ConfigureAwait(false);
+ await call.RequestStream.WriteAsync(appendReq).ConfigureAwait(false);
+ }
- var response = await call.ResponseAsync.ConfigureAwait(false);
+ await call.RequestStream.CompleteAsync().ConfigureAwait(false);
+
+ var response = await call.ResponseAsync.ConfigureAwait(false);
- if (response.Success != null)
- return HandleSuccessAppend(response, header);
+ if (response.Success != null)
+ return HandleSuccessAppend(response, header);
- if (response.WrongExpectedVersion == null)
- throw new InvalidOperationException("The operation completed with an unexpected result.");
+ if (response.WrongExpectedVersion == null)
+ throw new InvalidOperationException("The operation completed with an unexpected result.");
- return HandleWrongExpectedRevision(response, header, operationOptions);
+ return HandleWrongExpectedRevision(response, header, operationOptions);
+ }
}
private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
@@ -150,7 +176,8 @@ private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header)
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
position,
- currentRevision);
+ currentRevision
+ );
return new SuccessResult(currentRevision, position);
}
@@ -224,144 +251,203 @@ private class StreamAppender : IDisposable {
private readonly Action _onException;
private readonly Channel _channel;
private readonly ConcurrentDictionary> _pendingRequests;
+ private readonly TaskCompletionSource _isUsable;
- private readonly Task?> _callTask;
+ private ChannelInfo? _channelInfo;
+ AsyncDuplexStreamingCall? _call;
- public StreamAppender(EventStoreClientSettings settings,
- Task?> callTask, CancellationToken cancellationToken,
- Action onException) {
+ public StreamAppender(
+ EventStoreClientSettings settings,
+ ValueTask channelInfoTask,
+ CancellationToken cancellationToken,
+ Action onException
+ ) {
_settings = settings;
- _callTask = callTask;
_cancellationToken = cancellationToken;
_onException = onException;
- _channel = System.Threading.Channels.Channel.CreateBounded(10000);
+ _channel = Channel.CreateBounded(10000);
_pendingRequests = new ConcurrentDictionary>();
- _ = Task.Factory.StartNew(Send);
- _ = Task.Factory.StartNew(Receive);
+ _isUsable = new TaskCompletionSource();
+
+ _ = Task.Run(() => Duplex(channelInfoTask), cancellationToken);
}
- public ValueTask Append(string streamName, StreamRevision expectedStreamPosition,
- IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
- AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
- events, cancellationToken);
+ public ValueTask Append(
+ string streamName, StreamRevision expectedStreamPosition,
+ IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
+ ) =>
+ AppendInternal(
+ BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
+ events,
+ cancellationToken
+ );
- public ValueTask Append(string streamName, StreamState expectedStreamState,
- IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
- AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
- events, cancellationToken);
+ public ValueTask Append(
+ string streamName, StreamState expectedStreamState,
+ IEnumerable events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
+ ) =>
+ AppendInternal(
+ BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
+ events,
+ cancellationToken
+ );
- public async ValueTask IsUsable() {
- var call = await _callTask.ConfigureAwait(false);
- return call != null;
- }
+ public Task IsUsable() => _isUsable.Task;
+
+ private ValueTask AppendInternal(
+ BatchAppendReq.Types.Options options,
+ IEnumerable events, CancellationToken cancellationToken
+ ) {
+ return EventStoreClientDiagnostics.TraceOperation(
+ Operation,
+ TracingConstants.Operations.Append,
+ new ActivityTagsCollection {
+ {
+ TelemetryAttributes.EventStoreStream, options.StreamIdentifier.StreamName.ToStringUtf8()
+ }
+ }
+ .WithTagsFrom(_channelInfo, _settings)
+ );
- private async Task Receive() {
- try {
- var call = await _callTask.ConfigureAwait(false);
- if (call is null) {
- _channel.Writer.TryComplete(
- new NotSupportedException("Server does not support batch append"));
- return;
- }
+ async ValueTask Operation() {
+ var correlationId = Uuid.NewUuid();
- await foreach (var response in call.ResponseStream.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false)) {
- if (!_pendingRequests.TryRemove(Uuid.FromDto(response.CorrelationId), out var writeResult)) {
- continue; // TODO: Log?
- }
+ var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource());
- try {
- writeResult.TrySetResult(response.ToWriteResult());
- } catch (Exception ex) {
- writeResult.TrySetException(ex);
+ try {
+ foreach (var appendRequest in GetRequests(events, options, correlationId)) {
+ await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
}
+ } catch (ChannelClosedException ex) {
+ // channel is closed, our tcs won't necessarily get completed, don't wait for it.
+ throw ex.InnerException ?? ex;
}
- } catch (Exception ex) {
- // signal that no tcs added to _pendingRequests after this point will necessarily complete
- _channel.Writer.TryComplete(ex);
- // complete whatever tcs's we have
- _onException(ex);
- foreach (var request in _pendingRequests) {
- request.Value.TrySetException(ex);
- }
+ return await complete.Task.ConfigureAwait(false);
}
}
- private async Task Send() {
- var call = await _callTask.ConfigureAwait(false);
- if (call is null)
- throw new NotSupportedException("Server does not support batch append");
+ private async Task Duplex(
+ ValueTask channelInfoTask
+ ) {
+ try {
+ _channelInfo = await channelInfoTask.ConfigureAwait(false);
+ if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) {
+ _channel.Writer.TryComplete(new NotSupportedException("Server does not support batch append"));
+ _isUsable.TrySetResult(false);
+ return;
+ }
- await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
- .ConfigureAwait(false)) {
- await call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
- }
+ _call = new Streams.Streams.StreamsClient(_channelInfo.CallInvoker).BatchAppend(
+ EventStoreCallOptions.CreateStreaming(
+ _settings,
+ userCredentials: _settings.DefaultCredentials,
+ cancellationToken: _cancellationToken
+ )
+ );
- await call.RequestStream.CompleteAsync().ConfigureAwait(false);
- }
+ _ = Task.Run(Send, _cancellationToken);
+ _ = Task.Run(Receive, _cancellationToken);
- private async ValueTask AppendInternal(BatchAppendReq.Types.Options options,
- IEnumerable events, CancellationToken cancellationToken) {
- var batchSize = 0;
- var correlationId = Uuid.NewUuid();
- var correlationIdDto = correlationId.ToDto();
+ _isUsable.TrySetResult(true);
+ } catch (Exception ex) {
+ _isUsable.TrySetException(ex);
+ _onException(ex);
+ }
- var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource());
+ return;
- try {
- foreach (var appendRequest in GetRequests()) {
- await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
+ async Task Send() {
+ if (_call == null) return;
+
+ await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false)) {
+ await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
}
- } catch (ChannelClosedException ex) {
- // channel is closed, our tcs won't necessarily get completed, don't wait for it.
- throw ex.InnerException ?? ex;
+
+ await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
}
- return await complete.Task.ConfigureAwait(false);
+ async Task Receive() {
+ if (_call == null) return;
+
+ try {
+ await foreach (var response in _call.ResponseStream.ReadAllAsync(_cancellationToken)
+ .ConfigureAwait(false)) {
+ if (!_pendingRequests.TryRemove(
+ Uuid.FromDto(response.CorrelationId),
+ out var writeResult
+ )) {
+ continue; // TODO: Log?
+ }
- IEnumerable GetRequests() {
- bool first = true;
- var proposedMessages = new List();
- foreach (var @event in events) {
- var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
- Data = ByteString.CopyFrom(@event.Data.Span),
- CustomMetadata = ByteString.CopyFrom(@event.Metadata.Span),
- Id = @event.EventId.ToDto(),
- Metadata = {
- {Constants.Metadata.Type, @event.Type},
- {Constants.Metadata.ContentType, @event.ContentType}
+ try {
+ writeResult.TrySetResult(response.ToWriteResult());
+ } catch (Exception ex) {
+ writeResult.TrySetException(ex);
}
- };
+ }
+ } catch (Exception ex) {
+ // signal that no tcs added to _pendingRequests after this point will necessarily complete
+ _channel.Writer.TryComplete(ex);
- proposedMessages.Add(proposedMessage);
+ // complete whatever tcs's we have
+ foreach (var request in _pendingRequests)
+ request.Value.TrySetException(ex);
- if ((batchSize += proposedMessage.CalculateSize()) <
- _settings.OperationOptions.BatchAppendSize) {
- continue;
+ _onException(ex);
+ }
+ }
+ }
+
+ private IEnumerable GetRequests(
+ IEnumerable events, BatchAppendReq.Types.Options options, Uuid correlationId
+ ) {
+ var batchSize = 0;
+ bool first = true;
+ var correlationIdDto = correlationId.ToDto();
+ var proposedMessages = new List();
+
+ foreach (var @event in events) {
+ var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
+ Data = ByteString.CopyFrom(@event.Data.Span),
+ CustomMetadata = ByteString.CopyFrom(@event.Metadata.InjectTracingMetadata()),
+ Id = @event.EventId.ToDto(),
+ Metadata = {
+ { Constants.Metadata.Type, @event.Type },
+ { Constants.Metadata.ContentType, @event.ContentType }
}
+ };
+
+ proposedMessages.Add(proposedMessage);
- yield return new BatchAppendReq {
- ProposedMessages = {proposedMessages},
- CorrelationId = correlationIdDto,
- Options = first ? options : null
- };
- first = false;
- proposedMessages.Clear();
- batchSize = 0;
+ if ((batchSize += proposedMessage.CalculateSize()) <
+ _settings.OperationOptions.BatchAppendSize) {
+ continue;
}
yield return new BatchAppendReq {
- ProposedMessages = {proposedMessages},
- IsFinal = true,
+ ProposedMessages = { proposedMessages },
CorrelationId = correlationIdDto,
Options = first ? options : null
};
+
+ first = false;
+ proposedMessages.Clear();
+ batchSize = 0;
}
+
+ yield return new BatchAppendReq {
+ ProposedMessages = { proposedMessages },
+ IsFinal = true,
+ CorrelationId = correlationIdDto,
+ Options = first ? options : null
+ };
}
public void Dispose() {
_channel.Writer.TryComplete();
+ _call?.Dispose();
}
}
}
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
index 6581bd94b..19de629e7 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
@@ -97,7 +97,7 @@ private async Task SetStreamMetadataInternal(StreamMetadata metada
CancellationToken cancellationToken) {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] {
+ return await AppendToStreamInternal(channelInfo, appendReq, new[] {
new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata,
JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)),
}, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false);
diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
index f82bd5d07..7a2f7fdbc 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
@@ -1,4 +1,5 @@
using System.Threading.Channels;
+using EventStore.Client.Diagnostics;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;
@@ -24,10 +25,15 @@ public Task SubscribeToAllAsync(
Action? subscriptionDropped = default,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
+ CancellationToken cancellationToken = default
+ ) => StreamSubscription.Confirm(
SubscribeToAll(start, resolveLinkTos, filterOptions, userCredentials, cancellationToken),
- eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached,
- cancellationToken: cancellationToken);
+ eventAppeared,
+ subscriptionDropped,
+ _log,
+ filterOptions?.CheckpointReached,
+ cancellationToken: cancellationToken
+ );
///
/// Subscribes to all events.
@@ -43,19 +49,23 @@ public StreamSubscriptionResult SubscribeToAll(
bool resolveLinkTos = false,
SubscriptionFilterOptions? filterOptions = null,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => new(async _ => {
- var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return channelInfo.CallInvoker;
- }, new ReadReq {
- Options = new ReadReq.Types.Options {
- ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
- ResolveLinks = resolveLinkTos,
- All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
- Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
- Filter = GetFilterOptions(filterOptions)!,
- UuidOption = new() { Structured = new() }
- }
- }, Settings, userCredentials, cancellationToken);
+ CancellationToken cancellationToken = default
+ ) => new(
+ async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false),
+ new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ Filter = GetFilterOptions(filterOptions)!,
+ UuidOption = new() { Structured = new() }
+ }
+ },
+ Settings,
+ userCredentials,
+ cancellationToken
+ );
///
/// Subscribes to a stream from a checkpoint.
@@ -69,15 +79,21 @@ public StreamSubscriptionResult SubscribeToAll(
/// The optional .
///
[Obsolete("SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.", false)]
- public Task SubscribeToStreamAsync(string streamName,
- FromStream start,
- Func eventAppeared,
- bool resolveLinkTos = false,
- Action? subscriptionDropped = default,
- UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => StreamSubscription.Confirm(
+ public Task SubscribeToStreamAsync(
+ string streamName,
+ FromStream start,
+ Func eventAppeared,
+ bool resolveLinkTos = false,
+ Action? subscriptionDropped = default,
+ UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default
+ ) => StreamSubscription.Confirm(
SubscribeToStream(streamName, start, resolveLinkTos, userCredentials, cancellationToken),
- eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken);
+ eventAppeared,
+ subscriptionDropped,
+ _log,
+ cancellationToken: cancellationToken
+ );
///
/// Subscribes to a stream from a checkpoint.
@@ -93,28 +109,33 @@ public StreamSubscriptionResult SubscribeToStream(
FromStream start,
bool resolveLinkTos = false,
UserCredentials? userCredentials = null,
- CancellationToken cancellationToken = default) => new(async _ => {
- var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
- return channelInfo.CallInvoker;
- }, new ReadReq {
- Options = new ReadReq.Types.Options {
- ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
- ResolveLinks = resolveLinkTos,
- Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
- Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
- UuidOption = new() { Structured = new() }
- }
- }, Settings, userCredentials, cancellationToken);
+ CancellationToken cancellationToken = default
+ ) => new(
+ async _ => await GetChannelInfo(cancellationToken).ConfigureAwait(false),
+ new ReadReq {
+ Options = new ReadReq.Types.Options {
+ ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
+ ResolveLinks = resolveLinkTos,
+ Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
+ Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
+ UuidOption = new() { Structured = new() }
+ }
+ },
+ Settings,
+ userCredentials,
+ cancellationToken
+ );
///
/// A class that represents the result of a subscription operation. You may either enumerate this instance directly or . Do not enumerate more than once.
///
public class StreamSubscriptionResult : IAsyncEnumerable, IAsyncDisposable, IDisposable {
- private readonly ReadReq _request;
- private readonly Channel _channel;
- private readonly CancellationTokenSource _cts;
- private readonly CallOptions _callOptions;
- private AsyncServerStreamingCall? _call;
+ private readonly ReadReq _request;
+ private readonly Channel _channel;
+ private readonly CancellationTokenSource _cts;
+ private readonly CallOptions _callOptions;
+ private readonly EventStoreClientSettings _settings;
+ private AsyncServerStreamingCall? _call;
private int _messagesEnumerated;
@@ -150,12 +171,18 @@ async IAsyncEnumerable GetMessages() {
}
}
- internal StreamSubscriptionResult(Func> selectCallInvoker,
+ internal StreamSubscriptionResult(
+ Func> selectChannelInfo,
ReadReq request, EventStoreClientSettings settings, UserCredentials? userCredentials,
- CancellationToken cancellationToken) {
- _request = request;
- _callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
- cancellationToken: cancellationToken);
+ CancellationToken cancellationToken
+ ) {
+ _request = request;
+ _settings = settings;
+ _callOptions = EventStoreCallOptions.CreateStreaming(
+ settings,
+ userCredentials: userCredentials,
+ cancellationToken: cancellationToken
+ );
_channel = Channel.CreateBounded(ReadBoundedChannelOptions);
@@ -171,29 +198,53 @@ internal StreamSubscriptionResult(Func> sel
async Task PumpMessages() {
try {
- var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
- var client = new Streams.Streams.StreamsClient(callInvoker);
+ var channelInfo = await selectChannelInfo(_cts.Token).ConfigureAwait(false);
+ var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker);
_call = client.Read(_request, _callOptions);
await foreach (var response in _call.ResponseStream.ReadAllAsync(_cts.Token)
.ConfigureAwait(false)) {
- await _channel.Writer.WriteAsync(response.ContentCase switch {
- Confirmation => new StreamMessage.SubscriptionConfirmation(
- response.Confirmation.SubscriptionId),
- Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
- FirstStreamPosition => new StreamMessage.FirstStreamPosition(
- new StreamPosition(response.FirstStreamPosition)),
- LastStreamPosition => new StreamMessage.LastStreamPosition(
- new StreamPosition(response.LastStreamPosition)),
- LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
- new Position(response.LastAllStreamPosition.CommitPosition,
- response.LastAllStreamPosition.PreparePosition)),
- Checkpoint => new StreamMessage.AllStreamCheckpointReached(
- new Position(response.Checkpoint.CommitPosition,
- response.Checkpoint.PreparePosition)),
- CaughtUp => StreamMessage.CaughtUp.Instance,
- FellBehind => StreamMessage.FellBehind.Instance,
- _ => StreamMessage.Unknown.Instance
- }, _cts.Token).ConfigureAwait(false);
+ StreamMessage subscriptionMessage =
+ response.ContentCase switch {
+ Confirmation => new StreamMessage.SubscriptionConfirmation(
+ response.Confirmation.SubscriptionId
+ ),
+ Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
+ FirstStreamPosition => new StreamMessage.FirstStreamPosition(
+ new StreamPosition(response.FirstStreamPosition)
+ ),
+ LastStreamPosition => new StreamMessage.LastStreamPosition(
+ new StreamPosition(response.LastStreamPosition)
+ ),
+ LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
+ new Position(
+ response.LastAllStreamPosition.CommitPosition,
+ response.LastAllStreamPosition.PreparePosition
+ )
+ ),
+ Checkpoint => new StreamMessage.AllStreamCheckpointReached(
+ new Position(
+ response.Checkpoint.CommitPosition,
+ response.Checkpoint.PreparePosition
+ )
+ ),
+ CaughtUp => StreamMessage.CaughtUp.Instance,
+ FellBehind => StreamMessage.FellBehind.Instance,
+ _ => StreamMessage.Unknown.Instance
+ };
+
+ if (subscriptionMessage is StreamMessage.Event evnt)
+ EventStoreClientDiagnostics.TraceSubscriptionEvent(
+ SubscriptionId,
+ evnt.ResolvedEvent,
+ channelInfo,
+ _settings,
+ userCredentials
+ );
+
+ await _channel.Writer.WriteAsync(
+ subscriptionMessage,
+ _cts.Token
+ ).ConfigureAwait(false);
}
_channel.Writer.Complete();
@@ -214,9 +265,11 @@ static async ValueTask CastAndDispose(IDisposable? resource) {
switch (resource) {
case null:
return;
+
case IAsyncDisposable resourceAsyncDisposable:
await resourceAsyncDisposable.DisposeAsync().ConfigureAwait(false);
break;
+
default:
resource.Dispose();
break;
@@ -232,7 +285,8 @@ public void Dispose() {
///
public async IAsyncEnumerator GetAsyncEnumerator(
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
try {
await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false)) {
diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs
index 361e6e2d4..700df3c3f 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.cs
@@ -1,6 +1,5 @@
using System.Text.Json;
using System.Threading.Channels;
-using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@@ -12,32 +11,48 @@ namespace EventStore.Client {
/// The client used for operations on streams.
///
public sealed partial class EventStoreClient : EventStoreClientBase {
-
private static readonly JsonSerializerOptions StreamMetadataJsonSerializerOptions = new() {
Converters = {
StreamMetadataJsonConverter.Instance
},
};
- private static BoundedChannelOptions ReadBoundedChannelOptions = new (1) {
- SingleReader = true,
- SingleWriter = true,
+ private static BoundedChannelOptions ReadBoundedChannelOptions = new(1) {
+ SingleReader = true,
+ SingleWriter = true,
AllowSynchronousContinuations = true
};
-
- private readonly ILogger _log;
- private Lazy _streamAppenderLazy;
- private StreamAppender _streamAppender => _streamAppenderLazy.Value;
- private readonly CancellationTokenSource _disposedTokenSource;
+ private readonly ILogger _log;
+ private Lazy _batchAppenderLazy;
+ private StreamAppender _batchAppender => _batchAppenderLazy.Value;
+ private readonly CancellationTokenSource _disposedTokenSource;
private static readonly Dictionary> ExceptionMap = new() {
[Constants.Exceptions.InvalidTransaction] = ex => new InvalidTransactionException(ex.Message, ex),
- [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "", ex),
- [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion), ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion), ex, ex.Message),
- [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize), ex),
- [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!, ex),
- [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!, ex),
+ [Constants.Exceptions.StreamDeleted] = ex => new StreamDeletedException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value ?? "",
+ ex
+ ),
+ [Constants.Exceptions.WrongExpectedVersion] = ex => new WrongExpectedVersionException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!,
+ ex.Trailers.GetStreamRevision(Constants.Exceptions.ExpectedVersion),
+ ex.Trailers.GetStreamRevision(Constants.Exceptions.ActualVersion),
+ ex,
+ ex.Message
+ ),
+ [Constants.Exceptions.MaximumAppendSizeExceeded] = ex => new MaximumAppendSizeExceededException(
+ ex.Trailers.GetIntValueOrDefault(Constants.Exceptions.MaximumAppendSize),
+ ex
+ ),
+ [Constants.Exceptions.StreamNotFound] = ex => new StreamNotFoundException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.StreamName)?.Value!,
+ ex
+ ),
+ [Constants.Exceptions.MissingRequiredMetadataProperty] = ex => new RequiredMetadataPropertyMissingException(
+ ex.Trailers.FirstOrDefault(x => x.Key == Constants.Exceptions.MissingRequiredMetadataProperty)?.Value!,
+ ex
+ ),
};
///
@@ -53,29 +68,24 @@ public EventStoreClient(IOptions options) : this(optio
public EventStoreClient(EventStoreClientSettings? settings = null) : base(settings, ExceptionMap) {
_log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger();
_disposedTokenSource = new CancellationTokenSource();
- _streamAppenderLazy = new Lazy(CreateStreamAppender);
+ _batchAppenderLazy = new Lazy(CreateStreamAppender);
}
private void SwapStreamAppender(Exception ex) =>
- Interlocked.Exchange(ref _streamAppenderLazy, new Lazy(CreateStreamAppender)).Value.Dispose();
+ Interlocked.Exchange(ref _batchAppenderLazy, new Lazy(CreateStreamAppender)).Value
+ .Dispose();
// todo: might be nice to have two different kinds of appenders and we decide which to instantiate according to the server caps.
- private StreamAppender CreateStreamAppender() {
- return new StreamAppender(Settings, GetCall(), _disposedTokenSource.Token, SwapStreamAppender);
-
- async Task?> GetCall() {
- var channelInfo = await GetChannelInfo(_disposedTokenSource.Token).ConfigureAwait(false);
- if (!channelInfo.ServerCapabilities.SupportsBatchAppend)
- return null;
-
- var client = new Streams.Streams.StreamsClient(channelInfo.CallInvoker);
-
- return client.BatchAppend(EventStoreCallOptions.CreateStreaming(Settings,
- userCredentials: Settings.DefaultCredentials, cancellationToken: _disposedTokenSource.Token));
- }
- }
-
- private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) {
+ private StreamAppender CreateStreamAppender() => new StreamAppender(
+ Settings,
+ GetChannelInfo(_disposedTokenSource.Token),
+ _disposedTokenSource.Token,
+ SwapStreamAppender
+ );
+
+ private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
+ IEventFilter? filter, uint checkpointInterval = 0
+ ) {
if (filter == null) {
return null;
}
@@ -131,21 +141,25 @@ private StreamAppender CreateStreamAppender() {
return options;
}
- private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions)
+ private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
+ SubscriptionFilterOptions? filterOptions
+ )
=> filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval);
///
public override void Dispose() {
- if (_streamAppenderLazy.IsValueCreated)
- _streamAppenderLazy.Value.Dispose();
+ if (_batchAppenderLazy.IsValueCreated)
+ _batchAppenderLazy.Value.Dispose();
+
_disposedTokenSource.Dispose();
base.Dispose();
}
///
public override async ValueTask DisposeAsync() {
- if (_streamAppenderLazy.IsValueCreated)
- _streamAppenderLazy.Value.Dispose();
+ if (_batchAppenderLazy.IsValueCreated)
+ _batchAppenderLazy.Value.Dispose();
+
_disposedTokenSource.Dispose();
await base.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs
index 9019eda64..c6b9e8ae6 100644
--- a/src/EventStore.Client.Streams/StreamSubscription.cs
+++ b/src/EventStore.Client.Streams/StreamSubscription.cs
@@ -7,26 +7,28 @@ namespace EventStore.Client {
///
[Obsolete]
public class StreamSubscription : IDisposable {
- private readonly EventStoreClient.StreamSubscriptionResult _subscription;
- private readonly IAsyncEnumerator _messages;
- private readonly Func _eventAppeared;
- private readonly Func _checkpointReached;
+ private readonly EventStoreClient.StreamSubscriptionResult _subscription;
+ private readonly IAsyncEnumerator _messages;
+ private readonly Func _eventAppeared;
+ private readonly Func _checkpointReached;
private readonly Action? _subscriptionDropped;
- private readonly ILogger _log;
- private readonly CancellationTokenSource _cts;
- private int _subscriptionDroppedInvoked;
+ private readonly ILogger _log;
+ private readonly CancellationTokenSource _cts;
+ private int _subscriptionDroppedInvoked;
///
/// The id of the set by the server.
///
public string SubscriptionId { get; }
- internal static async Task Confirm(EventStoreClient.StreamSubscriptionResult subscription,
+ internal static async Task Confirm(
+ EventStoreClient.StreamSubscriptionResult subscription,
Func eventAppeared,
Action? subscriptionDropped,
ILogger log,
Func? checkpointReached = null,
- CancellationToken cancellationToken = default) {
+ CancellationToken cancellationToken = default
+ ) {
var messages = subscription.Messages;
var enumerator = messages.GetAsyncEnumerator(cancellationToken);
@@ -35,26 +37,36 @@ enumerator.Current is not StreamMessage.SubscriptionConfirmation(var subscriptio
throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed.");
}
- return new StreamSubscription(subscription, enumerator, subscriptionId, eventAppeared, subscriptionDropped,
- log, checkpointReached, cancellationToken);
+ return new StreamSubscription(
+ subscription,
+ enumerator,
+ subscriptionId,
+ eventAppeared,
+ subscriptionDropped,
+ log,
+ checkpointReached,
+ cancellationToken
+ );
}
- private StreamSubscription(EventStoreClient.StreamSubscriptionResult subscription,
+ private StreamSubscription(
+ EventStoreClient.StreamSubscriptionResult subscription,
IAsyncEnumerator messages, string subscriptionId,
Func eventAppeared,
Action? subscriptionDropped,
ILogger log,
Func? checkpointReached,
- CancellationToken cancellationToken = default) {
- _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- _subscription = subscription;
- _messages = messages;
- _eventAppeared = eventAppeared;
- _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask);
- _subscriptionDropped = subscriptionDropped;
- _log = log;
+ CancellationToken cancellationToken = default
+ ) {
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _subscription = subscription;
+ _messages = messages;
+ _eventAppeared = eventAppeared;
+ _checkpointReached = checkpointReached ?? ((_, _, _) => Task.CompletedTask);
+ _subscriptionDropped = subscriptionDropped;
+ _log = log;
_subscriptionDroppedInvoked = 0;
- SubscriptionId = subscriptionId;
+ SubscriptionId = subscriptionId;
_log.LogDebug("Subscription {subscriptionId} confirmed.", SubscriptionId);
@@ -77,11 +89,14 @@ private async Task Subscribe() {
resolvedEvent.OriginalEvent.EventNumber,
resolvedEvent.OriginalEvent.Position
);
+
await _eventAppeared(this, resolvedEvent, _cts.Token).ConfigureAwait(false);
break;
+
case StreamMessage.AllStreamCheckpointReached (var position):
await _checkpointReached(this, position, _cts.Token)
.ConfigureAwait(false);
+
break;
}
} catch (Exception ex) when
@@ -105,6 +120,7 @@ await _checkpointReached(this, position, _cts.Token)
"Subscription {subscriptionId} was dropped because the subscriber made an error.",
SubscriptionId
);
+
SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);
return;
@@ -114,13 +130,18 @@ await _checkpointReached(this, position, _cts.Token)
ex.Status.Detail.Contains("Call canceled by the client.")) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
} catch (Exception ex) {
if (_subscriptionDroppedInvoked == 0) {
- _log.LogError(ex,
+ _log.LogError(
+ ex,
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
- SubscriptionId);
+ SubscriptionId
+ );
+
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
}
}
diff --git a/src/EventStore.Client/Diagnostics/ActivityExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs
new file mode 100644
index 000000000..564aa98fe
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityExtensions.cs
@@ -0,0 +1,45 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivityExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static TracingMetadata GetTracingMetadata(this Activity activity)
+ => new(activity.TraceId.ToString(), activity.SpanId.ToString());
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void SetActivityStatus(this Activity activity, ActivityStatus activityStatus) {
+ var (activityStatusCode, description, exception) = activityStatus;
+
+ var statusCode = activityStatusCode switch {
+ ActivityStatusCode.Error => "ERROR",
+ ActivityStatusCode.Ok => "OK",
+ _ => "UNSET"
+ };
+
+ activity.SetStatus(activityStatus.StatusCode, description);
+ activity.SetTag(TelemetryAttributes.OtelStatusCode, statusCode);
+ activity.SetTag(TelemetryAttributes.OtelStatusDescription, description);
+
+ if (exception != null)
+ activity.SetException(exception);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static void SetException(this Activity activity, Exception exception) {
+ var tags = new ActivityTagsCollection {
+ { TelemetryAttributes.ExceptionType, exception.GetType().Name },
+ { TelemetryAttributes.ExceptionMessage, $"{exception.Message} {exception.InnerException?.Message}" },
+ { TelemetryAttributes.ExceptionStacktrace, exception.StackTrace }
+ };
+
+ foreach (var tag in tags) {
+ activity.SetTag(tag.Key, tag.Value);
+ }
+
+ activity.AddEvent(new ActivityEvent(TelemetryAttributes.ExceptionEventName, DateTimeOffset.Now, tags));
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/ActivityStatus.cs b/src/EventStore.Client/Diagnostics/ActivityStatus.cs
new file mode 100644
index 000000000..9470f7b59
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityStatus.cs
@@ -0,0 +1,11 @@
+using System.Diagnostics;
+
+namespace EventStore.Client.Diagnostics;
+
+record ActivityStatus(ActivityStatusCode StatusCode, string? Description, Exception? Exception) {
+ public static ActivityStatus Ok(string? description = null)
+ => new(ActivityStatusCode.Ok, description, null);
+
+ public static ActivityStatus Error(Exception ex, string? description = null)
+ => new(ActivityStatusCode.Error, description, ex);
+}
diff --git a/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs
new file mode 100644
index 000000000..e7efaf5f7
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/ActivityTagsCollectionExtensions.cs
@@ -0,0 +1,51 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using EventStore.Client.Diagnostics.Telemetry;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivityTagsCollectionExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithTagsFrom(
+ this ActivityTagsCollection tags, ChannelInfo? channelInfo, EventStoreClientSettings settings
+ ) {
+ ActivityTagsCollection? serverTags = null;
+ if (settings.ConnectivitySettings.DnsGossipSeeds?.Length == 1) {
+ // Ensure consistent server.address attribute when connecting to cluster via dns discovery
+ var gossipSeed = settings.ConnectivitySettings.DnsGossipSeeds[0];
+ serverTags = CreateServerAttributes(gossipSeed.Host, gossipSeed.Port);
+ } else if (channelInfo != null) {
+ // Otherwise use the current gRPC channel target
+ var authorityParts = channelInfo.Channel.Target.Split(':');
+ serverTags = CreateServerAttributes(authorityParts[0], int.Parse(authorityParts[1]));
+ }
+
+ return tags.WithTags(serverTags).WithTagsFrom(settings.DefaultCredentials);
+
+ ActivityTagsCollection CreateServerAttributes(string? host, int? port) => new() {
+ { TelemetryAttributes.ServerAddress, host },
+ { TelemetryAttributes.ServerPort, port }
+ };
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal static ActivityTagsCollection WithTagsFrom(
+ this ActivityTagsCollection tags, UserCredentials? userCredentials
+ ) {
+ return tags.WithTag(TelemetryAttributes.DatabaseUser, userCredentials?.Username);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal static ActivityTagsCollection WithTags(this ActivityTagsCollection current, ActivityTagsCollection? tags)
+ => tags == null
+ ? current
+ : tags.Aggregate(current, (newTags, tag) => newTags.WithTag(tag.Key, tag.Value));
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static ActivityTagsCollection WithTag(this ActivityTagsCollection tags, string key, object? value) {
+ if (value != null)
+ tags[key] = value;
+
+ return tags;
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs
new file mode 100644
index 000000000..99f83c7e2
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventMetadataExtensions.cs
@@ -0,0 +1,84 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class EventMetadataExtensions {
+ // TODO JC: TEMPORARY WORKAROUND CODE TO USE CUSTOM METADATA UNTIL DATABASE CHANGES ARE READY
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ReadOnlySpan InjectTracingMetadata(
+ this ReadOnlyMemory rawCustomMetadata
+ ) {
+ if (Activity.Current == null) return rawCustomMetadata.Span;
+
+ try {
+ using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata);
+ var tracingMetadata = Activity.Current.GetTracingMetadata();
+
+ using var stream = new MemoryStream();
+ using var writer = new Utf8JsonWriter(stream);
+
+ writer.WriteStartObject();
+
+ foreach (var prop in customMetadataJson.RootElement.EnumerateObject())
+ prop.WriteTo(writer);
+
+ writer.WriteIfNotNull(TracingConstants.Metadata.TraceId, tracingMetadata.TraceId)
+ .WriteIfNotNull(TracingConstants.Metadata.SpanId, tracingMetadata.SpanId);
+
+ writer.WriteEndObject();
+ writer.Flush();
+
+ return stream.ToArray();
+ } catch (Exception) {
+ return rawCustomMetadata.Span;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityContext? RestoreTracingContext(this ReadOnlyMemory rawCustomMetadata) {
+ var (traceId, spanId) = rawCustomMetadata.ExtractTracingMetadata();
+
+ if (traceId == null || spanId == null)
+ return default;
+
+ try {
+ return new(
+ ActivityTraceId.CreateFromString(traceId.ToCharArray()),
+ ActivitySpanId.CreateFromString(spanId.ToCharArray()),
+ ActivityTraceFlags.Recorded,
+ isRemote: true
+ );
+ } catch (Exception) {
+ return default;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory rawCustomMetadata) {
+ try {
+ using var customMetadataJson = JsonDocument.Parse(rawCustomMetadata);
+
+ return new TracingMetadata(
+ customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.TraceId).GetString(),
+ customMetadataJson.RootElement.GetProperty(TracingConstants.Metadata.SpanId).GetString()
+ );
+ } catch (Exception) {
+ return TracingMetadata.None();
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static Utf8JsonWriter WriteIfNotNull(
+ this Utf8JsonWriter jsonWriter, string key, string? value
+ ) {
+ if (string.IsNullOrEmpty(value)) return jsonWriter;
+
+ jsonWriter.WritePropertyName(key);
+ jsonWriter.WriteStringValue(value);
+
+ return jsonWriter;
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs
new file mode 100644
index 000000000..98e4b96e2
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventStoreClientDiagnostics.cs
@@ -0,0 +1,77 @@
+using System.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class EventStoreClientDiagnostics {
+ static readonly ActivitySource _activitySource =
+ new ActivitySource(EventStoreClientInstrumentation.ActivitySourceName);
+
+ static readonly ActivityTagsCollection _defaultTags = [new(TelemetryAttributes.DatabaseSystem, "eventstoredb")];
+
+ public static Activity? StartActivity(
+ string operation, ActivityTagsCollection? tags, ActivityKind activityKind = default,
+ ActivityContext? activityContext = null
+ ) {
+ var activity = _activitySource.CreateActivity(
+ operation,
+ activityKind,
+ parentContext: activityContext ?? default,
+ new ActivityTagsCollection {
+ { TelemetryAttributes.DatabaseOperation, operation }
+ }
+ .WithTags(_defaultTags)
+ .WithTags(tags),
+ idFormat: ActivityIdFormat.W3C
+ );
+
+ return activity?.Start();
+ }
+
+ public static async ValueTask TraceOperation(
+ Func> tracedOperation, string operationName, ActivityTagsCollection? tags = null
+ ) {
+ using var activity = StartActivity(operationName, tags, ActivityKind.Client);
+
+ try {
+ var res = await tracedOperation().ConfigureAwait(false);
+ activity?.SetActivityStatus(ActivityStatus.Ok());
+ return res;
+ } catch (Exception ex) {
+ activity?.SetActivityStatus(ActivityStatus.Error(ex));
+ throw;
+ }
+ }
+
+ public static void TraceSubscriptionEvent(
+ string? subscriptionId, ResolvedEvent evnt, ChannelInfo channelInfo,
+ EventStoreClientSettings settings, UserCredentials? userCredentials
+ ) {
+ var restoredTracingContext =
+ evnt.OriginalEvent.Metadata.RestoreTracingContext();
+
+ if (restoredTracingContext != null)
+ StartActivity(
+ TracingConstants.Operations.Subscribe,
+ new ActivityTagsCollection {
+ {
+ TelemetryAttributes.EventStoreStream,
+ evnt.OriginalEvent.EventStreamId
+ }, {
+ TelemetryAttributes.EventStoreSubscriptionId,
+ subscriptionId
+ }, {
+ TelemetryAttributes.EventStoreEventId,
+ evnt.OriginalEvent.EventId.ToString()
+ }, {
+ TelemetryAttributes.EventStoreEventType,
+ evnt.OriginalEvent.EventType
+ }
+ }.WithTagsFrom(channelInfo, settings)
+ .WithTagsFrom(userCredentials),
+ ActivityKind.Consumer,
+ restoredTracingContext
+ )?.Dispose();
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs
new file mode 100644
index 000000000..7ae9b6ba1
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/EventStoreClientInstrumentation.cs
@@ -0,0 +1,5 @@
+namespace EventStore.Client.Diagnostics;
+
+static class EventStoreClientInstrumentation {
+ public const string ActivitySourceName = "eventstoredb.client";
+}
diff --git a/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs
new file mode 100644
index 000000000..5b63d4e36
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Telemetry/TelemetryAttributes.cs
@@ -0,0 +1,29 @@
+namespace EventStore.Client.Diagnostics.Telemetry;
+
+// The attributes below match the specification of v1.24.0 of the Open Telemetry semantic conventions.
+// Some attributes are ignored where not required or relevant.
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/general/trace.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/database/database-spans.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/exceptions/exceptions-spans.md
+static class TelemetryAttributes {
+ public const string DatabaseUser = "db.user";
+ public const string DatabaseSystem = "db.system";
+ public const string DatabaseOperation = "db.operation";
+
+ public const string ServerAddress = "server.address";
+ public const string ServerPort = "server.port";
+
+ public const string ExceptionEventName = "exception";
+ public const string ExceptionType = "exception.type";
+ public const string ExceptionMessage = "exception.message";
+ public const string ExceptionStacktrace = "exception.stacktrace";
+
+ public const string OtelStatusCode = "otel.status_code";
+ public const string OtelStatusDescription = "otel.status_description";
+
+ // Custom attributes
+ public const string EventStoreStream = "db.eventstoredb.stream";
+ public const string EventStoreSubscriptionId = "db.eventstoredb.subscription.id";
+ public const string EventStoreEventId = "db.eventstoredb.event.id";
+ public const string EventStoreEventType = "db.eventstoredb.event.type";
+}
diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs
new file mode 100644
index 000000000..2c2d22cf3
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Tracing/TracingConstants.cs
@@ -0,0 +1,13 @@
+namespace EventStore.Client.Diagnostics.Tracing;
+
+static class TracingConstants {
+ public static class Metadata {
+ public const string TraceId = "$traceId";
+ public const string SpanId = "$spanId";
+ }
+
+ public static class Operations {
+ public const string Append = "streams.append";
+ public const string Subscribe = "streams.subscribe";
+ }
+}
diff --git a/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs
new file mode 100644
index 000000000..f907cc49d
--- /dev/null
+++ b/src/EventStore.Client/Diagnostics/Tracing/TracingMetadata.cs
@@ -0,0 +1,5 @@
+namespace EventStore.Client.Diagnostics.Tracing;
+
+record TracingMetadata(string? TraceId, string? SpanId) {
+ public static TracingMetadata None() => new(null, null);
+}
diff --git a/src/EventStore.Client/EventStore.Client.csproj b/src/EventStore.Client/EventStore.Client.csproj
index ddf36c6af..8c9ffaaf2 100644
--- a/src/EventStore.Client/EventStore.Client.csproj
+++ b/src/EventStore.Client/EventStore.Client.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/EventStore.Client/EventStoreClientBase.cs b/src/EventStore.Client/EventStoreClientBase.cs
index 39f579fcc..2b4a5b2f6 100644
--- a/src/EventStore.Client/EventStoreClientBase.cs
+++ b/src/EventStore.Client/EventStoreClientBase.cs
@@ -1,10 +1,7 @@
-using System;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
using EventStore.Client.Interceptors;
using Grpc.Core;
using Grpc.Core.Interceptors;
+using Enum = System.Enum;
namespace EventStore.Client {
///
@@ -13,28 +10,29 @@ namespace EventStore.Client {
public abstract class EventStoreClientBase :
IDisposable, // for grpc.net we can dispose synchronously, but not for grpc.core
IAsyncDisposable {
-
- private readonly Dictionary> _exceptionMap;
- private readonly CancellationTokenSource _cts;
- private readonly ChannelCache _channelCache;
+ private readonly Dictionary> _exceptionMap;
+ private readonly CancellationTokenSource _cts;
+ private readonly ChannelCache _channelCache;
private readonly SharingProvider _channelInfoProvider;
- private readonly Lazy _httpFallback;
-
+ private readonly Lazy _httpFallback;
+
/// The name of the connection.
public string ConnectionName { get; }
-
+
/// The .
protected EventStoreClientSettings Settings { get; }
/// Constructs a new .
- protected EventStoreClientBase(EventStoreClientSettings? settings,
- Dictionary> exceptionMap) {
- Settings = settings ?? new EventStoreClientSettings();
+ protected EventStoreClientBase(
+ EventStoreClientSettings? settings,
+ Dictionary> exceptionMap
+ ) {
+ Settings = settings ?? new EventStoreClientSettings();
_exceptionMap = exceptionMap;
- _cts = new CancellationTokenSource();
+ _cts = new CancellationTokenSource();
_channelCache = new(Settings);
_httpFallback = new Lazy(() => new HttpFallback(Settings));
-
+
ConnectionName = Settings.ConnectionName ?? $"ES-{Guid.NewGuid()}";
var channelSelector = new ChannelSelector(Settings, _channelCache);
@@ -43,17 +41,18 @@ protected EventStoreClientBase(EventStoreClientSettings? settings,
GetChannelInfoExpensive(endPoint, onBroken, channelSelector, _cts.Token),
factoryRetryDelay: Settings.ConnectivitySettings.DiscoveryInterval,
initialInput: ReconnectionRequired.Rediscover.Instance,
- loggerFactory: Settings.LoggerFactory);
+ loggerFactory: Settings.LoggerFactory
+ );
}
-
+
// Select a channel and query its capabilities. This is an expensive call that
// we don't want to do often.
private async Task GetChannelInfoExpensive(
ReconnectionRequired reconnectionRequired,
Action onReconnectionRequired,
IChannelSelector channelSelector,
- CancellationToken cancellationToken) {
-
+ CancellationToken cancellationToken
+ ) {
var channel = reconnectionRequired switch {
ReconnectionRequired.Rediscover => await channelSelector.SelectChannelAsync(cancellationToken)
.ConfigureAwait(false),
@@ -78,11 +77,10 @@ private async Task GetChannelInfoExpensive(
return new(channel, caps, invoker);
}
-
+
/// Gets the current channel info.
protected async ValueTask GetChannelInfo(CancellationToken cancellationToken) =>
await _channelInfoProvider.CurrentAsync.WithCancellation(cancellationToken).ConfigureAwait(false);
-
///
/// Only exists so that we can manually trigger rediscovery in the tests
@@ -95,20 +93,30 @@ internal Task RediscoverAsync() {
}
/// Returns the result of an HTTP Get request based on the client settings.
- protected async Task HttpGet(string path, Action onNotFound, ChannelInfo channelInfo,
- TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) {
-
+ protected async Task HttpGet(
+ string path, Action onNotFound, ChannelInfo channelInfo,
+ TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) {
return await _httpFallback.Value
.HttpGetAsync(path, channelInfo, deadline, userCredentials, onNotFound, cancellationToken)
.ConfigureAwait(false);
}
/// Executes an HTTP Post request based on the client settings.
- protected async Task HttpPost(string path, string query, Action onNotFound, ChannelInfo channelInfo,
- TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken) {
-
+ protected async Task HttpPost(
+ string path, string query, Action onNotFound, ChannelInfo channelInfo,
+ TimeSpan? deadline, UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) {
await _httpFallback.Value
- .HttpPostAsync(path, query, channelInfo, deadline, userCredentials, onNotFound, cancellationToken)
+ .HttpPostAsync(
+ path,
+ query,
+ channelInfo,
+ deadline,
+ userCredentials,
+ onNotFound,
+ cancellationToken
+ )
.ConfigureAwait(false);
}
@@ -118,7 +126,7 @@ public virtual void Dispose() {
_cts.Cancel();
_cts.Dispose();
_channelCache.Dispose();
-
+
if (_httpFallback.IsValueCreated) {
_httpFallback.Value.Dispose();
}
diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs
new file mode 100644
index 000000000..597bd7401
--- /dev/null
+++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/PersistentSubscriptionsTracingInstrumentationTests.cs
@@ -0,0 +1,74 @@
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.PersistentSubscriptions.Tests.Diagnostics;
+
+[Trait("Category", "Diagnostics:Tracing")]
+public class PersistentSubscriptionsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
+ : EventStoreTests(output, fixture) {
+ [Fact]
+ public async Task PersistentSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() {
+ var stream = Fixture.GetStreamName();
+ var events = Fixture.CreateTestEventsWithWorkaroundCustomMetadata(2).ToArray();
+
+ var groupName = $"{stream}-group";
+ await Fixture.Subscriptions.CreateToStreamAsync(
+ stream,
+ groupName,
+ new()
+ );
+
+ await Fixture.Streams.AppendToStreamAsync(
+ stream,
+ StreamState.NoStream,
+ events
+ );
+
+ string? subscriptionId = null;
+ await Subscribe().WithTimeout();
+
+ var appendActivity = Fixture
+ .GetActivitiesForOperation(TracingConstants.Operations.Append, stream)
+ .SingleOrDefault()
+ .ShouldNotBeNull();
+
+ var subscribeActivities = Fixture
+ .GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream)
+ .ToArray();
+
+ subscriptionId.ShouldNotBeNull();
+ subscribeActivities.Length.ShouldBe(events.Length);
+
+ for (var i = 0; i < subscribeActivities.Length; i++) {
+ subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId);
+ subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId);
+ subscribeActivities[i].HasRemoteParent.ShouldBeTrue();
+
+ Fixture.AssertSubscriptionActivityHasExpectedTags(
+ subscribeActivities[i],
+ stream,
+ events[i].EventId.ToString(),
+ subscriptionId
+ );
+ }
+
+ return;
+
+ async Task Subscribe() {
+ await using var subscription = Fixture.Subscriptions.SubscribeToStream(stream, groupName);
+ await using var enumerator = subscription.Messages.GetAsyncEnumerator();
+
+ int eventsAppeared = 0;
+ while (await enumerator.MoveNextAsync()) {
+ if (enumerator.Current is PersistentSubscriptionMessage.SubscriptionConfirmation(var sid))
+ subscriptionId = sid;
+
+ if (enumerator.Current is not PersistentSubscriptionMessage.Event(_, _))
+ continue;
+
+ eventsAppeared++;
+ if (eventsAppeared >= events.Length)
+ return;
+ }
+ }
+ }
+}
diff --git a/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs
new file mode 100644
index 000000000..1868aef48
--- /dev/null
+++ b/test/EventStore.Client.PersistentSubscriptions.Tests/Diagnostics/TracingInstrumentationTests.cs
@@ -0,0 +1,10 @@
+using System.Diagnostics;
+using EventStore.Client.Diagnostics.Telemetry;
+using EventStore.Client.Diagnostics.Tracing;
+
+namespace EventStore.Client.PersistentSubscriptions.Tests.Diagnostics;
+
+[Trait("Category", "Diagnostics:Tracing")]
+public class TracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
+ : EventStoreTests(output, fixture) {
+}
diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
index 989a6b6bf..6cd5b813d 100644
--- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
+++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs
@@ -5,7 +5,8 @@ namespace EventStore.Client.Streams.Tests.Append;
[Trait("Category", "Target:Stream")]
[Trait("Category", "Operation:Append")]
-public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) {
+public class append_to_stream(ITestOutputHelper output, EventStoreFixture fixture)
+ : EventStoreTests(output, fixture) {
public static IEnumerable