Skip to content

Commit

Permalink
[Host.Kafka] Kafka consumers started twice on Host.Run #213
Browse files Browse the repository at this point in the history
  • Loading branch information
zarusz committed Feb 28, 2024
1 parent 3d507b2 commit 46a7934
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 72 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.2</Version>
<Version>2.2.3-rc1</Version>
</PropertyGroup>

</Project>
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected override async Task ProduceToTransport(object message, string path, by

if (messageHeaders != null && messageHeaders.Count > 0)
{
kafkaMessage.Headers = new Headers();
kafkaMessage.Headers = [];

foreach (var keyValue in messageHeaders)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,15 @@
/// <summary>
/// <see cref="IHostedService"/> responsible for starting message bus consumers.
/// </summary>
public class MessageBusHostedService : IHostedService
public class MessageBusHostedService(IConsumerControl bus, MessageBusSettings messageBusSettings) : IHostedService
{
private readonly IConsumerControl _bus;
private readonly MessageBusSettings _messageBusSettings;

public MessageBusHostedService(IConsumerControl bus, MessageBusSettings messageBusSettings)
{
_bus = bus;
_messageBusSettings = messageBusSettings;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
if (_messageBusSettings.AutoStartConsumers)
if (messageBusSettings.AutoStartConsumers)
{
await _bus.Start();
await bus.Start();
}
}

public Task StopAsync(CancellationToken cancellationToken) => _bus.Stop();
public Task StopAsync(CancellationToken cancellationToken) => bus.Stop();
}
115 changes: 69 additions & 46 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMes
private CancellationTokenSource _cancellationTokenSource = new();
private IMessageSerializer _serializer;
private readonly MessageHeaderService _headerService;
private readonly List<AbstractConsumer> _consumers = new();
private readonly List<AbstractConsumer> _consumers = [];

/// <summary>
/// Special market reference that signifies a dummy producer settings for response types.
Expand Down Expand Up @@ -60,10 +60,11 @@ public virtual IMessageSerializer Serializer
#endregion

private readonly object _initTaskLock = new();

private Task _initTask = null;

#region Start & Stop

private readonly object _startLock = new();

public bool IsStarted { get; private set; }

Expand Down Expand Up @@ -201,58 +202,80 @@ private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
}

public async Task Start()
{
if (!IsStarted && !IsStarting)
{
IsStarting = true;
try
{
await EnsureInitFinished();

_logger.LogInformation("Starting consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false);
{
lock (_startLock)
{
if (IsStarting || IsStarted)
{
return;
}
IsStarting = true;
}

try
{
await EnsureInitFinished();

await CreateConsumers();
await OnStart().ConfigureAwait(false);
await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false);
_logger.LogInformation("Starting consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false);
_logger.LogInformation("Started consumers for {BusName} bus", Name);

IsStarted = true;
}
finally
{
IsStarting = false;
}
await CreateConsumers();
await OnStart().ConfigureAwait(false);
await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false);
_logger.LogInformation("Started consumers for {BusName} bus", Name);

lock (_startLock)
{
IsStarted = true;
}
}
finally
{
lock (_startLock)
{
IsStarting = false;
}
}
}

public async Task Stop()
{
if (IsStarted && !IsStopping)
{
IsStopping = true;
try
{
lock (_startLock)
{
if (IsStopping || !IsStarted)
{
await EnsureInitFinished();

_logger.LogInformation("Stopping consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false);
return;
}
IsStopping = true;
}

await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false);
await OnStop().ConfigureAwait(false);
await DestroyConsumers().ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false);
_logger.LogInformation("Stopped consumers for {BusName} bus", Name);

IsStarted = false;
}
finally
{
IsStopping = false;
}
try
{
await EnsureInitFinished();

_logger.LogInformation("Stopping consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false);

await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false);
await OnStop().ConfigureAwait(false);
await DestroyConsumers().ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false);
_logger.LogInformation("Stopped consumers for {BusName} bus", Name);

lock (_startLock)
{
IsStarted = false;
}
}
finally
{
lock (_startLock)
{
IsStopping = false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

public class MessageBusBuilderTests
{
internal class DerivedMessageBusBuilder : MessageBusBuilder
internal class DerivedMessageBusBuilder(MessageBusBuilder other) : MessageBusBuilder(other)
{
public DerivedMessageBusBuilder(MessageBusBuilder other) : base(other)
{
}
}

[Fact]
Expand Down Expand Up @@ -90,14 +87,14 @@ public void Given_OtherBuilder_When_CopyConstructorUsed_Then_AllStateIsCopied()
// arrange
var subject = MessageBusBuilder.Create();
subject.WithProvider(Mock.Of<Func<MessageBusSettings, IMessageBus>>());
subject.AddChildBus("Bus1", mbb => { });
subject.AddChildBus("Bus1", mbb => { });

// act
var copy = new DerivedMessageBusBuilder(subject);

// assert
copy.Settings.Should().BeSameAs(subject.Settings);
copy.Settings.Name.Should().BeSameAs(subject.Settings.Name);
copy.Settings.Name.Should().BeSameAs(subject.Settings.Name);
copy.Children.Should().BeSameAs(subject.Children);
copy.BusFactory.Should().BeSameAs(subject.BusFactory);
copy.PostConfigurationActions.Should().BeSameAs(subject.PostConfigurationActions);
Expand Down
51 changes: 48 additions & 3 deletions src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public MessageBusBaseTests()
_serviceProviderMock = new Mock<IServiceProvider>();
_serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(new JsonMessageSerializer());
_serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver());
_serviceProviderMock.Setup(x => x.GetService(It.Is<Type>(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns(Enumerable.Empty<object>());
_serviceProviderMock.Setup(x => x.GetService(It.Is<Type>(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))).Returns((Type t) => Array.CreateInstance(t.GetGenericArguments()[0], 0));

BusBuilder = MessageBusBuilder.Create()
.Produce<RequestA>(x =>
Expand All @@ -65,13 +65,12 @@ public MessageBusBaseTests()
.WithDependencyResolver(_serviceProviderMock.Object)
.WithProvider(s =>
{
var bus = new MessageBusTested(s)
return new MessageBusTested(s)
{
// provide current time
CurrentTimeProvider = () => _timeNow,
OnProduced = (mt, n, m) => _producedMessages.Add(new(mt, n, m))
};
return bus;
});

_busLazy = new Lazy<MessageBusTested>(() => (MessageBusTested)BusBuilder.Build());
Expand Down Expand Up @@ -598,4 +597,50 @@ public async Task When_Send_Given_InterceptorsInDI_Then_InterceptorInfluenceIfTh
}
sendInterceptorMock.VerifyNoOtherCalls();
}

[Fact]
public async Task When_Start_Given_ConcurrentCalls_Then_ItOnlyStartsConsumersOnce()
{
ThreadPool.SetMinThreads(100, 100);

// arrange
BusBuilder
.Consume<SomeMessage>(x => x.Topic("topic"));

// trigger lazy bus creation here ahead of the Tasks
var bus = Bus;

// act
for (var i = 0; i < 10; i++)
{
await Task.WhenAll(Enumerable.Range(0, 1000).Select(x => bus.Start()).ToList());
}

// assert
bus._startedCount.Should().Be(1);
bus._stoppedCount.Should().Be(0);
}

[Fact]
public async Task When_Stop_Given_ConcurrentCalls_Then_ItOnlyStopsConsumersOnce()
{
// arrange
BusBuilder
.Consume<SomeMessage>(x => x.Topic("topic"));

// trigger lazy bus creation here ahead of the Tasks
var bus = Bus;

await bus.Start();

// act
for (var i = 0; i < 10; i++)
{
await Task.WhenAll(Enumerable.Range(0, 10000).Select(x => bus.Stop()).AsParallel());
}

// assert
bus._startedCount.Should().Be(1);
bus._stoppedCount.Should().Be(1);
}
}
19 changes: 17 additions & 2 deletions src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
namespace SlimMessageBus.Host.Test;

public class MessageBusTested : MessageBusBase
{
{
internal int _startedCount;
internal int _stoppedCount;

public MessageBusTested(MessageBusSettings settings)
: base(settings)
{
Expand All @@ -10,7 +13,7 @@ public MessageBusTested(MessageBusSettings settings)

OnBuildProvider();
}


public ProducerSettings Public_GetProducerSettings(Type messageType) => GetProducerSettings(messageType);

public int PendingRequestsCount => PendingRequestStore.GetCount();
Expand All @@ -19,6 +22,18 @@ public MessageBusTested(MessageBusSettings settings)
public Action<Type, string, object> OnProduced { get; set; }

#region Overrides of MessageBusBase

protected internal override Task OnStart()
{
Interlocked.Increment(ref _startedCount);
return base.OnStart();
}

protected internal override Task OnStop()
{
Interlocked.Increment(ref _stoppedCount);
return base.OnStop();
}

protected override async Task<object> ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, CancellationToken cancellationToken = default)
{
Expand Down

0 comments on commit 46a7934

Please sign in to comment.