Skip to content

Commit

Permalink
Return null for single host in discovery mode
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 6, 2024
1 parent def981a commit 0ef472f
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 83 deletions.
70 changes: 59 additions & 11 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp;
Expand Down Expand Up @@ -42,23 +37,76 @@ public ReadAllStreamResult ReadAllAsync(
Options = new() {
ReadDirection = direction switch {
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
},
ResolveLinks = resolveLinkTos,
All = new() {
Position = new() {
CommitPosition = position.CommitPosition,
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
},
Count = (ulong)maxCount,
UuidOption = new() {Structured = new()},
NoFilter = new(),
Count = (ulong)maxCount,
UuidOption = new() {Structured = new()},
NoFilter = new(),
ControlOption = new() {Compatibility = 1}
}
}, Settings, deadline, userCredentials, cancellationToken);
}

/// <summary>
/// Asynchronously reads all events with filtering.
/// </summary>
/// <param name="direction">The <see cref="Direction"/> in which to read.</param>
/// <param name="position">The <see cref="Position"/> to start reading from.</param>
/// <param name="eventFilter">The <see cref="IEventFilter"/> to apply.</param>
/// <param name="maxCount">The maximum count to read.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The optional <see cref="UserCredentials"/> to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public ReadAllStreamResult ReadAllAsync(
Direction direction,
Position position,
IEventFilter eventFilter,
long maxCount = long.MaxValue,
bool resolveLinkTos = false,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
if (maxCount <= 0) {
throw new ArgumentOutOfRangeException(nameof(maxCount));
}

var readReq = new ReadReq {
Options = new() {
ReadDirection = direction switch {
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
},
ResolveLinks = resolveLinkTos,
All = new() {
Position = new() {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
},
Count = (ulong)maxCount,
UuidOption = new() { Structured = new() },
ControlOption = new() { Compatibility = 1 },
Filter = GetFilterOptions(eventFilter)
}
};

return new ReadAllStreamResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, readReq, Settings, deadline, userCredentials, cancellationToken);
}

/// <summary>
/// A class that represents the result of a read operation on the $all stream. You may either enumerate this instance directly or <see cref="Messages"/>. Do not enumerate more than once.
Expand Down
12 changes: 6 additions & 6 deletions src/EventStore.Client.Streams/EventStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,11 @@ private StreamAppender CreateStreamAppender() {
}
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
SubscriptionFilterOptions? filterOptions) {
if (filterOptions == null) {
private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) {
if (filter == null) {
return null;
}

var filter = filterOptions.Filter;

var options = filter switch {
StreamFilter => new ReadReq.Types.Options.Types.FilterOptions {
StreamIdentifier = (filter.Prefixes, filter.Regex) switch {
Expand Down Expand Up @@ -127,11 +124,14 @@ private StreamAppender CreateStreamAppender() {
options.Count = new Empty();
}

options.CheckpointIntervalMultiplier = filterOptions.CheckpointInterval;
options.CheckpointIntervalMultiplier = checkpointInterval;

return options;
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions)
=> filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval);

/// <inheritdoc />
public override void Dispose() {
if (_streamAppenderLazy.IsValueCreated)
Expand Down
63 changes: 28 additions & 35 deletions src/EventStore.Client/EventStoreClientConnectivitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@ namespace EventStore.Client {
/// A class used to describe how to connect to an instance of EventStoreDB.
/// </summary>
public class EventStoreClientConnectivitySettings {
private const int DefaultPort = 2113;
private bool _insecure;
private Uri? _address;
private const int DefaultPort = 2113;
private bool _insecure;
private Uri? _address;

/// <summary>
/// The <see cref="Uri"/> of the EventStoreDB. Use this when connecting to a single node.
/// </summary>
public Uri Address {
get { return _address != null && IsSingleNode ? _address : DefaultAddress; }
set { _address = value; }
public Uri? Address {
get => IsSingleNode ? _address : null;
set => _address = value;
}

private Uri DefaultAddress {
get {
return new UriBuilder {
Scheme = _insecure ? Uri.UriSchemeHttp : Uri.UriSchemeHttps,
Port = DefaultPort
}.Uri;
}
}

internal Uri ResolvedAddressOrDefault => Address ?? DefaultAddress;

Uri DefaultAddress =>
new UriBuilder {
Scheme = _insecure ? Uri.UriSchemeHttp : Uri.UriSchemeHttps,
Port = DefaultPort
}.Uri;

/// <summary>
/// The maximum number of times to attempt <see cref="EndPoint"/> discovery.
/// </summary>
Expand All @@ -38,8 +37,8 @@ private Uri DefaultAddress {
public EndPoint[] GossipSeeds =>
((object?)DnsGossipSeeds ?? IpGossipSeeds) switch {
DnsEndPoint[] dns => Array.ConvertAll<DnsEndPoint, EndPoint>(dns, x => x),
IPEndPoint[] ip => Array.ConvertAll<IPEndPoint, EndPoint>(ip, x => x),
_ => Array.Empty<EndPoint>()
IPEndPoint[] ip => Array.ConvertAll<IPEndPoint, EndPoint>(ip, x => x),
_ => Array.Empty<EndPoint>()
};

/// <summary>
Expand Down Expand Up @@ -87,37 +86,31 @@ private Uri DefaultAddress {
/// True if pointing to a single EventStoreDB node.
/// </summary>
public bool IsSingleNode => GossipSeeds.Length == 0;

/// <summary>
/// True if communicating over an insecure channel; otherwise false.
/// </summary>
public bool Insecure {
get {
return IsSingleNode
? string.Equals(Address.Scheme, Uri.UriSchemeHttp, StringComparison.OrdinalIgnoreCase)
: _insecure;
}
set {
_insecure = value;
}
get => IsSingleNode ? string.Equals(Address?.Scheme, Uri.UriSchemeHttp, StringComparison.OrdinalIgnoreCase) : _insecure;
set => _insecure = value;
}

/// <summary>
/// True if certificates will be validated; otherwise false.
/// </summary>
public bool TlsVerifyCert { get; set; } = true;

/// <summary>
/// The default <see cref="EventStoreClientConnectivitySettings"/>.
/// </summary>
public static EventStoreClientConnectivitySettings Default => new EventStoreClientConnectivitySettings {
MaxDiscoverAttempts = 10,
GossipTimeout = TimeSpan.FromSeconds(5),
DiscoveryInterval = TimeSpan.FromMilliseconds(100),
NodePreference = NodePreference.Leader,
KeepAliveInterval = TimeSpan.FromSeconds(10),
KeepAliveTimeout = TimeSpan.FromSeconds(10),
TlsVerifyCert = true,
GossipTimeout = TimeSpan.FromSeconds(5),
DiscoveryInterval = TimeSpan.FromMilliseconds(100),
NodePreference = NodePreference.Leader,
KeepAliveInterval = TimeSpan.FromSeconds(10),
KeepAliveTimeout = TimeSpan.FromSeconds(10),
TlsVerifyCert = true,
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static class ConnectionStringParser {
private const string UriSchemeDiscover = "esdb+discover";

private static readonly string[] Schemes = {"esdb", UriSchemeDiscover};
private static readonly int DefaultPort = EventStoreClientConnectivitySettings.Default.Address.Port;
private static readonly int DefaultPort = EventStoreClientConnectivitySettings.Default.ResolvedAddressOrDefault.Port;
private static readonly bool DefaultUseTls = true;

private static readonly Dictionary<string, Type> SettingsType =
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Client/HttpFallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal class HttpFallback : IDisposable {
private readonly string _addressScheme;

internal HttpFallback (EventStoreClientSettings settings) {
_addressScheme = settings.ConnectivitySettings.Address.Scheme;
_addressScheme = settings.ConnectivitySettings.ResolvedAddressOrDefault.Scheme;
_defaultCredentials = settings.DefaultCredentials;

var handler = new HttpClientHandler();
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Client/SingleNodeChannelSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public SingleNodeChannelSelector(

_channelCache = channelCache;

var uri = settings.ConnectivitySettings.Address;
var uri = settings.ConnectivitySettings.ResolvedAddressOrDefault;
_endPoint = new DnsEndPoint(host: uri.Host, port: uri.Port);
}

Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Client/SingleNodeHttpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public SingleNodeHttpHandler(EventStoreClientSettings settings) {
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken) {
request.RequestUri = new UriBuilder(request.RequestUri!) {
Scheme = _settings.ConnectivitySettings.Address.Scheme
Scheme = _settings.ConnectivitySettings.ResolvedAddressOrDefault.Scheme
}.Uri;
return base.SendAsync(request, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ public ReadAllEventsFixture() {
userCredentials: TestCredentials.Root
);
Events = Enumerable
.Concat(
CreateTestEvents(20),
CreateTestEvents(2, metadataSize: 1_000_000)
)
Events = CreateTestEvents(20)
.Concat(CreateTestEvents(2, metadataSize: 1_000_000))
.Concat(CreateTestEvents(2, AnotherTestEventType))
.ToArray();
ExpectedStreamName = GetStreamName();
Expand All @@ -38,4 +36,4 @@ public ReadAllEventsFixture() {

public EventBinaryData ExpectedFirstEvent { get; private set; }
public EventBinaryData ExpectedLastEvent { get; private set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public async Task with_timeout_fails_when_operation_expired() {
ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
}

[Fact]
public async Task filter_events_by_type() {
var result = await Fixture.Streams
.ReadAllAsync(Direction.Backwards, Position.End, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix))
.ToListAsync();

result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix));
}

[Fact(Skip = "Not Implemented")]
public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException();

Expand All @@ -80,4 +89,4 @@ public async Task with_timeout_fails_when_operation_expired() {

[Fact(Skip = "Not Implemented")]
public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ await result.Messages.ToArrayAsync()
);
}

[Fact]
public async Task filter_events_by_type() {
var result = await Fixture.Streams
.ReadAllAsync(Direction.Forwards, Position.Start, EventTypeFilter.Prefix(EventStoreFixture.AnotherTestEventTypePrefix))
.ToListAsync();

result.ForEach(x => x.Event.EventType.ShouldStartWith(EventStoreFixture.AnotherTestEventTypePrefix));
}

[Fact(Skip = "Not Implemented")]
public Task be_able_to_read_all_one_by_one_until_end_of_stream() => throw new NotImplementedException();

Expand All @@ -147,4 +156,4 @@ await result.Messages.ToArrayAsync()

[Fact(Skip = "Not Implemented")]
public Task when_got_int_max_value_as_maxcount_should_throw() => throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ protected EventStoreClientFixtureBase(
TestServer = new EventStoreTestServerExternal();
else
TestServer = GlobalEnvironment.UseCluster
? new EventStoreTestServerCluster(hostCertificatePath, Settings.ConnectivitySettings.Address, env)
: new EventStoreTestServer(hostCertificatePath, Settings.ConnectivitySettings.Address, env);
? new EventStoreTestServerCluster(hostCertificatePath, Settings.ConnectivitySettings.ResolvedAddressOrDefault, env)
: new EventStoreTestServer(hostCertificatePath, Settings.ConnectivitySettings.ResolvedAddressOrDefault, env);
}

public IEventStoreTestServer TestServer { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
namespace EventStore.Client.Tests;

public partial class EventStoreFixture {
public const string TestEventType = "tst";
public const string TestEventType = "test-event-type";
public const string AnotherTestEventTypePrefix = "another";
public const string AnotherTestEventType = $"{AnotherTestEventTypePrefix}-test-event-type";

public T NewClient<T>(Action<EventStoreClientSettings> configure) where T : EventStoreClientBase, new() =>
(T)Activator.CreateInstance(typeof(T), new object?[] { ClientSettings.With(configure) })!;
Expand Down Expand Up @@ -50,4 +52,4 @@ public async Task RestartService(TimeSpan delay) {
await Streams.WarmUp();
Log.Information("Service restarted.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static EventStoreFixtureOptions DefaultOptions() {
protected override ContainerBuilder Configure() {
var env = Options.Environment.Select(pair => $"{pair.Key}={pair.Value}").ToArray();

var port = Options.ClientSettings.ConnectivitySettings.Address.Port;
var port = Options.ClientSettings.ConnectivitySettings.ResolvedAddressOrDefault.Port;
var certsPath = Path.Combine(Environment.CurrentDirectory, "certs");

var containerName = port == 2113
Expand Down
Loading

0 comments on commit 0ef472f

Please sign in to comment.