Skip to content

Commit

Permalink
Host.Kafka] Support for High-Throughput Publishing to Kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 2, 2024
1 parent 50d91cb commit afd8a4a
Show file tree
Hide file tree
Showing 26 changed files with 365 additions and 228 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ on:
pull_request_target:
branches: ["master", "devops/*"]
workflow_dispatch:
branches: ["master", "release/*", "feature/*"]

permissions:
contents: read
Expand Down
85 changes: 54 additions & 31 deletions docs/provider_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration properties](#configuration-properties)
- [Minimizing message latency](#minimizing-message-latency)
- [SSL and password authentication](#ssl-and-password-authentication)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Consumer context](#consumer-context)
- [Producers](#producers)
- [High throughput publish](#high-throughput-publish)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Message Headers](#message-headers)
- [Consumers](#consumers)
- [Consumer context](#consumer-context)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
Expand All @@ -29,7 +31,7 @@ When troubleshooting or fine tuning it is worth reading the `librdkafka` and `co

## Configuration properties

Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
The configuration on the underlying Kafka client can be adjusted like so:

```cs
Expand All @@ -53,7 +55,7 @@ services.AddSlimMessageBus(mbb =>

### Minimizing message latency

There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

```cs
services.AddSlimMessageBus(mbb =>
Expand Down Expand Up @@ -119,12 +121,32 @@ private static void AddSsl(string username, string password, ClientConfig c)

The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.

## Selecting message partition for topic producer
## Producers

### High throughput publish

By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.

However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.

```cs
mbb.Produce<PingMessage>(x =>
{
x.DefaultTopic(topic);
// Partition #0 - for even counters, and #1 - for odd counters
x.PartitionProvider((m, t) => m.Counter % 2);
x.EnableProduceAwait(enableProduceAwait);
});
```

### Selecting message partition for topic producer

Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
There are two possible options:

### Default partitioner with message key
#### Default partitioner with message key

Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
The default partitioner is supported, which works in this way:
Expand All @@ -148,7 +170,7 @@ mbb

The key must be a `byte[]`.

### Assigning partition explicitly
#### Assigning partition explicitly

SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:

Expand All @@ -167,33 +189,13 @@ mbb

With this approach your provider needs to know the number of partitions for a topic.

## Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

## Message Headers

SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).

The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):

```cs
// MessageBusBuilder mbb;
Expand All @@ -209,9 +211,30 @@ mbb
## Consumers

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder:
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
is configuration is controlled through the following methods on the consumer builder:

- `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
- `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
Expand Down
70 changes: 42 additions & 28 deletions docs/provider_kafka.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration properties](#configuration-properties)
- [Minimizing message latency](#minimizing-message-latency)
- [SSL and password authentication](#ssl-and-password-authentication)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Consumer context](#consumer-context)
- [Producers](#producers)
- [High throughput publish](#high-throughput-publish)
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
- [Default partitioner with message key](#default-partitioner-with-message-key)
- [Assigning partition explicitly](#assigning-partition-explicitly)
- [Message Headers](#message-headers)
- [Consumers](#consumers)
- [Consumer context](#consumer-context)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
Expand Down Expand Up @@ -119,12 +121,24 @@ private static void AddSsl(string username, string password, ClientConfig c)

The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.

## Selecting message partition for topic producer
## Producers

### High throughput publish

By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.

However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.

@[:cs](../src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs,ExampleEnableProduceAwait)

### Selecting message partition for topic producer

Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
There are two possible options:

### Default partitioner with message key
#### Default partitioner with message key

Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
The default partitioner is supported, which works in this way:
Expand All @@ -148,7 +162,7 @@ mbb

The key must be a `byte[]`.

### Assigning partition explicitly
#### Assigning partition explicitly

SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:

Expand All @@ -167,33 +181,13 @@ mbb

With this approach your provider needs to know the number of partitions for a topic.

## Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

## Message Headers

SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).

The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):

```cs
// MessageBusBuilder mbb;
Expand All @@ -209,6 +203,26 @@ mbb
## Consumers

### Consumer context

The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
{
// SMB Kafka transport specific extension:
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}
```

This could be useful to extract the message's offset or partition.

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.4-rc2</Version>
<Version>2.6.0-rc1</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace SlimMessageBus.Host;

public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder
public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder, IConsumerBuilder
{
public MessageBusSettings Settings { get; }

public ConsumerSettings ConsumerSettings { get; }

AbstractConsumerSettings IAbstractConsumerBuilder.ConsumerSettings => ConsumerSettings;

HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;

protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, string path = null)
{
Settings = settings ?? throw new ArgumentNullException(nameof(settings));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace SlimMessageBus.Host;

public class ConsumerBuilder<T> : AbstractConsumerBuilder
public class ConsumerBuilder<T> : AbstractConsumerBuilder, IConsumerBuilder
{
HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;

public ConsumerBuilder(MessageBusSettings settings, Type messageType = null)
: base(settings, messageType ?? typeof(T))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host;

public interface IBuilderWithSettings
{
HasProviderExtensions Settings { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host;

public interface IConsumerBuilder : IBuilderWithSettings
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace SlimMessageBus.Host;

public interface IProducerBuilder : IBuilderWithSettings
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace SlimMessageBus.Host;

using Microsoft.Extensions.DependencyInjection.Extensions;

public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder
public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder, IProducerBuilder
{
/// <summary>
/// Parent bus builder.
Expand All @@ -17,15 +17,18 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui
/// <summary>
/// The current settings that are being built.
/// </summary>
public MessageBusSettings Settings { get; private set; } = new();
public MessageBusSettings Settings { get; private set; } = new();

HasProviderExtensions IBuilderWithSettings.Settings => Settings;

/// <summary>
/// The bus factory method.
/// </summary>
public Func<MessageBusSettings, IMessageBusProvider> BusFactory { get; private set; }

public IList<Action<IServiceCollection>> PostConfigurationActions { get; } = [];



protected MessageBusBuilder()
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace SlimMessageBus.Host;

public class ProducerBuilder<T>
public class ProducerBuilder<T> : IProducerBuilder
{
public ProducerSettings Settings { get; }

public Type MessageType => Settings.MessageType;


HasProviderExtensions IBuilderWithSettings.Settings => Settings;

public ProducerBuilder(ProducerSettings settings)
: this(settings, typeof(T))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace SlimMessageBus.Host;

public class RequestResponseBuilder : IAbstractConsumerBuilder
public class RequestResponseBuilder : IAbstractConsumerBuilder, IConsumerBuilder
{
public RequestResponseSettings Settings { get; }

public AbstractConsumerSettings ConsumerSettings => Settings;

HasProviderExtensions IBuilderWithSettings.Settings => Settings;

public RequestResponseBuilder(RequestResponseSettings settings)
{
Settings = settings;
Expand Down
Loading

0 comments on commit afd8a4a

Please sign in to comment.