From afd8a4a09bfa57354f34b1e60428ceee9f8110da Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Thu, 31 Oct 2024 00:34:04 +0100 Subject: [PATCH] [Host.Kafka] Support for High-Throughput Publishing to Kafka Signed-off-by: Tomasz Maruszak --- .github/workflows/build.yml | 1 - docs/provider_kafka.md | 85 ++++++++------ docs/provider_kafka.t.md | 70 +++++++----- src/Host.Plugin.Properties.xml | 2 +- .../Builders/AbstractConsumerBuilder.cs | 4 +- .../Builders/ConsumerBuilder.cs | 4 +- .../Builders/IBuilderWithSettings.cs | 6 + .../Builders/IConsumerBuilder.cs | 5 + .../Builders/IProducerBuilder.cs | 5 + .../Builders/MessageBusBuilder.cs | 9 +- .../Builders/ProducerBuilder.cs | 6 +- .../Builders/RequestResponseBuilder.cs | 4 +- .../SlimMessageBus.Host.Configuration.csproj | 2 +- .../Configs/BuilderExtensions.cs | 104 ------------------ .../KafkaAbstractConsumerBuilderExtensions.cs | 68 ++++++++++++ .../Configs/KafkaDelegates.cs | 5 + .../Configs/KafkaProducerBuilderExtensions.cs | 26 ++++- .../KafkaProducerSettingsExtensions.cs | 15 +-- .../Consumer/KafkaExtensions.cs | 2 +- .../Consumer/KafkaPartitionConsumer.cs | 2 +- .../KafkaMessageBus.cs | 59 ++++++---- ...afkaMessageBusSettingsValidationService.cs | 6 +- src/SlimMessageBus.sln | 1 + .../KafkaProducerBuilderExtensionsTest.cs | 4 +- .../KafkaMessageBusIt.cs | 19 +++- .../KafkaMessageBusTest.cs | 79 +++++++++++-- 26 files changed, 365 insertions(+), 228 deletions(-) create mode 100644 src/SlimMessageBus.Host.Configuration/Builders/IBuilderWithSettings.cs create mode 100644 src/SlimMessageBus.Host.Configuration/Builders/IConsumerBuilder.cs create mode 100644 src/SlimMessageBus.Host.Configuration/Builders/IProducerBuilder.cs delete mode 100644 src/SlimMessageBus.Host.Kafka/Configs/BuilderExtensions.cs create mode 100644 src/SlimMessageBus.Host.Kafka/Configs/KafkaAbstractConsumerBuilderExtensions.cs create mode 100644 src/SlimMessageBus.Host.Kafka/Configs/KafkaDelegates.cs diff --git a/.github/workflows/build.yml 
b/.github/workflows/build.yml index df4da77e..e47c8269 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,7 +6,6 @@ on: pull_request_target: branches: ["master", "devops/*"] workflow_dispatch: - branches: ["master", "release/*", "feature/*"] permissions: contents: read diff --git a/docs/provider_kafka.md b/docs/provider_kafka.md index e6fc15cc..d91aae74 100644 --- a/docs/provider_kafka.md +++ b/docs/provider_kafka.md @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Configuration properties](#configuration-properties) - [Minimizing message latency](#minimizing-message-latency) - [SSL and password authentication](#ssl-and-password-authentication) -- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer) - - [Default partitioner with message key](#default-partitioner-with-message-key) - - [Assigning partition explicitly](#assigning-partition-explicitly) -- [Consumer context](#consumer-context) +- [Producers](#producers) + - [High throughput publish](#high-throughput-publish) + - [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer) + - [Default partitioner with message key](#default-partitioner-with-message-key) + - [Assigning partition explicitly](#assigning-partition-explicitly) - [Message Headers](#message-headers) - [Consumers](#consumers) + - [Consumer context](#consumer-context) - [Offset Commit](#offset-commit) - [Consumer Error Handling](#consumer-error-handling) - [Debugging](#debugging) @@ -29,7 +31,7 @@ When troubleshooting or fine tuning it is worth reading the `librdkafka` and `co ## Configuration properties -Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). 
+Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). The configuration on the underlying Kafka client can be adjusted like so: ```cs @@ -53,7 +55,7 @@ services.AddSlimMessageBus(mbb => ### Minimizing message latency -There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus: +There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus: ```cs services.AddSlimMessageBus(mbb => @@ -119,12 +121,32 @@ private static void AddSsl(string username, string password, ClientConfig c) The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`. -## Selecting message partition for topic producer +## Producers + +### High throughput publish + +By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response. +This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster. + +However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration. +When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively. 
+ +```cs +mbb.Produce(x => +{ + x.DefaultTopic(topic); + // Partition #0 - for even counters, and #1 - for odd counters + x.PartitionProvider((m, t) => m.Counter % 2); + x.EnableProduceAwait(enableProduceAwait); +}); +``` + +### Selecting message partition for topic producer Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message? There are two possible options: -### Default partitioner with message key +#### Default partitioner with message key Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)). The default partitioner is supported, which works in this way: @@ -148,7 +170,7 @@ mbb The key must be a `byte[]`. -### Assigning partition explicitly +#### Assigning partition explicitly SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example: @@ -167,33 +189,13 @@ mbb With this approach your provider needs to know the number of partitions for a topic. -## Consumer context - -The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message: - -```cs -public class PingConsumer : IConsumer, IConsumerWithContext -{ - public IConsumerContext Context { get; set; } - - public Task OnHandle(PingMessage message) - { - // SMB Kafka transport specific extension: - var transportMessage = Context.GetTransportMessage(); - var partition = transportMessage.TopicPartition.Partition; - } -} -``` - -This could be useful to extract the message's offset or partition. - ## Message Headers SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`). 
The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values. By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values. -If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins): +If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins): ```cs // MessageBusBuilder mbb; @@ -209,9 +211,30 @@ mbb ## Consumers +### Consumer context + +The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message: + +```cs +public class PingConsumer : IConsumer, IConsumerWithContext +{ + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingMessage message) + { + // SMB Kafka transport specific extension: + var transportMessage = Context.GetTransportMessage(); + var partition = transportMessage.TopicPartition.Partition; + } +} +``` + +This could be useful to extract the message's offset or partition. + ### Offset Commit -In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder: +In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder: - `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages. - `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
diff --git a/docs/provider_kafka.t.md b/docs/provider_kafka.t.md index 56e2fde3..0c5a7e30 100644 --- a/docs/provider_kafka.t.md +++ b/docs/provider_kafka.t.md @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Configuration properties](#configuration-properties) - [Minimizing message latency](#minimizing-message-latency) - [SSL and password authentication](#ssl-and-password-authentication) -- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer) - - [Default partitioner with message key](#default-partitioner-with-message-key) - - [Assigning partition explicitly](#assigning-partition-explicitly) -- [Consumer context](#consumer-context) +- [Producers](#producers) + - [High throughput publish](#high-throughput-publish) + - [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer) + - [Default partitioner with message key](#default-partitioner-with-message-key) + - [Assigning partition explicitly](#assigning-partition-explicitly) - [Message Headers](#message-headers) - [Consumers](#consumers) + - [Consumer context](#consumer-context) - [Offset Commit](#offset-commit) - [Consumer Error Handling](#consumer-error-handling) - [Debugging](#debugging) @@ -119,12 +121,24 @@ private static void AddSsl(string username, string password, ClientConfig c) The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`. -## Selecting message partition for topic producer +## Producers + +### High throughput publish + +By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response. 
+This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster. + +However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration. +When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively. + +@[:cs](../src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs,ExampleEnableProduceAwait) + +### Selecting message partition for topic producer Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message? There are two possible options: -### Default partitioner with message key +#### Default partitioner with message key Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)). The default partitioner is supported, which works in this way: @@ -148,7 +162,7 @@ mbb The key must be a `byte[]`. -### Assigning partition explicitly +#### Assigning partition explicitly SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example: @@ -167,33 +181,13 @@ mbb With this approach your provider needs to know the number of partitions for a topic. 
-## Consumer context - -The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message: - -```cs -public class PingConsumer : IConsumer, IConsumerWithContext -{ - public IConsumerContext Context { get; set; } - - public Task OnHandle(PingMessage message) - { - // SMB Kafka transport specific extension: - var transportMessage = Context.GetTransportMessage(); - var partition = transportMessage.TopicPartition.Partition; - } -} -``` - -This could be useful to extract the message's offset or partition. - ## Message Headers SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`). The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values. By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values. 
-If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins): +If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins): ```cs // MessageBusBuilder mbb; @@ -209,6 +203,26 @@ mbb ## Consumers +### Consumer context + +The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message: + +```cs +public class PingConsumer : IConsumer, IConsumerWithContext +{ + public IConsumerContext Context { get; set; } + + public Task OnHandle(PingMessage message) + { + // SMB Kafka transport specific extension: + var transportMessage = Context.GetTransportMessage(); + var partition = transportMessage.TopicPartition.Partition; + } +} +``` + +This could be useful to extract the message's offset or partition. + ### Offset Commit In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index f7976ef1..b8c74f13 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 2.5.4-rc2 + 2.6.0-rc1 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs index f573805b..3ee05187 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs @@ -1,6 +1,6 @@ namespace SlimMessageBus.Host; -public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder +public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder, IConsumerBuilder { public MessageBusSettings Settings { get; } @@ -8,6 +8,8 @@ public abstract class AbstractConsumerBuilder : 
IAbstractConsumerBuilder AbstractConsumerSettings IAbstractConsumerBuilder.ConsumerSettings => ConsumerSettings; + HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings; + protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, string path = null) { Settings = settings ?? throw new ArgumentNullException(nameof(settings)); diff --git a/src/SlimMessageBus.Host.Configuration/Builders/ConsumerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/ConsumerBuilder.cs index 4572e7a6..a83c2358 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/ConsumerBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/ConsumerBuilder.cs @@ -1,7 +1,9 @@ namespace SlimMessageBus.Host; -public class ConsumerBuilder : AbstractConsumerBuilder +public class ConsumerBuilder : AbstractConsumerBuilder, IConsumerBuilder { + HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings; + public ConsumerBuilder(MessageBusSettings settings, Type messageType = null) : base(settings, messageType ?? 
typeof(T)) { diff --git a/src/SlimMessageBus.Host.Configuration/Builders/IBuilderWithSettings.cs b/src/SlimMessageBus.Host.Configuration/Builders/IBuilderWithSettings.cs new file mode 100644 index 00000000..66bf3ca3 --- /dev/null +++ b/src/SlimMessageBus.Host.Configuration/Builders/IBuilderWithSettings.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host; + +public interface IBuilderWithSettings +{ + HasProviderExtensions Settings { get; } +} diff --git a/src/SlimMessageBus.Host.Configuration/Builders/IConsumerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/IConsumerBuilder.cs new file mode 100644 index 00000000..d629f380 --- /dev/null +++ b/src/SlimMessageBus.Host.Configuration/Builders/IConsumerBuilder.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host; + +public interface IConsumerBuilder : IBuilderWithSettings +{ +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Configuration/Builders/IProducerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/IProducerBuilder.cs new file mode 100644 index 00000000..7f419503 --- /dev/null +++ b/src/SlimMessageBus.Host.Configuration/Builders/IProducerBuilder.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host; + +public interface IProducerBuilder : IBuilderWithSettings +{ +} diff --git a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs index c99a469f..21e0dd13 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs @@ -2,7 +2,7 @@ namespace SlimMessageBus.Host; using Microsoft.Extensions.DependencyInjection.Extensions; -public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder +public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder, IProducerBuilder { /// /// Parent bus builder. 
@@ -17,7 +17,9 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui /// /// The current settings that are being built. /// - public MessageBusSettings Settings { get; private set; } = new(); + public MessageBusSettings Settings { get; private set; } = new(); + + HasProviderExtensions IBuilderWithSettings.Settings => Settings; /// /// The bus factory method. @@ -25,7 +27,8 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui public Func BusFactory { get; private set; } public IList> PostConfigurationActions { get; } = []; - + + protected MessageBusBuilder() { } diff --git a/src/SlimMessageBus.Host.Configuration/Builders/ProducerBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/ProducerBuilder.cs index e0c7be82..710e0c55 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/ProducerBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/ProducerBuilder.cs @@ -1,11 +1,13 @@ namespace SlimMessageBus.Host; -public class ProducerBuilder +public class ProducerBuilder : IProducerBuilder { public ProducerSettings Settings { get; } public Type MessageType => Settings.MessageType; - + + HasProviderExtensions IBuilderWithSettings.Settings => Settings; + public ProducerBuilder(ProducerSettings settings) : this(settings, typeof(T)) { diff --git a/src/SlimMessageBus.Host.Configuration/Builders/RequestResponseBuilder.cs b/src/SlimMessageBus.Host.Configuration/Builders/RequestResponseBuilder.cs index 835b4c70..10ae1d58 100644 --- a/src/SlimMessageBus.Host.Configuration/Builders/RequestResponseBuilder.cs +++ b/src/SlimMessageBus.Host.Configuration/Builders/RequestResponseBuilder.cs @@ -1,11 +1,13 @@ namespace SlimMessageBus.Host; -public class RequestResponseBuilder : IAbstractConsumerBuilder +public class RequestResponseBuilder : IAbstractConsumerBuilder, IConsumerBuilder { public RequestResponseSettings Settings { get; } public AbstractConsumerSettings ConsumerSettings => Settings; + 
HasProviderExtensions IBuilderWithSettings.Settings => Settings; + public RequestResponseBuilder(RequestResponseSettings settings) { Settings = settings; diff --git a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj index fe027109..e273e21a 100644 --- a/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj +++ b/src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj @@ -6,7 +6,7 @@ Core configuration interfaces of SlimMessageBus SlimMessageBus SlimMessageBus.Host - 2.5.2-rc1 + 2.6.0-rc1 diff --git a/src/SlimMessageBus.Host.Kafka/Configs/BuilderExtensions.cs b/src/SlimMessageBus.Host.Kafka/Configs/BuilderExtensions.cs deleted file mode 100644 index 90faf50c..00000000 --- a/src/SlimMessageBus.Host.Kafka/Configs/BuilderExtensions.cs +++ /dev/null @@ -1,104 +0,0 @@ -namespace SlimMessageBus.Host.Kafka; - -public static class BuilderExtensions -{ - /// - /// Configures the Kafka consumer group. - /// - /// - /// - /// - [Obsolete("Use KafkaGroup() instead")] - public static T Group(this T builder, string group) where T : AbstractConsumerBuilder - => builder.KafkaGroup(group); - - public static T KafkaGroup(this T builder, string group) where T : AbstractConsumerBuilder - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.ConsumerSettings.SetGroup(group); - return builder; - } - - /// - /// Configures the Kafka consumer group. - /// - /// - /// - /// - [Obsolete("Use KafkaGroup() instead")] - public static RequestResponseBuilder Group(this RequestResponseBuilder builder, string group) - => builder.KafkaGroup(group); - - /// - /// Configures the Kafka consumer group. 
- /// - /// - /// - /// - public static RequestResponseBuilder KafkaGroup(this RequestResponseBuilder builder, string group) - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.Settings.SetGroup(group); - return builder; - } - - /// - /// Checkpoint every N-th processed message. - /// - /// - /// - /// - public static T CheckpointEvery(this T builder, int numberOfMessages) - where T : AbstractConsumerBuilder - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.ConsumerSettings.Properties[CheckpointSettings.CheckpointCount] = numberOfMessages; - return builder; - } - - /// - /// Checkpoint after T elapsed time. - /// - /// - /// - /// - public static T CheckpointAfter(this T builder, TimeSpan duration) - where T : AbstractConsumerBuilder - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.ConsumerSettings.Properties[CheckpointSettings.CheckpointDuration] = duration; - return builder; - } - - /// - /// Checkpoint every N-th processed message. - /// - /// - /// - /// - public static RequestResponseBuilder CheckpointEvery(this RequestResponseBuilder builder, int numberOfMessages) - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.Settings.Properties[CheckpointSettings.CheckpointCount] = numberOfMessages; - return builder; - } - - /// - /// Checkpoint after T elapsed time. 
- /// - /// - /// - /// - public static RequestResponseBuilder CheckpointAfter(this RequestResponseBuilder builder, TimeSpan duration) - { - if (builder == null) throw new ArgumentNullException(nameof(builder)); - - builder.Settings.Properties[CheckpointSettings.CheckpointDuration] = duration; - return builder; - } -} diff --git a/src/SlimMessageBus.Host.Kafka/Configs/KafkaAbstractConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.Kafka/Configs/KafkaAbstractConsumerBuilderExtensions.cs new file mode 100644 index 00000000..368f4e8e --- /dev/null +++ b/src/SlimMessageBus.Host.Kafka/Configs/KafkaAbstractConsumerBuilderExtensions.cs @@ -0,0 +1,68 @@ +namespace SlimMessageBus.Host.Kafka; + +using System.Diagnostics.CodeAnalysis; + +public static class KafkaAbstractConsumerBuilderExtensions +{ + /// + /// Configures the Kafka consumer group. + /// + /// + /// + /// + [Obsolete("Use KafkaGroup() instead")] + [ExcludeFromCodeCoverage] + public static T Group(this T builder, string group) + where T : IAbstractConsumerBuilder + => builder.KafkaGroup(group); + + public static T KafkaGroup(this T builder, string group) + where T : IAbstractConsumerBuilder + { + if (builder == null) throw new ArgumentNullException(nameof(builder)); + + builder.ConsumerSettings.SetGroup(group); + return builder; + } + + /// + /// Configures the Kafka consumer group. + /// + /// + /// + /// + [Obsolete("Use KafkaGroup() instead")] + [ExcludeFromCodeCoverage] + public static RequestResponseBuilder Group(this RequestResponseBuilder builder, string group) + => builder.KafkaGroup(group); + + /// + /// Checkpoint every N-th processed message. 
+ /// + /// + /// + /// + public static T CheckpointEvery(this T builder, int numberOfMessages) + where T : IAbstractConsumerBuilder + { + if (builder == null) throw new ArgumentNullException(nameof(builder)); + + builder.ConsumerSettings.Properties[CheckpointSettings.CheckpointCount] = numberOfMessages; + return builder; + } + + /// + /// Checkpoint after T elapsed time. + /// + /// + /// + /// + public static T CheckpointAfter(this T builder, TimeSpan duration) + where T : IAbstractConsumerBuilder + { + if (builder == null) throw new ArgumentNullException(nameof(builder)); + + builder.ConsumerSettings.Properties[CheckpointSettings.CheckpointDuration] = duration; + return builder; + } +} diff --git a/src/SlimMessageBus.Host.Kafka/Configs/KafkaDelegates.cs b/src/SlimMessageBus.Host.Kafka/Configs/KafkaDelegates.cs new file mode 100644 index 00000000..e426b3d1 --- /dev/null +++ b/src/SlimMessageBus.Host.Kafka/Configs/KafkaDelegates.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host.Kafka; + +public delegate byte[] KafkaKeyProvider(T message, string topic); + +public delegate int KafkaPartitionProvider(T message, string topic); diff --git a/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerBuilderExtensions.cs b/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerBuilderExtensions.cs index 1fd7e8c7..aa396c03 100644 --- a/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerBuilderExtensions.cs @@ -10,12 +10,12 @@ public static class KafkaProducerBuilderExtensions /// Delegate to determine the key for an message. Parameter meaning: (message, topic) => key. /// Ensure the implementation is thread-safe. 
/// - public static ProducerBuilder KeyProvider(this ProducerBuilder builder, Func keyProvider) + public static ProducerBuilder KeyProvider(this ProducerBuilder builder, KafkaKeyProvider keyProvider) { Assert.IsNotNull(keyProvider, () => new ConfigurationMessageBusException("Null value provided")); byte[] UntypedProvider(object message, string topic) => keyProvider((T)message, topic); - builder.Settings.Properties[KafkaProducerSettingsExtensions.KeyProviderKey] = (Func)UntypedProvider; + builder.Settings.Properties[KafkaProducerSettingsExtensions.KeyProviderKey] = (KafkaKeyProvider)UntypedProvider; return builder; } @@ -27,13 +27,29 @@ public static ProducerBuilder KeyProvider(this ProducerBuilder builder, /// Delegate to determine the partition number for an message. Parameter meaning: (message, topic) => partition. /// Ensure the implementation is thread-safe. /// - public static ProducerBuilder PartitionProvider(this ProducerBuilder builder, Func partitionProvider) + public static ProducerBuilder PartitionProvider(this ProducerBuilder builder, KafkaPartitionProvider partitionProvider) { Assert.IsNotNull(partitionProvider, () => new ConfigurationMessageBusException("Null value provided")); int UntypedProvider(object message, string topic) => partitionProvider((T)message, topic); - builder.Settings.Properties[KafkaProducerSettingsExtensions.PartitionProviderKey] = (Func)UntypedProvider; + builder.Settings.Properties[KafkaProducerSettingsExtensions.PartitionProviderKey] = (KafkaPartitionProvider)UntypedProvider; return builder; } - + + /// + /// Enables (or disables) awaiting for the message delivery result during producing of the message to the Kafka topic. + /// Allows to increase the throughput of the producer, but may lead to message loss in case of message delivery failures. + /// Internally the kafka driver will buffer the messages and deliver them in batches. + /// By default this is enabled. 
+ /// + /// + /// + /// + /// + public static TBuilder EnableProduceAwait(this TBuilder builder, bool enable = true) + where TBuilder : IProducerBuilder + { + builder.Settings.Properties[KafkaProducerSettingsExtensions.EnableProduceAwaitKey] = enable; + return builder; + } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerSettingsExtensions.cs b/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerSettingsExtensions.cs index e611979b..48209318 100644 --- a/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerSettingsExtensions.cs +++ b/src/SlimMessageBus.Host.Kafka/Configs/KafkaProducerSettingsExtensions.cs @@ -4,14 +4,11 @@ public static class KafkaProducerSettingsExtensions { internal const string KeyProviderKey = "Kafka_KeyProvider"; internal const string PartitionProviderKey = "Kafka_PartitionProvider"; + internal const string EnableProduceAwaitKey = "Kafka_AwaitProduce"; - public static Func GetKeyProvider(this ProducerSettings ps) - { - return ps.GetOrDefault>(KeyProviderKey, null); - } + public static KafkaKeyProvider GetKeyProvider(this ProducerSettings ps) + => ps.GetOrDefault>(KeyProviderKey, null); - public static Func GetPartitionProvider(this ProducerSettings ps) - { - return ps.GetOrDefault>(PartitionProviderKey, null); - } -} \ No newline at end of file + public static KafkaPartitionProvider GetPartitionProvider(this ProducerSettings ps) + => ps.GetOrDefault>(PartitionProviderKey, null); +} diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs index 81166204..dad5c503 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs @@ -1,6 +1,6 @@ namespace SlimMessageBus.Host.Kafka; -public static class KafkaExtensions +internal static class KafkaExtensions { public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset) 
=> new(topicPartitionOffset.TopicPartition, topicPartitionOffset.Offset + addOffset); diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs index 4e15ca13..d4b73d02 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs @@ -45,7 +45,7 @@ private ICheckpointTrigger CreateCheckpointTrigger() { var f = new CheckpointTriggerFactory( LoggerFactory, - (configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})"); + (configuredCheckpoints) => $"The checkpoint settings ({nameof(KafkaAbstractConsumerBuilderExtensions.CheckpointAfter)} and {nameof(KafkaAbstractConsumerBuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})"); return f.Create(ConsumerSettings); } diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index 0a6c5580..3c042303 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -129,16 +129,16 @@ protected override async Task> ProduceToTranspor // calculate message key var key = GetMessageKey(producerSettings, messageType, envelope.Message, path); - var kafkaMessage = new Message { Key = key, Value = messagePayload }; + var transportMessage = new Message { Key = key, Value = messagePayload }; if (envelope.Headers != null && envelope.Headers.Count > 0) { - kafkaMessage.Headers = []; + transportMessage.Headers = []; foreach (var keyValue in 
envelope.Headers) { var valueBytes = HeaderSerializer.Serialize(typeof(object), keyValue.Value); - kafkaMessage.Headers.Add(keyValue.Key, valueBytes); + transportMessage.Headers.Add(keyValue.Key, valueBytes); } } @@ -154,36 +154,55 @@ protected override async Task> ProduceToTranspor partition, key?.Length ?? 0, messagePayload?.Length ?? 0, - kafkaMessage.Headers?.Count ?? 0); + transportMessage.Headers?.Count ?? 0); - // send the message to topic - var task = partition == NoPartition - ? _producer.ProduceAsync(path, kafkaMessage, cancellationToken: cancellationToken) - : _producer.ProduceAsync(new TopicPartition(path, new Partition(partition)), kafkaMessage, cancellationToken: cancellationToken); + var topicPartition = partition == NoPartition ? null : new TopicPartition(path, partition); - // ToDo: Introduce support for not awaited produce - - var deliveryResult = await task.ConfigureAwait(false); - if (deliveryResult.Status == PersistenceStatus.NotPersisted) + // check if we should await for the message delivery result, by default true + var awaitProduceResult = producerSettings?.GetOrDefault(KafkaProducerSettingsExtensions.EnableProduceAwaitKey, Settings, true) ?? true; + if (awaitProduceResult) { - throw new ProducerMessageBusException($"Error while publish message {envelope.Message} of type {messageType?.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}"); + // send the message to topic and await result + var task = topicPartition == null + ? _producer.ProduceAsync(path, transportMessage, cancellationToken: cancellationToken) + : _producer.ProduceAsync(topicPartition, transportMessage, cancellationToken: cancellationToken); + + var deliveryResult = await task.ConfigureAwait(false); + if (deliveryResult.Status == PersistenceStatus.NotPersisted) + { + throw new ProducerMessageBusException($"Error while publish message {envelope.Message} of type {messageType?.Name} to topic {path}. 
Kafka persistence status: {deliveryResult.Status}"); + } + + // log some debug information + _logger.LogDebug("Message {Message} of type {MessageType} delivered to topic {Topic}, partition {Partition}, offset: {Offset}", + envelope.Message, messageType?.Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset); } + else + { + // send the message to topic and dont await result (potential message loss) + if (topicPartition == null) + { + _producer.Produce(path, transportMessage); + } + else + { + _producer.Produce(topicPartition, transportMessage); + } + // log some debug information + _logger.LogDebug("Message {Message} of type {MessageType} sent to topic {Topic} (result is not awaited)", + envelope.Message, messageType?.Name, path); + } dispatched.Add(envelope); - - // log some debug information - _logger.LogDebug("Message {Message} of type {MessageType} delivered to topic {Topic}, partition {Partition}, offset: {Offset}", - envelope.Message, messageType?.Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset); } } catch (Exception ex) { return new(dispatched, ex); } - return new(dispatched, null); - } - + } + protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic) { var keyProvider = producerSettings?.GetKeyProvider(); diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBusSettingsValidationService.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBusSettingsValidationService.cs index 58df38c8..cb420d5f 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBusSettingsValidationService.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBusSettingsValidationService.cs @@ -24,7 +24,7 @@ protected override void AssertConsumer(ConsumerSettings consumerSettings) if (consumerSettings.GetGroup() == null) { - ThrowConsumerFieldNotSet(consumerSettings, nameof(BuilderExtensions.KafkaGroup)); + ThrowConsumerFieldNotSet(consumerSettings, 
nameof(KafkaAbstractConsumerBuilderExtensions.KafkaGroup)); } } @@ -36,12 +36,12 @@ protected override void AssertRequestResponseSettings() { if (Settings.RequestResponse.GetGroup() == null) { - ThrowRequestResponseFieldNotSet(nameof(BuilderExtensions.KafkaGroup)); + ThrowRequestResponseFieldNotSet(nameof(KafkaAbstractConsumerBuilderExtensions.KafkaGroup)); } if (Settings.Consumers.Any(x => x.GetGroup() == Settings.RequestResponse.GetGroup() && x.Path == Settings.RequestResponse.Path)) { - ThrowRequestResponseFieldNotSet(nameof(BuilderExtensions.KafkaGroup), "cannot use topic that is already being used by a consumer"); + ThrowRequestResponseFieldNotSet(nameof(KafkaAbstractConsumerBuilderExtensions.KafkaGroup), "cannot use topic that is already being used by a consumer"); } } } diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 303fee7c..607ecccd 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -252,6 +252,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Infrastructure", "Infrastru EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{1A71BB05-58ED-4B27-B4A4-A03D9E608C1C}" ProjectSection(SolutionItems) = preProject + ..\.github\workflows\build.yml = ..\.github\workflows\build.yml ..\build\do_build.ps1 = ..\build\do_build.ps1 ..\build\do_package.ps1 = ..\build\do_package.ps1 ..\build\do_push_local.ps1 = ..\build\do_push_local.ps1 diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Config/KafkaProducerBuilderExtensionsTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Config/KafkaProducerBuilderExtensionsTest.cs index 6a05e076..0e3e14e2 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Config/KafkaProducerBuilderExtensionsTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Config/KafkaProducerBuilderExtensionsTest.cs @@ -18,7 +18,7 @@ public void WhenKeyProviderThenCreatesUntypedWrapper() var message = new SomeMessage(); var messageKey = new byte[] { 1, 2 }; - var keyProviderMock = new 
Mock>(); + var keyProviderMock = new Mock>(); keyProviderMock.Setup(x => x(message, "topic1")).Returns(messageKey); // act @@ -36,7 +36,7 @@ public void WhenPartitionProviderThenCreatesUntypedWrapper() // arrange var message = new SomeMessage(); - var partitionProviderMock = new Mock>(); + var partitionProviderMock = new Mock>(); partitionProviderMock.Setup(x => x(message, "topic1")).Returns(1); // act diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs index b5cdc38e..48b731f4 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs @@ -64,21 +64,25 @@ protected override void SetupServices(ServiceCollection services, IConfiguration public IMessageBus MessageBus => ServiceProvider.GetRequiredService(); [Theory] - [InlineData(300, 100, 120)] - [InlineData(300, 120, 100)] - public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int delayConsumerAt) + [InlineData(300, 100, 120, true)] + [InlineData(300, 120, 100, true)] + [InlineData(300, 100, 120, false)] + [InlineData(300, 120, 100, false)] + public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int delayConsumerAt, bool enableProduceAwait) { // arrange AddBusConfiguration(mbb => { var topic = "test-ping"; + // doc:fragment:ExampleEnableProduceAwait mbb.Produce(x => { x.DefaultTopic(topic); - // Partition #0 for even counters - // Partition #1 for odd counters - x.PartitionProvider((m, t) => m.Counter % 2); + // Partition #0 - for even counters, and #1 - for odd counters + x.PartitionProvider((m, t) => m.Counter % 2); + x.EnableProduceAwait(enableProduceAwait); }); + // doc:fragment:ExampleEnableProduceAwait // doc:fragment:ExampleCheckpointConfig mbb.Consume(x => { @@ -90,6 +94,9 @@ public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int del .CheckpointAfter(TimeSpan.FromSeconds(600)); 
}); // doc:fragment:ExampleCheckpointConfig + + // or + //mbb.EnableProduceAwait(enableProduceAwait); }); var consumedMessages = ServiceProvider.GetRequiredService>(); diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs index 759ea2e3..a9780f00 100644 --- a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs +++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusTest.cs @@ -1,22 +1,33 @@ -namespace SlimMessageBus.Host.Kafka.Test; +namespace SlimMessageBus.Host.Kafka.Test; + +using System.Linq.Expressions; +using System.Threading.Tasks; + +using Confluent.Kafka; public class KafkaMessageBusTest : IDisposable -{ +{ + private readonly Mock> _producerBuilderMock; + private readonly Mock> _producerMock; + private MessageBusSettings MbSettings { get; } private KafkaMessageBusSettings KafkaMbSettings { get; } private Lazy KafkaMb { get; } public KafkaMessageBusTest() { - var producerMock = new Mock>(); - producerMock.SetupGet(x => x.Name).Returns("Producer Name"); + _producerMock = new Mock>(); + _producerMock.SetupGet(x => x.Name).Returns("Producer Name"); - var producerBuilderMock = new Mock>(new ProducerConfig()); - producerBuilderMock.Setup(x => x.Build()).Returns(producerMock.Object); + _producerBuilderMock = new Mock>(new ProducerConfig()); + _producerBuilderMock.Setup(x => x.Build()).Returns(_producerMock.Object); + + var messageSerializerMock = new Mock(); var serviceProviderMock = new Mock(); serviceProviderMock.Setup(x => x.GetService(typeof(ILogger))).CallBase(); serviceProviderMock.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(new AssemblyQualifiedNameMessageTypeResolver()); + serviceProviderMock.Setup(x => x.GetService(typeof(IMessageSerializer))).Returns(messageSerializerMock.Object); MbSettings = new MessageBusSettings { @@ -24,7 +35,7 @@ public KafkaMessageBusTest() }; KafkaMbSettings = new KafkaMessageBusSettings("host") { - 
ProducerBuilderFactory = (config) => producerBuilderMock.Object + ProducerBuilderFactory = (config) => _producerBuilderMock.Object }; KafkaMb = new Lazy(() => new WrappedKafkaMessageBus(MbSettings, KafkaMbSettings)); } @@ -89,6 +100,60 @@ public void GetMessagePartition() // assert msgAPartition.Should().Be(10); msgBPartition.Should().Be(-1); + } + + [Theory] + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task When_Publish_Given_EnableAwaitProduce_Then_UsesSyncOrAsyncProduceVersionAsync(bool enableAwaitProduce, bool withPartitionProvider) + { + // arrange + var topic = "topic1"; + var producer = new ProducerSettings(); + var producerBuilder = new ProducerBuilder(producer) + .DefaultTopic(topic) + .EnableProduceAwait(enableAwaitProduce); + + var partitionNumber = 10; + if (withPartitionProvider) + { + producerBuilder.PartitionProvider((m, t) => partitionNumber); + } + + MbSettings.Producers.Add(producer); + + var cancellationToken = new CancellationTokenSource().Token; + var msg = new SomeMessage(); + + var deliveryReport = new DeliveryResult { Status = PersistenceStatus.Persisted }; + _producerMock + .Setup(x => x.ProduceAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(deliveryReport); + _producerMock + .Setup(x => x.ProduceAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(deliveryReport); + + // act + await KafkaMb.Value.ProducePublish(msg, cancellationToken: cancellationToken); + + // assert + Expression>> exp; + if (enableAwaitProduce) + { + exp = withPartitionProvider + ? x => x.ProduceAsync(It.Is(tp => tp.Partition == partitionNumber && tp.Topic == topic), It.IsAny>(), It.IsAny()) + : x => x.ProduceAsync(topic, It.IsAny>(), It.IsAny()); + } + else + { + exp = withPartitionProvider + ? 
x => x.Produce(It.Is(tp => tp.Partition == partitionNumber && tp.Topic == topic), It.IsAny>(), null) + : x => x.Produce(topic, It.IsAny>(), null); + } + _producerMock.Verify(exp, Times.Once); + _producerMock.VerifyNoOtherCalls(); } class MessageA