diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9df019e4..df4da77e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,26 +25,21 @@ jobs: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true steps: - # - name: Dump GitHub context - # env: - # GITHUB_CONTEXT: ${{ toJson(github) }} - # run: echo "$GITHUB_CONTEXT" - - name: PR - Checkout if: github.event_name == 'pull_request_target' - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 ref: ${{ github.event.pull_request.head.sha }} - name: Checkout if: github.event_name != 'pull_request_target' - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Setup .NET - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: 8.0.x cache: false @@ -53,14 +48,9 @@ jobs: run: dotnet restore $SOLUTION_NAME working-directory: ./src - - name: Install Coverlet - #if: false - run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector - working-directory: ./src - - name: SonarCloud - Setup Java17 #if: github.event_name == 'pull_request_target' - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: "adopt" java-version: "17" @@ -137,7 +127,7 @@ jobs: _redis_connectionstring: ${{ secrets.redis_connectionstring }} - _sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }} + sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }} # Connects to the local Test Containers kafka_brokers: localhost:9092 @@ -155,7 +145,7 @@ jobs: redis_connectionstring: localhost:6379 - sqlserver_connectionstring: "Server=localhost;Initial Catalog=SlimMessageBus_Outbox;User ID=sa;Password=SuperSecretP@55word;TrustServerCertificate=true;MultipleActiveResultSets=true;" + _sqlserver_connectionstring: "Server=localhost;Initial Catalog=SlimMessageBus_Outbox;User ID=sa;Password=SuperSecretP@55word;TrustServerCertificate=true;MultipleActiveResultSets=true;" nats_endpoint: "nats://localhost:4222" diff --git a/src/Infrastructure/docker-compose.yml b/src/Infrastructure/docker-compose.yml index 037928ad..f713eb53 100644 --- a/src/Infrastructure/docker-compose.yml +++ b/src/Infrastructure/docker-compose.yml @@ -16,7 +16,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1" + KAFKA_CREATE_TOPICS: "test-ping:2:1,test-echo:2:1,test-echo-resp:2:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper diff --git a/src/Samples/Sample.AsyncApi.Service/Sample.AsyncApi.Service.csproj b/src/Samples/Sample.AsyncApi.Service/Sample.AsyncApi.Service.csproj index 79d3283c..4ab654b2 100644 --- a/src/Samples/Sample.AsyncApi.Service/Sample.AsyncApi.Service.csproj +++ b/src/Samples/Sample.AsyncApi.Service/Sample.AsyncApi.Service.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Samples/Sample.DomainEvents.Application/Sample.DomainEvents.Application.csproj b/src/Samples/Sample.DomainEvents.Application/Sample.DomainEvents.Application.csproj index 06527a84..251dcf6f 100644 --- a/src/Samples/Sample.DomainEvents.Application/Sample.DomainEvents.Application.csproj +++ b/src/Samples/Sample.DomainEvents.Application/Sample.DomainEvents.Application.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Samples/Sample.DomainEvents.Domain/Sample.DomainEvents.Domain.csproj b/src/Samples/Sample.DomainEvents.Domain/Sample.DomainEvents.Domain.csproj index 1a77a0ce..104d9879 100644 --- a/src/Samples/Sample.DomainEvents.Domain/Sample.DomainEvents.Domain.csproj +++ b/src/Samples/Sample.DomainEvents.Domain/Sample.DomainEvents.Domain.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Samples/Sample.DomainEvents.WebApi/Sample.DomainEvents.WebApi.csproj b/src/Samples/Sample.DomainEvents.WebApi/Sample.DomainEvents.WebApi.csproj index 3cc9c16b..04608631 100644 --- a/src/Samples/Sample.DomainEvents.WebApi/Sample.DomainEvents.WebApi.csproj +++ b/src/Samples/Sample.DomainEvents.WebApi/Sample.DomainEvents.WebApi.csproj @@ -14,9 +14,9 @@ - - - + + + diff --git a/src/Samples/Sample.Hybrid.ConsoleApp/Sample.Hybrid.ConsoleApp.csproj b/src/Samples/Sample.Hybrid.ConsoleApp/Sample.Hybrid.ConsoleApp.csproj index 99f27c92..05d6b51a 100644 --- a/src/Samples/Sample.Hybrid.ConsoleApp/Sample.Hybrid.ConsoleApp.csproj +++ b/src/Samples/Sample.Hybrid.ConsoleApp/Sample.Hybrid.ConsoleApp.csproj @@ -16,11 +16,11 @@ - - - - - + + + + + diff --git a/src/Samples/Sample.Images.WebApi/Sample.Images.WebApi.csproj b/src/Samples/Sample.Images.WebApi/Sample.Images.WebApi.csproj index cd5f03f9..0ad4ea03 100644 --- a/src/Samples/Sample.Images.WebApi/Sample.Images.WebApi.csproj +++ b/src/Samples/Sample.Images.WebApi/Sample.Images.WebApi.csproj @@ -15,9 +15,9 @@ - - - + + + diff --git a/src/Samples/Sample.Images.Worker/Sample.Images.Worker.csproj b/src/Samples/Sample.Images.Worker/Sample.Images.Worker.csproj index 15925308..4242d30f 100644 --- a/src/Samples/Sample.Images.Worker/Sample.Images.Worker.csproj +++ b/src/Samples/Sample.Images.Worker/Sample.Images.Worker.csproj @@ -8,11 +8,11 @@ - - - - - + + + + + diff --git a/src/Samples/Sample.Nats.WebApi/Sample.Nats.WebApi.csproj b/src/Samples/Sample.Nats.WebApi/Sample.Nats.WebApi.csproj index 236d723d..561b2c7f 100644 --- a/src/Samples/Sample.Nats.WebApi/Sample.Nats.WebApi.csproj +++ b/src/Samples/Sample.Nats.WebApi/Sample.Nats.WebApi.csproj @@ -7,8 +7,8 @@ - - + + diff --git a/src/Samples/Sample.OutboxWebApi/Sample.OutboxWebApi.csproj b/src/Samples/Sample.OutboxWebApi/Sample.OutboxWebApi.csproj index 641244fc..f6d3de10 100644 --- a/src/Samples/Sample.OutboxWebApi/Sample.OutboxWebApi.csproj +++ b/src/Samples/Sample.OutboxWebApi/Sample.OutboxWebApi.csproj @@ -7,13 +7,13 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/src/Samples/Sample.Serialization.ConsoleApp/Sample.Serialization.ConsoleApp.csproj b/src/Samples/Sample.Serialization.ConsoleApp/Sample.Serialization.ConsoleApp.csproj index b5c7bab4..55f417ca 100644 --- a/src/Samples/Sample.Serialization.ConsoleApp/Sample.Serialization.ConsoleApp.csproj +++ b/src/Samples/Sample.Serialization.ConsoleApp/Sample.Serialization.ConsoleApp.csproj @@ -8,9 +8,9 @@ - - - + + + diff --git a/src/Samples/Sample.Simple.ConsoleApp/Program.cs b/src/Samples/Sample.Simple.ConsoleApp/Program.cs index ee7910f0..08552f5f 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/Program.cs +++ b/src/Samples/Sample.Simple.ConsoleApp/Program.cs @@ -140,14 +140,6 @@ private static void ConfigureMessageBus(MessageBusBuilder mbb, IConfiguration co var consumerGroup = "consoleapp"; var responseGroup = "consoleapp-1"; - if (provider == Provider.Kafka) - { - // Note: We are using the free plan of CloudKarafka to host the Kafka infrastructure. The free plan has a limit on topic you can get free and it requires these topic prefixes. - topicForAddCommand = "4p5ma6io-test-ping"; - topicForMultiplyRequest = "4p5ma6io-multiply-request"; - topicForResponses = "4p5ma6io-responses"; - } - /* Azure Event Hub setup notes: diff --git a/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj b/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj index c98d028d..bf1a5f0e 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj +++ b/src/Samples/Sample.Simple.ConsoleApp/Sample.Simple.ConsoleApp.csproj @@ -6,14 +6,8 @@ enable - - - Always - - - - + diff --git a/src/Samples/Sample.Simple.ConsoleApp/appsettings.json b/src/Samples/Sample.Simple.ConsoleApp/appsettings.json index aea38f72..dcce6a12 100644 --- a/src/Samples/Sample.Simple.ConsoleApp/appsettings.json +++ b/src/Samples/Sample.Simple.ConsoleApp/appsettings.json @@ -10,8 +10,7 @@ "Kafka": { "Brokers": "{{kafka_brokers}}", "Username": "{{kafka_username}}", - "Password": "{{kafka_password}}", - "Secure": "{{mqtt_secure}}" + "Password": "{{kafka_password}}" }, "Azure": { "EventHub": { diff --git a/src/Samples/Sample.ValidatingWebApi/Sample.ValidatingWebApi.csproj b/src/Samples/Sample.ValidatingWebApi/Sample.ValidatingWebApi.csproj index 4ba1e992..6432a850 100644 --- a/src/Samples/Sample.ValidatingWebApi/Sample.ValidatingWebApi.csproj +++ b/src/Samples/Sample.ValidatingWebApi/Sample.ValidatingWebApi.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/SlimMessageBus.Host.AspNetCore/SlimMessageBus.Host.AspNetCore.csproj b/src/SlimMessageBus.Host.AspNetCore/SlimMessageBus.Host.AspNetCore.csproj index 41b7c173..71110403 100644 --- a/src/SlimMessageBus.Host.AspNetCore/SlimMessageBus.Host.AspNetCore.csproj +++ b/src/SlimMessageBus.Host.AspNetCore/SlimMessageBus.Host.AspNetCore.csproj @@ -8,10 +8,10 @@ icon.png - + - + diff --git a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj index 56450fd2..114d18e0 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj +++ b/src/SlimMessageBus.Host.AzureServiceBus/SlimMessageBus.Host.AzureServiceBus.csproj @@ -11,7 +11,7 @@ - + diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj index a1b9b632..f6114bee 100644 --- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj +++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaCommitController.cs b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaCommitController.cs index 31c902f0..b8ed8204 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaCommitController.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaCommitController.cs @@ -1,6 +1,10 @@ namespace SlimMessageBus.Host.Kafka; public interface IKafkaCommitController -{ +{ + /// + /// The offset of the topic-parition that should be commited onto the consumer group + /// + /// void Commit(TopicPartitionOffset offset); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs index acd083bd..4c0e22fb 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs @@ -1,6 +1,6 @@ namespace SlimMessageBus.Host.Kafka; -using ConsumeResult = Confluent.Kafka.ConsumeResult; +using ConsumeResult = ConsumeResult; /// /// The processor of assigned partition (). diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs index 028a3a3b..81166204 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs @@ -1,4 +1,5 @@ namespace SlimMessageBus.Host.Kafka; + public static class KafkaExtensions { public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset) diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs index 9dc3efcf..169b5533 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs @@ -245,11 +245,11 @@ protected virtual void OnStatistics(string json) Logger.LogTrace("Group [{Group}]: Statistics: {statistics}", Group, json); } - #region Implementation of IKafkaCoordinator + #region Implementation of IKafkaCommitController public void Commit(TopicPartitionOffset offset) { - Logger.LogDebug("Group [{Group}]: Commit Offset, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset); + Logger.LogDebug("Group [{Group}]: Commit Offset, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset); _consumer.Commit([offset]); } diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs index 1a8fa618..4e15ca13 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs @@ -168,10 +168,13 @@ public void Commit(TopicPartitionOffset offset) { if (offset != null && (_lastCheckpointOffset == null || offset.Offset > _lastCheckpointOffset.Offset)) { - _logger.LogDebug("Group [{Group}]: Commit at Offset: {Offset}, Partition: {Partition}, Topic: {Topic}", Group, offset.Offset, offset.Partition, offset.Topic); - _lastCheckpointOffset = offset; - _commitController.Commit(offset); + + // See https://github.com/confluentinc/confluent-kafka-dotnet/blob/25f320a672b4324d732304cb4efa2288867b320c/src/Confluent.Kafka/Consumer.cs#L338 + // See https://github.com/confluentinc/confluent-kafka-dotnet/issues/1380#issuecomment-672036089 + // The commit has to have the last processed message + 1 + + _commitController.Commit(offset.AddOffset(1)); CheckpointTrigger?.Reset(); } diff --git a/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj b/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj index 389d080d..9c41663c 100644 --- a/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj +++ b/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj b/src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj index 713f219b..23a24010 100644 --- a/src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj +++ b/src/SlimMessageBus.Host.Nats/SlimMessageBus.Host.Nats.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/SlimMessageBus.Host.Serialization.Avro/SlimMessageBus.Host.Serialization.Avro.csproj b/src/SlimMessageBus.Host.Serialization.Avro/SlimMessageBus.Host.Serialization.Avro.csproj index 3f29ef64..9d41a782 100644 --- a/src/SlimMessageBus.Host.Serialization.Avro/SlimMessageBus.Host.Serialization.Avro.csproj +++ b/src/SlimMessageBus.Host.Serialization.Avro/SlimMessageBus.Host.Serialization.Avro.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/SlimMessageBus.Host.Serialization.GoogleProtobuf/SlimMessageBus.Host.Serialization.GoogleProtobuf.csproj b/src/SlimMessageBus.Host.Serialization.GoogleProtobuf/SlimMessageBus.Host.Serialization.GoogleProtobuf.csproj index 765357e2..77e9f0a9 100644 --- a/src/SlimMessageBus.Host.Serialization.GoogleProtobuf/SlimMessageBus.Host.Serialization.GoogleProtobuf.csproj +++ b/src/SlimMessageBus.Host.Serialization.GoogleProtobuf/SlimMessageBus.Host.Serialization.GoogleProtobuf.csproj @@ -9,11 +9,11 @@ - + - + diff --git a/src/SlimMessageBus.Host.Serialization.Hybrid/SlimMessageBus.Host.Serialization.Hybrid.csproj b/src/SlimMessageBus.Host.Serialization.Hybrid/SlimMessageBus.Host.Serialization.Hybrid.csproj index cdb5e71c..bc61ff4c 100644 --- a/src/SlimMessageBus.Host.Serialization.Hybrid/SlimMessageBus.Host.Serialization.Hybrid.csproj +++ b/src/SlimMessageBus.Host.Serialization.Hybrid/SlimMessageBus.Host.Serialization.Hybrid.csproj @@ -1,4 +1,4 @@ - + @@ -15,7 +15,7 @@ - + diff --git a/src/SlimMessageBus.Host.Serialization.Json/SlimMessageBus.Host.Serialization.Json.csproj b/src/SlimMessageBus.Host.Serialization.Json/SlimMessageBus.Host.Serialization.Json.csproj index 3ca786c4..6e9ab2e2 100644 --- a/src/SlimMessageBus.Host.Serialization.Json/SlimMessageBus.Host.Serialization.Json.csproj +++ b/src/SlimMessageBus.Host.Serialization.Json/SlimMessageBus.Host.Serialization.Json.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/SlimMessageBus.Host.Serialization.SystemTextJson/SlimMessageBus.Host.Serialization.SystemTextJson.csproj b/src/SlimMessageBus.Host.Serialization.SystemTextJson/SlimMessageBus.Host.Serialization.SystemTextJson.csproj index 9d5021ff..4b0f16ff 100644 --- a/src/SlimMessageBus.Host.Serialization.SystemTextJson/SlimMessageBus.Host.Serialization.SystemTextJson.csproj +++ b/src/SlimMessageBus.Host.Serialization.SystemTextJson/SlimMessageBus.Host.Serialization.SystemTextJson.csproj @@ -10,9 +10,9 @@ - - - + + + diff --git a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj index e0e2e5f0..1bbb7a7a 100644 --- a/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj +++ b/src/SlimMessageBus.Host/SlimMessageBus.Host.csproj @@ -13,11 +13,11 @@ - + - + diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 8c6d7c31..303fee7c 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -119,7 +119,6 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{A6C28448-3839-490C-BE30-580C1A45E225}" ProjectSection(SolutionItems) = preProject .editorconfig = .editorconfig - cloudkarafka_2023-10.pem = cloudkarafka_2023-10.pem Common.NuGet.Properties.xml = Common.NuGet.Properties.xml Common.Properties.xml = Common.Properties.xml ..\CONTRIBUTING.md = ..\CONTRIBUTING.md @@ -277,7 +276,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Nats-SingleNode", "Nats-Sin Samples\Infrastructure\Nats-SingleNode\docker-compose.yml = Samples\Infrastructure\Nats-SingleNode\docker-compose.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.AspNetCore.Test", "Tests\SlimMessageBus.Host.AspNetCore.Test\SlimMessageBus.Host.AspNetCore.Test.csproj", "{9FCBF788-1F0C-43E2-909D-1F96B2685F38}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AspNetCore.Test", "Tests\SlimMessageBus.Host.AspNetCore.Test\SlimMessageBus.Host.AspNetCore.Test.csproj", "{9FCBF788-1F0C-43E2-909D-1F96B2685F38}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/Tests/Host.Test.Properties.xml b/src/Tests/Host.Test.Properties.xml index 313f6857..52f790e8 100644 --- a/src/Tests/Host.Test.Properties.xml +++ b/src/Tests/Host.Test.Properties.xml @@ -12,10 +12,10 @@ - + - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Tests/SlimMessageBus.Host.AspNetCore.Test/SlimMessageBus.Host.AspNetCore.Test.csproj b/src/Tests/SlimMessageBus.Host.AspNetCore.Test/SlimMessageBus.Host.AspNetCore.Test.csproj index 9c28d9b8..35eefc6e 100644 --- a/src/Tests/SlimMessageBus.Host.AspNetCore.Test/SlimMessageBus.Host.AspNetCore.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.AspNetCore.Test/SlimMessageBus.Host.AspNetCore.Test.csproj @@ -3,7 +3,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs index 87e27784..e8a61a40 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs @@ -61,7 +61,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit() _subject.Value.OnPartitionEndReached(); // assert - _commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once); + _commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset.AddOffset(1)), Times.Once); } [Fact] @@ -98,7 +98,7 @@ public async Task When_OnMessage_Given_CheckpointTriggerFires_Then_ShouldCommit( await _subject.Value.OnMessage(message3); // assert - _commitControllerMock.Verify(x => x.Commit(message3.TopicPartitionOffset), Times.Once); + _commitControllerMock.Verify(x => x.Commit(message3.TopicPartitionOffset.AddOffset(1)), Times.Once); } private ConsumeResult GetSomeMessage(int offsetAdd = 0) diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs index 4d426782..53ec6a22 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs @@ -46,7 +46,7 @@ public void When_NewInstance_Then_TopicPartitionSet() public async Task When_OnPartitionEndReached_Then_ShouldCommit() { // arrange - var partition = new TopicPartitionOffset(_topicPartition, new Offset(10)); + var messageOffset = new TopicPartitionOffset(_topicPartition, new Offset(10)); var message = GetSomeMessage(); _subject.OnPartitionAssigned(_topicPartition); @@ -56,7 +56,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit() _subject.OnPartitionEndReached(); // assert - _commitControllerMock.Verify(x => x.Commit(partition), Times.Once); + _commitControllerMock.Verify(x => x.Commit(messageOffset.AddOffset(1)), Times.Once); } [Fact] @@ -98,7 +98,7 @@ public async Task When_OnMessage_Given_CheckpointReturnTrue_Then_ShouldCommit() await _subject.OnMessage(message); // assert - _commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once); + _commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset.AddOffset(1)), Times.Once); } [Fact] diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs index e435442c..ec010812 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs @@ -26,28 +26,12 @@ namespace SlimMessageBus.Host.Kafka.Test; public class KafkaMessageBusIt(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) { - private const int NumberOfMessages = 77; - private string TopicPrefix { get; set; } - - private static void AddSsl(string username, string password, ClientConfig c) - { - // cloudkarafka.com uses SSL with SASL authentication - c.SecurityProtocol = SecurityProtocol.SaslSsl; - c.SaslUsername = username; - c.SaslPassword = password; - c.SaslMechanism = SaslMechanism.ScramSha256; - c.SslCaLocation = "cloudkarafka_2023-10.pem"; - } + private const int NumberOfMessages = 300; + private readonly static TimeSpan DelayTimeSpan = TimeSpan.FromSeconds(5); protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { var kafkaBrokers = Secrets.Service.PopulateSecrets(configuration["Kafka:Brokers"]); - var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]); - var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]); - var kafkaSecure = Convert.ToBoolean(Secrets.Service.PopulateSecrets(configuration["Kafka:Secure"])); - - // Topics on cloudkarafka.com are prefixed with username - TopicPrefix = $"{kafkaUsername}-"; services .AddSlimMessageBus((mbb) => @@ -59,12 +43,6 @@ protected override void SetupServices(ServiceCollection services, IConfiguration { config.LingerMs = 5; // 5ms config.SocketNagleDisable = true; - - if (kafkaSecure) - { - AddSsl(kafkaUsername, kafkaPassword, config); - } - }; cfg.ConsumerConfig = (config) => { @@ -72,11 +50,6 @@ protected override void SetupServices(ServiceCollection services, IConfiguration config.SocketNagleDisable = true; // when the test containers start there is no consumer group yet, so we want to start from the beginning config.AutoOffsetReset = AutoOffsetReset.Earliest; - - if (kafkaSecure) - { - AddSsl(kafkaUsername, kafkaPassword, config); - } }; }); mbb.AddServicesFromAssemblyContaining(); @@ -90,13 +63,15 @@ protected override void SetupServices(ServiceCollection services, IConfiguration public IMessageBus MessageBus => ServiceProvider.GetRequiredService(); - [Fact] - public async Task BasicPubSub() + [Theory] + [InlineData(300, 100, 120)] + [InlineData(300, 120, 100)] + public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int delayConsumerAt) { // arrange AddBusConfiguration(mbb => { - var topic = $"{TopicPrefix}test-ping"; + var topic = "test-ping"; mbb.Produce(x => { x.DefaultTopic(topic); @@ -118,6 +93,7 @@ public async Task BasicPubSub() }); var consumedMessages = ServiceProvider.GetRequiredService>(); + var consumerControl = ServiceProvider.GetRequiredService(); var messageBus = MessageBus; // act @@ -126,26 +102,55 @@ public async Task BasicPubSub() await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); consumedMessages.Clear(); + var pauseAtOffsets = new HashSet { delayConsumerAt }; + + consumedMessages.OnAdded += (IList messages, ConsumedMessage message) => + { + // At the given index message stop the consumers, to simulate a pause in the processing to check if it resumes exactly from the next message + if (pauseAtOffsets.Contains(message.Message.Counter)) + { + // Remove self to cause only delay once (in case the same message gets repeated) + pauseAtOffsets.Remove(message.Message.Counter); + + consumerControl.Stop().ContinueWith(async (_) => + { + await Task.Delay(DelayTimeSpan); + await consumerControl.Start(); + }); + } + }; + // publish var stopwatch = Stopwatch.StartNew(); var messages = Enumerable - .Range(0, NumberOfMessages) + .Range(0, numberOfMessages) .Select(i => new PingMessage(DateTime.UtcNow, i)) .ToList(); - await Task.WhenAll(messages.Select(m => messageBus.Publish(m))); + var index = 0; + foreach (var m in messages) + { + if (index == delayProducerAt) + { + // We want to force the Partition EOF event to be triggered by Kafka + Logger.LogInformation("Waiting some time before publish to force Partition EOF event (MessageIndex: {MessageIndex})", index); + await Task.Delay(DelayTimeSpan); + } + await messageBus.Publish(m); + index++; + } stopwatch.Stop(); - Logger.LogInformation("Published {MessageCount} messages in {PublishTime}", messages.Count, stopwatch.Elapsed); + Logger.LogInformation("Published {MessageCount} messages in {ProduceTime} including simulated delay {DelayTime}", messages.Count, stopwatch.Elapsed, DelayTimeSpan); // consume stopwatch.Restart(); - await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); + await consumedMessages.WaitUntilArriving(newMessagesTimeout: 10); stopwatch.Stop(); - Logger.LogInformation("Consumed {MessageCount} messages in {ConsumedTime}", consumedMessages.Count, stopwatch.Elapsed); + Logger.LogInformation("Consumed {MessageCount} messages in {ConsumeTime} including simlulated delay {DelayTime}", consumedMessages.Count, stopwatch.Elapsed, DelayTimeSpan); // assert @@ -174,7 +179,7 @@ public async Task BasicReqResp() AddBusConfiguration(mbb => { - var topic = $"{TopicPrefix}test-echo"; + var topic = "test-echo"; mbb .Produce(x => { @@ -191,7 +196,7 @@ public async Task BasicReqResp() .CheckpointAfter(TimeSpan.FromSeconds(10))) .ExpectRequestResponses(x => { - x.ReplyToTopic($"{TopicPrefix}test-echo-resp"); + x.ReplyToTopic("test-echo-resp"); x.KafkaGroup("response-reader"); // for subsequent test runs allow enough time for kafka to reassign the partitions x.DefaultTimeout(TimeSpan.FromSeconds(60)); diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/SlimMessageBus.Host.Kafka.Test.csproj b/src/Tests/SlimMessageBus.Host.Kafka.Test/SlimMessageBus.Host.Kafka.Test.csproj index 83dccebd..0e3937a0 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/SlimMessageBus.Host.Kafka.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/SlimMessageBus.Host.Kafka.Test.csproj @@ -15,9 +15,6 @@ Always - - Always - PreserveNewest diff --git a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/SlimMessageBus.Host.Memory.Benchmark.csproj b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/SlimMessageBus.Host.Memory.Benchmark.csproj index 1ecab481..14c770d7 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Benchmark/SlimMessageBus.Host.Memory.Benchmark.csproj +++ b/src/Tests/SlimMessageBus.Host.Memory.Benchmark/SlimMessageBus.Host.Memory.Benchmark.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs index ca7c45f3..924fb70a 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs @@ -15,16 +15,6 @@ public class OutboxTests(ITestOutputHelper testOutputHelper) : BaseOutboxIntegra protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { - static void AddKafkaSsl(string username, string password, ClientConfig c) - { - // cloudkarafka.com uses SSL with SASL authentication - c.SecurityProtocol = SecurityProtocol.SaslSsl; - c.SaslUsername = username; - c.SaslPassword = password; - c.SaslMechanism = SaslMechanism.ScramSha256; - c.SslCaLocation = "cloudkarafka_2023-10.pem"; - } - void ConfigureExternalBus(MessageBusBuilder mbb) { var topic = "test-ping"; @@ -33,7 +23,6 @@ void ConfigureExternalBus(MessageBusBuilder mbb) var kafkaBrokers = Secrets.Service.PopulateSecrets(configuration["Kafka:Brokers"]); var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]); var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]); - var kafkaSecure = bool.TryParse(Secrets.Service.PopulateSecrets(configuration["Kafka:Secure"]), out var secure) && secure; mbb.WithProviderKafka(cfg => { @@ -42,11 +31,6 @@ void ConfigureExternalBus(MessageBusBuilder mbb) { config.LingerMs = 5; // 5ms config.SocketNagleDisable = true; - - if (kafkaSecure) - { - AddKafkaSsl(kafkaUsername, kafkaPassword, config); - } }; cfg.ConsumerConfig = (config) => { @@ -54,15 +38,8 @@ void ConfigureExternalBus(MessageBusBuilder mbb) config.SocketNagleDisable = true; // when the test containers start there is no consumer group yet, so we want to start from the beginning config.AutoOffsetReset = AutoOffsetReset.Earliest; - - if (kafkaSecure) - { - AddKafkaSsl(kafkaUsername, kafkaPassword, config); - } }; }); - // Topics on cloudkarafka.com are prefixed with username - topic = $"{kafkaUsername}-test-ping"; } if (_testParamBusType == BusType.AzureSB) { diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj index b3cebfc2..91f43ffb 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj @@ -21,9 +21,6 @@ - - Always - Always true diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj index c6a10cb4..bee621cc 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SlimMessageBus.Host.Outbox.Sql.Test.csproj @@ -11,10 +11,7 @@ - - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Avro.Test/SlimMessageBus.Host.Serialization.Avro.Test.csproj b/src/Tests/SlimMessageBus.Host.Serialization.Avro.Test/SlimMessageBus.Host.Serialization.Avro.Test.csproj index 16ba83ae..47390bd8 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Avro.Test/SlimMessageBus.Host.Serialization.Avro.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.Avro.Test/SlimMessageBus.Host.Serialization.Avro.Test.csproj @@ -3,7 +3,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Benchmark/SlimMessageBus.Host.Serialization.Benchmark.csproj b/src/Tests/SlimMessageBus.Host.Serialization.Benchmark/SlimMessageBus.Host.Serialization.Benchmark.csproj index 07f5b34f..c64abeca 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Benchmark/SlimMessageBus.Host.Serialization.Benchmark.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.Benchmark/SlimMessageBus.Host.Serialization.Benchmark.csproj @@ -19,10 +19,7 @@ - - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Tests/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test.csproj b/src/Tests/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test.csproj index 4760ffb3..d4b0734f 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test/SlimMessageBus.Host.Serialization.GoogleProtobuf.Test.csproj @@ -7,12 +7,12 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + @@ -20,10 +20,7 @@ - - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Hybrid.Test/SlimMessageBus.Host.Serialization.Hybrid.Test.csproj b/src/Tests/SlimMessageBus.Host.Serialization.Hybrid.Test/SlimMessageBus.Host.Serialization.Hybrid.Test.csproj index 5006b25a..43ce03b1 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Hybrid.Test/SlimMessageBus.Host.Serialization.Hybrid.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.Hybrid.Test/SlimMessageBus.Host.Serialization.Hybrid.Test.csproj @@ -3,7 +3,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SlimMessageBus.Host.Serialization.Json.Test.csproj b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SlimMessageBus.Host.Serialization.Json.Test.csproj index f2111720..5d5363ae 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SlimMessageBus.Host.Serialization.Json.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.Json.Test/SlimMessageBus.Host.Serialization.Json.Test.csproj @@ -3,7 +3,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SlimMessageBus.Host.Serialization.SystemTextJson.Test.csproj b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SlimMessageBus.Host.Serialization.SystemTextJson.Test.csproj index 2ee68a7d..eae29eb1 100644 --- a/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SlimMessageBus.Host.Serialization.SystemTextJson.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Serialization.SystemTextJson.Test/SlimMessageBus.Host.Serialization.SystemTextJson.Test.csproj @@ -3,7 +3,7 @@ - + diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestEventCollector.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestEventCollector.cs index 09c53278..02d98706 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestEventCollector.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/TestEventCollector.cs @@ -2,17 +2,20 @@ public class TestEventCollector { - private readonly IList list = new List(); + private readonly IList list = []; private bool isStarted = false; public bool IsStarted => isStarted; + public event Action, T>? OnAdded; + public void Add(T item) { lock (list) { list.Add(item); + OnAdded?.Invoke(list, item); } } diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/SlimMessageBus.Host.Test.Common.csproj b/src/Tests/SlimMessageBus.Host.Test.Common/SlimMessageBus.Host.Test.Common.csproj index 3d1b99fc..2d190d20 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/SlimMessageBus.Host.Test.Common.csproj +++ b/src/Tests/SlimMessageBus.Host.Test.Common/SlimMessageBus.Host.Test.Common.csproj @@ -8,11 +8,11 @@ - - - + + + - + @@ -22,10 +22,7 @@ - - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/cloudkarafka_2023-10.pem b/src/cloudkarafka_2023-10.pem deleted file mode 100644 index 3188acc7..00000000 --- a/src/cloudkarafka_2023-10.pem +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDjjCCAnagAwIBAgIQAzrx5qcRqaC7KGSxHQn65TANBgkqhkiG9w0BAQsFADBh -MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 -d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH -MjAeFw0xMzA4MDExMjAwMDBaFw0zODAxMTUxMjAwMDBaMGExCzAJBgNVBAYTAlVT -MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j -b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IEcyMIIBIjANBgkqhkiG -9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuzfNNNx7a8myaJCtSnX/RrohCgiN9RlUyfuI -2/Ou8jqJkTx65qsGGmvPrC3oXgkkRLpimn7Wo6h+4FR1IAWsULecYxpsMNzaHxmx -1x7e/dfgy5SDN67sH0NO3Xss0r0upS/kqbitOtSZpLYl6ZtrAGCSYP9PIUkY92eQ -q2EGnI/yuum06ZIya7XzV+hdG82MHauVBJVJ8zUtluNJbd134/tJS7SsVQepj5Wz -tCO7TG1F8PapspUwtP1MVYwnSlcUfIKdzXOS0xZKBgyMUNGPHgm+F6HmIcr9g+UQ -vIOlCsRnKPZzFBQ9RnbDhxSJITRNrw9FDKZJobq7nMWxM4MphQIDAQABo0IwQDAP -BgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUTiJUIBiV -5uNu5g/6+rkS7QYXjzkwDQYJKoZIhvcNAQELBQADggEBAGBnKJRvDkhj6zHd6mcY -1Yl9PMWLSn/pvtsrF9+wX3N3KjITOYFnQoQj8kVnNeyIv/iPsGEMNKSuIEyExtv4 -NeF22d+mQrvHRAiGfzZ0JFrabA0UWTW98kndth/Jsw1HKj2ZL7tcu7XUIOGZX1NG -Fdtom/DzMNU+MeKNhJ7jitralj41E6Vf8PlwUHBHQRFXGU7Aj64GxJUTFy8bJZ91 -8rGOmaFvE7FBcf6IKshPECBV1/MUReXgRPTqh5Uykw7+U0b6LJ3/iyK5S9kJRaTe -pLiaWN0bfVKfjllDiIGknibVb63dDcY3fe0Dkhvld1927jyNxF1WW6LZZm6zNTfl -MrY= ------END CERTIFICATE-----