Pre 1.0 tidying w/ minor defaulting changes (#29)
bartelink authored May 22, 2019
1 parent 0381810 commit 47e561b
Showing 2 changed files with 97 additions and 71 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,14 @@ The `Unreleased` section name is replaced by the expected version of next release
## [Unreleased]

### Added

- each configuration DSL now has a `customize` function to admit post-processing after defaults and `custom` have taken effect [#29](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/29)
- Producer/Consumer both have an `Inner` to enable custom logic [#29](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/29)
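A minimal sketch of how the two additions above combine, assuming a Serilog `log` is in scope; the broker URI, topic name and the `MessageTimeoutMs` override are purely illustrative:

```fsharp
open System
open Confluent.Kafka
open Jet.ConfluentKafka.FSharp
open Serilog

let log = LoggerConfiguration().CreateLogger()

let producerConfig =
    KafkaProducerConfig.Create(
        "example-client", Uri "kafka://localhost:9092", Acks.Leader,
        // `customize` runs last, after the defaults and any `custom` pairs have been applied
        customize = (fun c -> c.MessageTimeoutMs <- Nullable 30_000))

use producer = KafkaProducer.Create(log, producerConfig, "example-topic")
// `Inner` exposes the underlying Confluent.Kafka IProducer for custom logic
producer.Inner.Flush(TimeSpan.FromSeconds 5.) |> ignore
```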

### Changed

- default auto-commit interval dropped from 10s to 5s (which is the `Confluent.Kafka` default) [#29](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/29)
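For consumers that relied on the old 10s cadence, the existing `offsetCommitInterval` argument can reinstate it explicitly (a sketch continuing the example above; client, topic and group names are illustrative):

```fsharp
let consumerConfig =
    KafkaConsumerConfig.Create(
        "example-client", Uri "kafka://localhost:9092", ["example-topic"], "example-group",
        // override the new 5s default back to the previous 10s
        offsetCommitInterval = TimeSpan.FromSeconds 10.)
```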

### Removed
### Fixed

@@ -18,7 +25,7 @@ The `Unreleased` section name is replaced by the expected version of next release

### Fixed

# Make custom parameters in consumer config a seq once again [#28](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/28) [@szer](https://github.com/Szer)
- Make custom parameters in consumer config a seq once again [#28](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/28) [@szer](https://github.com/Szer)

<a name="1.0.0-rc7"></a>
## [1.0.0-rc7] - 2019-05-16
159 changes: 89 additions & 70 deletions src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs
@@ -22,51 +22,59 @@ module private Config =

/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings
[<NoComparison>]
type KafkaProducerConfig private (conf, cfgs, broker : Uri, compression : CompressionType, acks : Acks) =
member val Conf : ProducerConfig = conf
member val Acks = acks
member val Broker = broker
member val Compression = compression
member __.Kvps = Seq.append conf cfgs

/// Creates a Kafka producer instance with supplied configuration
type KafkaProducerConfig private (inner, broker : Uri) =
member __.Inner : ProducerConfig = inner
member __.Broker = broker

member __.Acks = let v = inner.Acks in v.Value
member __.MaxInFlight = let v = inner.MaxInFlight in v.Value
member __.Compression = let v = inner.CompressionType in v.GetValueOrDefault(CompressionType.None)

/// Creates and wraps a Confluent.Kafka ProducerConfig with the specified settings
static member Create
( clientId : string, broker : Uri, acks,
/// Message compression. Defaults to None
/// Message compression. Defaults to None.
?compression,
/// Maximum in-flight requests; <> 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry. Defaults to 1.
/// Maximum in-flight requests. Default: 1_000_000.
/// NB <> 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry
?maxInFlight,
/// Number of retries. Defaults to 60.
/// Time to wait for other items to be produced before sending a batch. Confluent.Kafka default: 0ms. Default: 10ms.
/// NB the linger setting alone does not provide any hard guarantees; see KafkaBatchProducer.CreateWithBatchingOverrides
?linger : TimeSpan,
/// Number of retries. Confluent.Kafka default: 2. Default: 60.
?retries,
/// Backoff interval. Defaults to 1 second.
/// Backoff interval. Confluent.Kafka default: 100ms. Default: 1s.
?retryBackoff,
/// Statistics Interval. Defaults to no stats.
/// Statistics Interval. Default: no stats.
?statisticsInterval,
/// Defaults to true.
/// Confluent.Kafka default: false. Default: true.
?socketKeepAlive,
/// Defaults to 10 ms.
?linger : TimeSpan,
/// Defaults to 'consistent_random'.
/// Partition algorithm. Default: `ConsistentRandom`.
?partitioner,
/// Misc configuration parameter to be passed to the underlying CK producer.
?custom) =
let compression = defaultArg compression CompressionType.None
/// Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration.
?custom,
/// Postprocesses the ProducerConfig after the rest of the rules have been applied
?customize) =
let c =
ProducerConfig(
ClientId = clientId, BootstrapServers = Config.validateBrokerUri broker,
RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000),
MessageSendMaxRetries = Nullable (defaultArg retries 60),
RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2
Acks = Nullable acks,
CompressionType = Nullable compression,
LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10),
SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true),
Partitioner = Nullable (defaultArg partitioner Partitioner.ConsistentRandom),
MaxInFlight = Nullable (defaultArg maxInFlight 1000000),
LingerMs = Nullable (match linger with Some t -> int t.TotalMilliseconds | None -> 10), // default 0
SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false
LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
statisticsInterval |> Option.iter (fun (i : TimeSpan) -> c.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds))
KafkaProducerConfig(c, defaultArg custom Seq.empty, broker, compression, acks)

type KafkaProducer private (log: ILogger, inner : IProducer<string, string>, topic : string) =
maxInFlight |> Option.iter (fun x -> c.MaxInFlight <- Nullable x) // default 1_000_000
partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds))
custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v))
customize |> Option.iter (fun f -> f c)
KafkaProducerConfig(c, broker)
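To make the ordering above concrete, here is a hypothetical config in which a `custom` pair and `customize` touch the same underlying librdkafka setting (`MessageTimeoutMs` maps to the `message.timeout.ms` key); `customize` wins because it is applied last. The values and client id are illustrative, and the `open`s from the earlier sketch are assumed in scope:

```fsharp
open System.Collections.Generic

let probeConfig =
    KafkaProducerConfig.Create(
        "probe", Uri "kafka://localhost:9092", Acks.All,
        // applied after the explicit defaults: raw key/value pairs forwarded to Set
        custom = [ KeyValuePair("message.timeout.ms", "10000") ],
        // applied last: direct post-processing of the built ProducerConfig
        customize = (fun c -> c.MessageTimeoutMs <- Nullable 30_000))
// probeConfig.Inner.MessageTimeoutMs now yields 30000
```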

/// Creates and wraps a Confluent.Kafka Producer with the supplied configuration
type KafkaProducer private (log, inner : IProducer<string, string>, topic : string) =
member __.Inner = inner
member __.Topic = topic

interface IDisposable with member __.Dispose() = inner.Dispose()
@@ -78,13 +86,24 @@ type KafkaProducer private (log: ILogger, inner : IProducer<string, string>, top
member __.ProduceAsync(key, value) : Async<DeliveryResult<_,_>>= async {
return! inner.ProduceAsync(topic, Message<_,_>(Key=key, Value=value)) |> Async.AwaitTaskCorrect }

static member Create(log : ILogger, config : KafkaProducerConfig, topic : string): KafkaProducer =
if String.IsNullOrEmpty topic then nullArg "topic"
log.Information("Producing... {broker} / {topic} compression={compression} maxInFlight={maxInFlight} acks={acks} lingerMs={linger}",
config.Broker, topic, config.Compression, config.MaxInFlight, config.Acks, (let l = config.Inner.LingerMs in l.Value))
let p =
ProducerBuilder<string, string>(config.Inner)
.SetLogHandler(fun _p m -> log.Information("{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
.SetErrorHandler(fun _p e -> log.Error("{reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.Build()
new KafkaProducer(log, p, topic)

/// Produces a batch of supplied key/value messages. Results are returned in order of writing (which may vary from order of submission).
/// <throws>
/// 1. if there is an immediate local config issue
/// 2. upon receipt of the first failed `DeliveryReport` (NB without waiting for any further reports) </throws>
/// 2. upon receipt of the first failed `DeliveryReport` (NB without waiting for any further reports, which can potentially leave some results in doubt should a 'batch' get split) </throws>
/// <remarks>
/// Note that the delivery and/or write order may vary from the supplied order (unless you drop `maxInFlight` down to 1, massively constraining throughput).
/// Thus it's important to note that supplying >1 item into the queue bearing the same key risks them being written to the topic out of order. </remarks>
/// Note that the delivery and/or write order may vary from the supplied order unless `maxInFlight` is 1 (which massively constrains throughput).
/// Thus supplying >1 item bearing the same key without maxInFlight=1 risks them being written to the topic out of order.</remarks>
member __.ProduceBatch(keyValueBatch : (string * string)[]) = async {
if Array.isEmpty keyValueBatch then return [||] else

@@ -111,19 +130,9 @@ type KafkaProducer private (log: ILogger, inner : IProducer<string, string>, top
log.Debug("Produced {count}",!numCompleted)
return! Async.AwaitTaskCorrect tcs.Task }
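A usage sketch for the batch semantics documented above (batch contents illustrative); note the results reflect write order, which can differ from submission order:

```fsharp
let produceMany (producer : KafkaProducer) = async {
    let batch = [| for i in 1 .. 100 -> string i, sprintf "value-%d" i |]
    // completes when every item is acknowledged, or surfaces the first failed DeliveryReport
    let! results = producer.ProduceBatch batch
    return results.Length }
```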

static member Create(log : ILogger, config : KafkaProducerConfig, topic : string) =
if String.IsNullOrEmpty topic then nullArg "topic"
log.Information("Producing... {broker} / {topic} compression={compression} acks={acks}", config.Broker, topic, config.Compression, config.Acks)
let producer =
ProducerBuilder<string, string>(config.Kvps)
.SetLogHandler(fun _p m -> log.Information("{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
.SetErrorHandler(fun _p e -> log.Error("{reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.Build()
new KafkaProducer(log, producer, topic)

type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan }

module private ConsumerImpl =
/// guesstimate approximate message size in bytes
let approximateMessageBytes (message : ConsumeResult<string, string>) =
let inline len (x:string) = match x with null -> 0 | x -> sizeof<char> * x.Length
@@ -253,26 +262,27 @@ type KafkaProducer private (log: ILogger, inner : IProducer<string, string>, top

/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings
[<NoComparison>]
type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq<KeyValuePair<string,string>>; topics: string list; buffering: ConsumerBufferingConfig } with
member __.Kvps = Seq.append __.conf __.custom
type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list; buffering: ConsumerBufferingConfig } with
/// Builds a Kafka Consumer Config suitable for KafkaConsumer.Start*
static member Create
( /// Identify this consumer in logs etc
clientId, broker : Uri, topics,
/// Consumer group identifier.
groupId,
/// Default Earliest.
/// Specifies handling when Consumer Group does not yet have an offset recorded. Confluent.Kafka default: start from Latest. Default: start from Earliest.
?autoOffsetReset,
/// Default 100kB.
?fetchMaxBytes,
/// Default 10B.
/// Minimum number of bytes to wait for (subject to timeout with default of 100ms). Default 1B.
?fetchMinBytes,
/// Stats reporting interval for the consumer in ms. By default, the reporting is turned off.
/// Stats reporting interval for the consumer. Default: no reporting.
?statisticsInterval,
/// Consumed offsets commit interval. Default 10s. (WAS 1s)
/// Consumed offsets commit interval. Default 5s.
?offsetCommitInterval,
/// Misc configuration parameter to be passed to the underlying CK consumer.
?custom,
/// Postprocesses the ConsumerConfig after the rest of the rules have been applied
?customize,

(* Client side batching *)

@@ -284,19 +294,21 @@ type KafkaConsumerConfig = private { conf: ConsumerConfig; custom: seq<KeyValueP
?minInFlightBytes,
/// Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB.
?maxInFlightBytes) =
let conf =
let c =
ConsumerConfig(
ClientId=clientId, BootstrapServers=Config.validateBrokerUri broker, GroupId=groupId,
AutoOffsetReset = Nullable (defaultArg autoOffsetReset AutoOffsetReset.Earliest),
FetchMaxBytes = Nullable (defaultArg fetchMaxBytes 100000),
MessageMaxBytes = Nullable (defaultArg fetchMaxBytes 100000),
FetchMinBytes = Nullable (defaultArg fetchMinBytes 10), // TODO check if sane default
EnableAutoCommit = Nullable true,
AutoCommitIntervalMs = Nullable (match offsetCommitInterval with Some (i: TimeSpan) -> int i.TotalMilliseconds | None -> 1000*10),
EnableAutoOffsetStore = Nullable false,
AutoOffsetReset = Nullable (defaultArg autoOffsetReset AutoOffsetReset.Earliest), // default: latest
FetchMaxBytes = Nullable (defaultArg fetchMaxBytes 100_000), // default: 52_428_800
MessageMaxBytes = Nullable (defaultArg fetchMaxBytes 100_000), // default 1_000_000
EnableAutoCommit = Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
EnableAutoOffsetStore = Nullable false, // explicit calls to StoreOffset are the only things that affect progression in offsets
LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
statisticsInterval |> Option.iter (fun (i : TimeSpan) -> conf.StatisticsIntervalMs <- Nullable (int i.TotalMilliseconds))
{ conf = conf; custom = defaultArg custom Seq.empty
fetchMinBytes |> Option.iter (fun x -> c.FetchMinBytes <- x) // Fetch waits for this amount of data for up to FetchWaitMaxMs (100)
offsetCommitInterval |> Option.iter<TimeSpan> (fun x -> c.AutoCommitIntervalMs <- Nullable <| int x.TotalMilliseconds)
statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable <| int x.TotalMilliseconds)
custom |> Option.iter<seq<KeyValuePair<string,string>>> (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v))
customize |> Option.iter<ConsumerConfig -> unit> (fun f -> f c)
{ inner = c
topics = match Seq.toList topics with [] -> invalidArg "topics" "must be non-empty collection" | ts -> ts
buffering = {
maxBatchSize = defaultArg maxBatchSize 1000
@@ -322,29 +334,36 @@ type KafkaPartitionMetrics =
[<JsonProperty("consumer_lag")>]
consumerLag: int64 }

type KafkaConsumer private (log : ILogger, consumer : IConsumer<string, string>, task : Task<unit>, cts : CancellationTokenSource) =
/// Creates and wraps a Confluent.Kafka IConsumer, affording a batched consumption mode with implicit offset progression at the end of each
/// (parallel across partitions, sequenced/monotonic within each partition) batch of processing carried out by the `partitionHandler`
/// When the `partitionHandler` throws and/or `Stop()` is called, conclusion of processing can be awaited via `AwaitCompletion()`
type KafkaConsumer private (log : ILogger, inner : IConsumer<string, string>, task : Task<unit>, cts : CancellationTokenSource) =
member __.Inner = inner

/// Asynchronously awaits until consumer stops or is faulted
member __.AwaitCompletion() = Async.AwaitTaskCorrect task
interface IDisposable with member __.Dispose() = __.Stop()

member __.Status = task.Status
/// Request cancellation of processing
member __.Stop() =
log.Information("Consuming ... Stopping {name}", consumer.Name)
log.Information("Consuming ... Stopping {name}", inner.Name)
cts.Cancel();

/// Inspects current status of processing task
member __.Status = task.Status
/// Asynchronously awaits until consumer stops or is faulted
member __.AwaitCompletion() = Async.AwaitTaskCorrect task

/// Starts a kafka consumer with provider configuration and batch message handler.
/// Batches are grouped by topic partition. Batches belonging to the same topic partition will be scheduled sequentially and monotonically,
/// however batches from different partitions can be run concurrently.
/// Yielding an exception from the `partitionHandler` terminates the processing
static member Start (log : ILogger) (config : KafkaConsumerConfig) (partitionHandler : ConsumeResult<string,string>[] -> Async<unit>) =
if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic"
log.Information("Consuming... {broker} {topics} {groupId}" (*autoOffsetReset={autoOffsetReset}*) + " fetchMaxBytes={fetchMaxB} maxInFlightBytes={maxInFlightB} maxBatchSize={maxBatchB} maxBatchDelay={maxBatchDelay}s",
config.conf.BootstrapServers, config.topics, config.conf.GroupId, (*config.conf.AutoOffsetReset.Value,*) config.conf.FetchMaxBytes,
config.inner.BootstrapServers, config.topics, config.inner.GroupId, (*config.conf.AutoOffsetReset.Value,*) config.inner.FetchMaxBytes,
config.buffering.maxInFlightBytes, config.buffering.maxBatchSize, (let t = config.buffering.maxBatchDelay in t.TotalSeconds))

let partitionedCollection = new ConsumerImpl.PartitionedBlockingCollection<TopicPartition, ConsumeResult<string, string>>()
let consumer =
ConsumerBuilder<_,_>(config.Kvps)
ConsumerBuilder<_,_>(config.inner)
.SetLogHandler(fun _c m -> log.Information("consumer_info|{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
.SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.SetStatisticsHandler(fun _c json ->
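Putting the consumer pieces together, an end-to-end wiring might look as follows — a sketch only: the handler body is illustrative, and the `log` and `consumerConfig` values from the earlier fragments are assumed in scope:

```fsharp
let handle (batch : ConsumeResult<string, string>[]) = async {
    // batches for a given partition arrive sequentially; distinct partitions run concurrently
    for m in batch do
        printfn "%O@%O: %s" m.Partition m.Offset m.Message.Value }

use consumer = KafkaConsumer.Start log consumerConfig handle
// blocks until Stop() is called or the partitionHandler raises
consumer.AwaitCompletion() |> Async.RunSynchronously
```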
