Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPERIMENT] Channel lock experiment #646

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ConsoleApp1/ConsoleApp1.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
</ItemGroup>





</Project>
26 changes: 26 additions & 0 deletions ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using NATS.Client.Core;

var nc = new NatsConnection();

List<Task> tasks = new();

var bytes = new byte[1024];
for (var i = 0; i < 10; i++)
{
tasks.Add(Task.Run(async () =>
{
while (true)
{
try
{
await nc.PublishAsync("foo", bytes);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}));
}

await Task.WhenAll(tasks);
6 changes: 6 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{
.github\workflows\test.yml = .github\workflows\test.yml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConsoleApp1", "ConsoleApp1\ConsoleApp1.csproj", "{A28740D2-5653-4B29-AAB6-1A0ED5B8DBFA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -341,6 +343,10 @@ Global
{9521D9E0-642A-4C7E-BD10-372DF235CF62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9521D9E0-642A-4C7E-BD10-372DF235CF62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9521D9E0-642A-4C7E-BD10-372DF235CF62}.Release|Any CPU.Build.0 = Release|Any CPU
{A28740D2-5653-4B29-AAB6-1A0ED5B8DBFA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A28740D2-5653-4B29-AAB6-1A0ED5B8DBFA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A28740D2-5653-4B29-AAB6-1A0ED5B8DBFA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A28740D2-5653-4B29-AAB6-1A0ED5B8DBFA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
84 changes: 39 additions & 45 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly Channel<int> _channelSize;
private readonly PipeReader _pipeReader;
private readonly PipeWriter _pipeWriter;
private readonly SemaphoreSlim _semLock = new(1);
private readonly Channel<int> _semLock = Channel.CreateBounded<int>(1);
private readonly PartialSendFailureCounter _partialSendFailureCounter = new();
private ISocketConnection? _socketConnection;
private Task? _flushTask;
Expand Down Expand Up @@ -185,7 +185,7 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -209,7 +209,7 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}

return default;
Expand All @@ -224,7 +224,7 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -249,7 +249,7 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}

return default;
Expand All @@ -264,7 +264,7 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -288,7 +288,7 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}

return default;
Expand Down Expand Up @@ -342,7 +342,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -366,7 +366,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);

payloadBuffer.Reset();
_pool.Return(payloadBuffer);
Expand All @@ -390,7 +390,7 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -414,7 +414,7 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}

return default;
Expand All @@ -429,7 +429,7 @@ public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cance

#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_semLock.Wait(0))
if (!_semLock.Writer.TryWrite(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand All @@ -453,7 +453,7 @@ public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cance
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}

return default;
Expand All @@ -462,7 +462,7 @@ public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cance
// only used for internal testing
internal async Task TestStallFlushAsync(TimeSpan timeSpan, CancellationToken cancellationToken)
{
await _semLock.WaitAsync().ConfigureAwait(false);
await _semLock.Writer.WriteAsync(0, cancellationToken).ConfigureAwait(false);

try
{
Expand All @@ -475,7 +475,7 @@ internal async Task TestStallFlushAsync(TimeSpan timeSpan, CancellationToken can
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

Expand Down Expand Up @@ -704,10 +704,9 @@ private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts conne
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -733,18 +732,17 @@ private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts conne
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCommand, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -771,18 +769,17 @@ private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCom
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -808,7 +805,7 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

Expand All @@ -821,10 +818,9 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -850,7 +846,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}
finally
Expand All @@ -870,10 +866,9 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, strin
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -899,18 +894,17 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, strin
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, int? maxMsgs, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new OperationCanceledException();
}
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(_defaultCommandTimeout);
await _semLock.Writer.WriteAsync(0, cancellationTokenSource.Token).ConfigureAwait(false);
}

try
Expand All @@ -936,7 +930,7 @@ private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, int
}
finally
{
_semLock.Release();
_semLock.Reader.TryRead(out _);
}
}

Expand Down