Skip to content

Commit

Permalink
Sonar tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Oct 6, 2024
1 parent 698a9a2 commit 4abd06a
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ dotnet_style_qualification_for_field = false:suggestion
dotnet_style_qualification_for_property = false:suggestion
dotnet_style_qualification_for_method = false:suggestion
dotnet_style_qualification_for_event = false:suggestion
dotnet_diagnostic.VSTHRD200.severity = none

[*.{csproj,xml}]
indent_style = space
Expand Down
7 changes: 4 additions & 3 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ protected override async Task OnStop()
if (_consumerTask == null)
{
throw new MessageBusException($"Consumer for group {Group} not yet started");
}

_consumerCts.Cancel();
}

await _consumerCts.CancelAsync();

try
{
await _consumerTask.ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected override void Build()
.ToDictionary(
x => x.Key,
// Note: The consumers will first have IConsumer<>, then IRequestHandler<>
x => CreateMessageProcessor(x.OrderBy(consumerSettings => ConsumerModeOrder(consumerSettings)).ToList(), x.Key));
x => CreateMessageProcessor([.. x.OrderBy(consumerSettings => ConsumerModeOrder(consumerSettings))], x.Key));

_messageProcessorQueueByPath = ProviderSettings.EnableBlockingPublish
? []
Expand Down
36 changes: 19 additions & 17 deletions src/SlimMessageBus.Host.Nats/NatsSubjectConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,7 @@ protected override async Task OnStart()
{
_subscription ??= await connection.SubscribeCoreAsync<TType>(subject, cancellationToken: CancellationToken);

_messageConsumerTask = Task.Factory.StartNew(async () =>
{
try
{
while (await _subscription.Msgs.WaitToReadAsync(CancellationToken))
{
while (_subscription.Msgs.TryRead(out var msg))
{
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: CancellationToken).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException ex)
{
Logger.LogInformation(ex, "Consumer task was cancelled");
}
}, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
_messageConsumerTask = Task.Factory.StartNew(OnLoop, CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
}

protected override async Task OnStop()
Expand All @@ -41,5 +25,23 @@ protected override async Task OnStop()
await _subscription.UnsubscribeAsync().ConfigureAwait(false);
await _subscription.DisposeAsync();
}
}

private async Task OnLoop()
{
try
{
while (await _subscription!.Msgs.WaitToReadAsync(CancellationToken))
{
while (_subscription.Msgs.TryRead(out var msg))
{
await messageProcessor.ProcessMessage(msg, msg.Headers.ToReadOnlyDictionary(), cancellationToken: CancellationToken).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException ex)
{
Logger.LogInformation(ex, "Consumer task was cancelled");
}
}
}
14 changes: 8 additions & 6 deletions src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ internal class OutboxSendingTask(
private Task _startBusTask;
private Task _stopBusTask;

private int _busStartCount;

private int _busStartCount;

private DateTime? _cleanupNextRun;

private bool ShouldRunCleanup()
{
if (_outboxSettings.MessageCleanup?.Enabled == true)
{
var trigger = _cleanupNextRun is null || DateTime.UtcNow > _cleanupNextRun.Value;
var trigger = !_cleanupNextRun.HasValue || DateTime.UtcNow > _cleanupNextRun.Value;
if (trigger)
{
_cleanupNextRun = DateTime.UtcNow.Add(_outboxSettings.MessageCleanup.Interval);
Expand Down Expand Up @@ -78,7 +78,10 @@ protected async Task Stop()
{
_logger.LogDebug("Outbox loop stopping...");

_loopCts?.Cancel();
if (_loopCts != null)
{
await _loopCts.CancelAsync();
}

if (_loopTask != null)
{
Expand Down Expand Up @@ -261,8 +264,7 @@ async internal Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxR
{
var busName = busGroup.Key;
var bus = GetBus(compositeMessageBus, messageBusTarget, busName);
var bulkProducer = bus as IMessageBusBulkProducer;
if (bus == null || bulkProducer == null)
if (bus == null || bus is not IMessageBusBulkProducer bulkProducer)
{
foreach (var outboxMessage in busGroup)
{
Expand Down
9 changes: 3 additions & 6 deletions src/SlimMessageBus.Host/Consumer/AbstractConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ public abstract class AbstractConsumer : IAsyncDisposable, IConsumerControl

protected CancellationToken CancellationToken => _cancellationTokenSource.Token;

protected AbstractConsumer(ILogger logger)
{
Logger = logger;
}
protected AbstractConsumer(ILogger logger) => Logger = logger;

public async Task Start()
{
Expand All @@ -29,7 +26,7 @@ public async Task Start()
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource?.Cancel();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = new CancellationTokenSource();
}

Expand All @@ -53,7 +50,7 @@ public async Task Stop()
_stopping = true;
try
{
_cancellationTokenSource.Cancel();
await _cancellationTokenSource.CancelAsync();

await OnStop().ConfigureAwait(false);

Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ protected async virtual ValueTask DisposeAsyncCore()
await Stop().ConfigureAwait(false);

if (_cancellationTokenSource != null)
{
_cancellationTokenSource.Cancel();
{
await _cancellationTokenSource.CancelAsync();
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
Expand Down
18 changes: 18 additions & 0 deletions src/SlimMessageBus.Host/PlatformExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace SlimMessageBus.Host;

using System.Diagnostics.CodeAnalysis;

/// <summary>
/// A set of platform extensions to backfill functionality for some of the missing API prior in .NET 8.0.
/// </summary>
public static class PlatformExtensions
{
#if !NET8_0_OR_GREATER
[ExcludeFromCodeCoverage]
public static Task CancelAsync(this CancellationTokenSource cts)
{
cts.Cancel();
return Task.CompletedTask;
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public class ProcessMessagesTests
private readonly Mock<IMessageBusTarget> _mockMessageBusTarget;
private readonly Mock<IMasterMessageBus> _mockMasterMessageBus;
private readonly Mock<IMessageBusBulkProducer> _mockMessageBusBulkProducer;
private readonly Mock<ILogger<OutboxSendingTask>> _mockLogger;
private readonly OutboxSettings _outboxSettings;
private readonly OutboxSendingTask _sut;

Expand All @@ -104,7 +103,6 @@ public ProcessMessagesTests()
_mockMessageBusTarget = new Mock<IMessageBusTarget>();
_mockMasterMessageBus = new Mock<IMasterMessageBus>();
_mockMessageBusBulkProducer = _mockMasterMessageBus.As<IMessageBusBulkProducer>();
_mockLogger = new Mock<ILogger<OutboxSendingTask>>();

_outboxSettings = new OutboxSettings
{
Expand Down Expand Up @@ -178,8 +176,8 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised()
outboxMessages[0].BusName = null;
outboxMessages[7].BusName = null;

var knownBusCount = outboxMessages.Count(x => x.BusName != null);
var knownBusCount = outboxMessages.Count(x => x.BusName != null);

_mockMessageBusTarget.SetupGet(x => x.Target).Returns((IMessageBusProducer)null);

_mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny<string>())).Returns(_mockMasterMessageBus.Object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ public void When_GenerateGetterFunc_Given_TaskOfT_Then_ResultOfTaskIsObtained()
{
// arrange
var taskWithResult = Task.FromResult(1);
#pragma warning disable xUnit1031 // Do not use blocking task operations in test method
var resultPropertyInfo = typeof(Task<int>).GetProperty(nameof(Task<int>.Result));
#pragma warning restore xUnit1031 // Do not use blocking task operations in test method

// act
var getResultLambda = ReflectionUtils.GenerateGetterFunc(resultPropertyInfo);
Expand All @@ -25,7 +27,7 @@ public async Task When_GenerateMethodCallToFunc_Given_ConsumerWithOnHandlerAsync
var message = new SomeMessage();

var instanceType = typeof(IConsumer<SomeMessage>);
var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer<SomeMessage>.OnHandle), new[] { typeof(SomeMessage) });
var consumerOnHandleMethodInfo = instanceType.GetMethod(nameof(IConsumer<SomeMessage>.OnHandle), [typeof(SomeMessage)]);

var consumerMock = new Mock<IConsumer<SomeMessage>>();
consumerMock.Setup(x => x.OnHandle(message)).Returns(Task.CompletedTask);
Expand Down Expand Up @@ -53,7 +55,7 @@ public void When_GenerateGenericMethodCallToFunc_Given_GenericMethid_Then_Method
var genericMethod = typeof(ClassWithGenericMethod).GetMethods().FirstOrDefault(x => x.Name == nameof(ClassWithGenericMethod.GenericMethod));

// act
var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc<Func<object, object>>(genericMethod, new[] { typeof(bool) }, obj.GetType(), typeof(object));
var methodOfTypeBoolFunc = ReflectionUtils.GenerateGenericMethodCallToFunc<Func<object, object>>(genericMethod, [typeof(bool)], obj.GetType(), typeof(object));
var result = methodOfTypeBoolFunc(obj);

// assert
Expand All @@ -75,7 +77,9 @@ public async Task When_TaskOfObjectContinueWithTaskOfTypeFunc_Given_TaskOfObject

typedTask.GetType().Should().BeAssignableTo(typeof(Task<>).MakeGenericType(typeof(int)));

#pragma warning disable xUnit1031 // Do not use blocking task operations in test method
var resultFunc = ReflectionUtils.GenerateGetterFunc(typeof(Task<int>).GetProperty(nameof(Task<int>.Result)));
#pragma warning restore xUnit1031 // Do not use blocking task operations in test method
var result = resultFunc(typedTask);

result.Should().Be(10);
Expand Down

0 comments on commit 4abd06a

Please sign in to comment.