Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcummings committed Mar 27, 2024
1 parent b4667b6 commit 42b7030
Show file tree
Hide file tree
Showing 16 changed files with 534 additions and 122 deletions.
10 changes: 5 additions & 5 deletions src/EventStore.Client.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public static class Exceptions {
}

public static class Metadata {
public const string Type = "type";
public const string Created = "created";
public const string ContentType = "content-type";
public static readonly string[] RequiredMetadata = { Type, ContentType };
public const string Type = "type";
public const string Created = "created";
public const string ContentType = "content-type";
public static readonly string[] RequiredMetadata = { Type, ContentType };

public static class ContentTypes {
public const string ApplicationJson = "application/json";
Expand All @@ -58,4 +58,4 @@ public static class Headers {
public const string ConnectionName = "connection-name";
public const string RequiresLeader = "requires-leader";
}
}
}
167 changes: 116 additions & 51 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Text.Json;
using System.Threading.Channels;
using Google.Protobuf;
using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System.Runtime.CompilerServices;
using EventStore.Client.Diagnostics;
using EventStore.Client.Diagnostics.OpenTelemetry;
using EventStore.Client.Diagnostics.Tracing;

namespace EventStore.Client {
public partial class EventStoreClient {
Expand All @@ -30,7 +30,8 @@ public async Task<IWriteResult> AppendToStreamAsync(
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
CancellationToken cancellationToken = default
) {
var options = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(options);

Expand All @@ -41,13 +42,26 @@ public async Task<IWriteResult> AppendToStreamAsync(
userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
? batchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
(await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName,
Revision = expectedRevision
}
}, eventData, options, deadline, userCredentials, cancellationToken);
},
eventData,
options,
deadline,
userCredentials,
cancellationToken
).Trace(
TracingConstants.Operations.Append,
new ActivityTagsCollection {
{ SemanticAttributes.EventStoreStream, streamName }
}
.WithTagsFrom(Settings)
.WithTagsFrom(userCredentials)
);

return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options);
}
Expand All @@ -70,56 +84,72 @@ public async Task<IWriteResult> AppendToStreamAsync(
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
CancellationToken cancellationToken = default
) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

_log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedState);
_log.LogDebug("Append to stream - {streamName}@{expectedState}.", streamName, expectedState);

var batchAppender = _streamAppender;
var task =
userCredentials == null && await batchAppender.IsUsable().ConfigureAwait(false)
? batchAppender.Append(streamName, expectedState, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
(await GetChannelInfo(cancellationToken).ConfigureAwait(false)).CallInvoker,
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new AppendReq.Types.Options {
StreamIdentifier = streamName
}
}.WithAnyStreamRevision(expectedState), eventData, operationOptions, deadline, userCredentials,
cancellationToken);
}.WithAnyStreamRevision(expectedState),
eventData,
operationOptions,
deadline,
userCredentials,
cancellationToken
).Trace(
TracingConstants.Operations.Append,
new ActivityTagsCollection {
{ SemanticAttributes.EventStoreStream, streamName }
}
.WithTagsFrom(Settings)
.WithTagsFrom(userCredentials)
);

return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions);
}

private async ValueTask<IWriteResult> AppendToStreamInternal(
CallInvoker callInvoker,
ChannelInfo channelInfo,
AppendReq header,
IEnumerable<EventData> eventData,
EventStoreClientOperationOptions operationOptions,
TimeSpan? deadline,
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
using var call = new Streams.Streams.StreamsClient(channelInfo.CallInvoker).Append(
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
);

await call.RequestStream.WriteAsync(header).ConfigureAwait(false);

foreach (var e in eventData) {
await call.RequestStream.WriteAsync(
new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
},
var appendReq = new AppendReq {
ProposedMessage = new AppendReq.Types.ProposedMessage {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
}
}
).ConfigureAwait(false);
};

appendReq.ProposedMessage.Metadata.InjectTracingMetadata();

await call.RequestStream.WriteAsync(appendReq).ConfigureAwait(false);
}

await call.RequestStream.CompleteAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -150,7 +180,8 @@ private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header)
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
header.Options.StreamIdentifier,
position,
currentRevision);
currentRevision
);

return new SuccessResult(currentRevision, position);
}
Expand Down Expand Up @@ -227,28 +258,51 @@ private class StreamAppender : IDisposable {

private readonly Task<AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>?> _callTask;

public StreamAppender(EventStoreClientSettings settings,
Task<AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>?> callTask, CancellationToken cancellationToken,
Action<Exception> onException) {
public StreamAppender(
EventStoreClientSettings settings,
Task<AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>?> callTask,
CancellationToken cancellationToken,
Action<Exception> onException
) {
_settings = settings;
_callTask = callTask;
_cancellationToken = cancellationToken;
_onException = onException;
_channel = System.Threading.Channels.Channel.CreateBounded<BatchAppendReq>(10000);
_channel = Channel.CreateBounded<BatchAppendReq>(10000);
_pendingRequests = new ConcurrentDictionary<Uuid, TaskCompletionSource<IWriteResult>>();
_ = Task.Factory.StartNew(Send);
_ = Task.Factory.StartNew(Receive);
}

public ValueTask<IWriteResult> Append(string streamName, StreamRevision expectedStreamPosition,
IEnumerable<EventData> events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
events, cancellationToken);
public ValueTask<IWriteResult> Append(
string streamName, StreamRevision expectedStreamPosition,
IEnumerable<EventData> events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
events,
cancellationToken
).Trace(
TracingConstants.Operations.Append,
new ActivityTagsCollection {
{ SemanticAttributes.EventStoreStream, streamName }
}.WithTagsFrom(_settings)
);

public ValueTask<IWriteResult> Append(string streamName, StreamState expectedStreamState,
IEnumerable<EventData> events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default) =>
AppendInternal(BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
events, cancellationToken);
public ValueTask<IWriteResult> Append(
string streamName, StreamState expectedStreamState,
IEnumerable<EventData> events, TimeSpan? timeoutAfter, CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
events,
cancellationToken
).Trace(
TracingConstants.Operations.Append,
new ActivityTagsCollection {
{ SemanticAttributes.EventStoreStream, streamName }
}.WithTagsFrom(_settings)
);

public async ValueTask<bool> IsUsable() {
var call = await _callTask.ConfigureAwait(false);
Expand All @@ -259,8 +313,7 @@ private async Task Receive() {
try {
var call = await _callTask.ConfigureAwait(false);
if (call is null) {
_channel.Writer.TryComplete(
new NotSupportedException("Server does not support batch append"));
_channel.Writer.TryComplete(new NotSupportedException("Server does not support batch append"));
return;
}

Expand Down Expand Up @@ -301,8 +354,10 @@ private async Task Send() {
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
}

private async ValueTask<IWriteResult> AppendInternal(BatchAppendReq.Types.Options options,
IEnumerable<EventData> events, CancellationToken cancellationToken) {
private async ValueTask<IWriteResult> AppendInternal(
BatchAppendReq.Types.Options options,
IEnumerable<EventData> events, CancellationToken cancellationToken
) {
var batchSize = 0;
var correlationId = Uuid.NewUuid();
var correlationIdDto = correlationId.ToDto();
Expand All @@ -323,17 +378,26 @@ private async ValueTask<IWriteResult> AppendInternal(BatchAppendReq.Types.Option
IEnumerable<BatchAppendReq> GetRequests() {
bool first = true;
var proposedMessages = new List<BatchAppendReq.Types.ProposedMessage>();

foreach (var @event in events) {
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
Data = ByteString.CopyFrom(@event.Data.Span),
CustomMetadata = ByteString.CopyFrom(@event.Metadata.Span),
Id = @event.EventId.ToDto(),
Data = ByteString.CopyFrom(@event.Data.Span),
CustomMetadata = ByteString.CopyFrom(
// Temporary measure to test trace context propagation until the server supports
// propagation through system metadata (see Metadata below)
JsonSerializer.SerializeToUtf8Bytes(
new Dictionary<string, string>().InjectTracingMetadata()
)
),
Id = @event.EventId.ToDto(),
Metadata = {
{Constants.Metadata.Type, @event.Type},
{Constants.Metadata.ContentType, @event.ContentType}
{ Constants.Metadata.Type, @event.Type },
{ Constants.Metadata.ContentType, @event.ContentType }
}
};

proposedMessage.Metadata.InjectTracingMetadata();

proposedMessages.Add(proposedMessage);

if ((batchSize += proposedMessage.CalculateSize()) <
Expand All @@ -342,17 +406,18 @@ IEnumerable<BatchAppendReq> GetRequests() {
}

yield return new BatchAppendReq {
ProposedMessages = {proposedMessages},
ProposedMessages = { proposedMessages },
CorrelationId = correlationIdDto,
Options = first ? options : null
};

first = false;
proposedMessages.Clear();
batchSize = 0;
}

yield return new BatchAppendReq {
ProposedMessages = {proposedMessages},
ProposedMessages = { proposedMessages },
IsFinal = true,
CorrelationId = correlationIdDto,
Options = first ? options : null
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Client.Streams/EventStoreClient.Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private async Task<IWriteResult> SetStreamMetadataInternal(StreamMetadata metada
CancellationToken cancellationToken) {

var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return await AppendToStreamInternal(channelInfo.CallInvoker, appendReq, new[] {
return await AppendToStreamInternal(channelInfo, appendReq, new[] {
new EventData(Uuid.NewUuid(), SystemEventTypes.StreamMetadata,
JsonSerializer.SerializeToUtf8Bytes(metadata, StreamMetadataJsonSerializerOptions)),
}, operationOptions, deadline, userCredentials, cancellationToken).ConfigureAwait(false);
Expand Down
Loading

0 comments on commit 42b7030

Please sign in to comment.