diff --git a/src/kafka-net/Common/BigEndianBinaryReader.cs b/src/kafka-net/Common/BigEndianBinaryReader.cs index 7cc0c7a7..6d105cf2 100644 --- a/src/kafka-net/Common/BigEndianBinaryReader.cs +++ b/src/kafka-net/Common/BigEndianBinaryReader.cs @@ -31,7 +31,7 @@ public BigEndianBinaryReader(IEnumerable payload) : base(new MemoryStream( } public long Length{get{return base.BaseStream.Length;}} - public long Position { get { return base.BaseStream.Position; } set { base.BaseStream.Position = 0; } } + public long Position { get { return base.BaseStream.Position; } set { base.BaseStream.Position = value; } } public bool HasData { get { return base.BaseStream.Position < base.BaseStream.Length; } } public bool Available(int dataSize) @@ -104,6 +104,11 @@ public override UInt64 ReadUInt64() return EndianAwareRead(8, BitConverter.ToUInt64); } + public byte[] ReadBytes() + { + return ReadIntPrefixedBytes(); + } + public string ReadInt16String() { var size = ReadInt16(); diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs index 096275c2..3c4ddd17 100644 --- a/src/kafka-net/Common/Extensions.cs +++ b/src/kafka-net/Common/Extensions.cs @@ -234,5 +234,17 @@ public static Exception ExtractException(this Task task) return new ApplicationException("Unknown exception occured."); } + + private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + public static long ToUnixEpochMilliseconds(this DateTime pointInTime) + { + return pointInTime > UnixEpoch ? (long)(pointInTime - UnixEpoch).TotalMilliseconds : 0L; + } + + public static DateTime FromUnixEpochMilliseconds(this long milliseconds) + { + return UnixEpoch.AddMilliseconds(milliseconds); + } } } diff --git a/src/kafka-net/Interfaces/IKafkaRequest.cs b/src/kafka-net/Interfaces/IKafkaRequest.cs index cfec5deb..b616cb30 100644 --- a/src/kafka-net/Interfaces/IKafkaRequest.cs +++ b/src/kafka-net/Interfaces/IKafkaRequest.cs @@ -27,6 +27,10 @@ public interface IKafkaRequest /// ApiKeyRequestType ApiKey { get; } /// + /// This is a numeric version number for the api request. It allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version. + /// + short ApiVersion { get; set; } + /// /// Encode this request into the Kafka wire protocol. /// /// Byte[] representing the binary wire protocol of this request. diff --git a/src/kafka-net/Protocol/BaseRequest.cs b/src/kafka-net/Protocol/BaseRequest.cs index 7ffde936..be611e92 100644 --- a/src/kafka-net/Protocol/BaseRequest.cs +++ b/src/kafka-net/Protocol/BaseRequest.cs @@ -14,9 +14,9 @@ public abstract class BaseRequest /// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol /// protected const int ReplicaId = -1; - protected const Int16 ApiVersion = 0; private string _clientId = "Kafka-Net"; private int _correlationId = 1; + private short _apiVersion = 0; /// /// Descriptive name of the source of the messages sent to kafka @@ -29,6 +29,11 @@ public abstract class BaseRequest /// public int CorrelationId { get { return _correlationId; } set { _correlationId = value; } } + /// + /// This is a numeric version number for the api request. It allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version. 
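Everything time-related in this change funnels through the two epoch helpers added to Extensions.cs above. A minimal usage sketch, assuming nothing beyond those two extension methods: the round trip is stable to millisecond precision, and values at or before the epoch clamp to 0 (the wire-level "no timestamp" sentinel of -1 is handled separately by the decoders).

    using System;
    using KafkaNet.Common; // ToUnixEpochMilliseconds / FromUnixEpochMilliseconds

    var now = DateTime.UtcNow;
    long millis = now.ToUnixEpochMilliseconds();       // ms since 1970-01-01T00:00:00Z
    DateTime roundTripped = millis.FromUnixEpochMilliseconds();
    // roundTripped equals now to the millisecond; the (long) cast in the helper
    // truncates any sub-millisecond ticks.

    var preEpoch = new DateTime(1960, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    long clamped = preEpoch.ToUnixEpochMilliseconds(); // 0, never negative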
+ /// + public short ApiVersion { get { return _apiVersion; } set { _apiVersion = value; } } + /// /// Flag which tells the broker call to expect a response for this request. /// @@ -43,7 +48,7 @@ public static KafkaMessagePacker EncodeHeader(IKafkaRequest request) { return new KafkaMessagePacker() .Pack(((Int16)request.ApiKey)) - .Pack(ApiVersion) + .Pack(request.ApiVersion) .Pack(request.CorrelationId) .Pack(request.ClientId, StringPrefixEncoding.Int16); } diff --git a/src/kafka-net/Protocol/FetchRequest.cs b/src/kafka-net/Protocol/FetchRequest.cs index c7e87540..1078ef45 100644 --- a/src/kafka-net/Protocol/FetchRequest.cs +++ b/src/kafka-net/Protocol/FetchRequest.cs @@ -37,7 +37,7 @@ public KafkaDataPayload Encode() public IEnumerable Decode(byte[] payload) { - return DecodeFetchResponse(payload); + return DecodeFetchResponse(ApiVersion, payload); } private KafkaDataPayload EncodeFetchRequest(FetchRequest request) @@ -78,12 +78,16 @@ private KafkaDataPayload EncodeFetchRequest(FetchRequest request) } } - private IEnumerable DecodeFetchResponse(byte[] data) + private IEnumerable DecodeFetchResponse(int version, byte[] data) { using (var stream = new BigEndianBinaryReader(data)) { var correlationId = stream.ReadInt32(); + if (version >= 1) { + var throttleTime = stream.ReadInt32(); + } + var topicCount = stream.ReadInt32(); for (int i = 0; i < topicCount; i++) { diff --git a/src/kafka-net/Protocol/Message.cs b/src/kafka-net/Protocol/Message.cs index c325b650..793d09db 100644 --- a/src/kafka-net/Protocol/Message.cs +++ b/src/kafka-net/Protocol/Message.cs @@ -40,6 +40,10 @@ public class Message public byte MagicNumber { get; set; } /// /// Attribute value outside message body used for added codec/compression info. + /// + /// The lowest 3 bits contain the compression codec used for the message. + /// The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0) + /// All other bits should be set to 0. /// public byte Attribute { get; set; } /// @@ -50,6 +54,10 @@ public class Message /// The message body contents. Can contain compress message set. /// public byte[] Value { get; set; } + /// + /// This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)). + /// + public DateTime Timestamp { get; set; } /// /// Construct an empty message. 
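Given the attribute layout documented above, the codec and timestamp type fall out with two bit masks. A sketch: ProtocolConstants.AttributeCodeMask is the repo's existing low-three-bit codec mask (DecodeMessage below uses it), while the 0x08 timestamp-type mask is inferred from the wire-format notes here rather than taken from an existing constant.

    // Low 3 bits: compression codec. Bit 3 (0x08): timestamp type (0.10.0+).
    static MessageCodec GetCodec(byte attribute) {
        return (MessageCodec)(ProtocolConstants.AttributeCodeMask & attribute);
    }

    static bool IsLogAppendTime(byte attribute) {
        const byte timestampTypeMask = 0x08;         // assumed value, not a repo constant
        return (attribute & timestampTypeMask) != 0; // 0 = CreateTime, 1 = LogAppendTime
    }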
@@ -134,11 +142,14 @@ public static byte[] EncodeMessage(Message message) { using(var stream = new KafkaMessagePacker()) { - return stream.Pack(message.MagicNumber) - .Pack(message.Attribute) - .Pack(message.Key) - .Pack(message.Value) - .CrcPayload(); + stream.Pack(message.MagicNumber) + .Pack(message.Attribute); + if (message.MagicNumber >= 1) { + stream.Pack(message.Timestamp.ToUnixEpochMilliseconds()); + } + return stream.Pack(message.Key) + .Pack(message.Value) + .CrcPayload(); } } @@ -162,9 +173,16 @@ public static IEnumerable DecodeMessage(long offset, byte[] payload) Meta = new MessageMetadata { Offset = offset }, MagicNumber = stream.ReadByte(), Attribute = stream.ReadByte(), - Key = stream.ReadIntPrefixedBytes() }; + if (message.MagicNumber >= 1) { + var timestamp = stream.ReadInt64(); + if (timestamp >= 0) { + message.Timestamp = timestamp.FromUnixEpochMilliseconds(); + } + } + message.Key = stream.ReadIntPrefixedBytes(); + var codec = (MessageCodec)(ProtocolConstants.AttributeCodeMask & message.Attribute); switch (codec) { diff --git a/src/kafka-net/Protocol/OffsetCommitRequest.cs b/src/kafka-net/Protocol/OffsetCommitRequest.cs index f758c5ac..dd412c22 100644 --- a/src/kafka-net/Protocol/OffsetCommitRequest.cs +++ b/src/kafka-net/Protocol/OffsetCommitRequest.cs @@ -12,7 +12,10 @@ namespace KafkaNet.Protocol public class OffsetCommitRequest : BaseRequest, IKafkaRequest { public ApiKeyRequestType ApiKey { get { return ApiKeyRequestType.OffsetCommit; } } + public int GenerationId { get; set; } + public string MemberId { get; set; } public string ConsumerGroup { get; set; } + public TimeSpan? OffsetRetention { get; set; } public List OffsetCommits { get; set; } public KafkaDataPayload Encode() @@ -31,6 +34,18 @@ private KafkaDataPayload EncodeOffsetCommitRequest(OffsetCommitRequest request) using (var message = EncodeHeader(request).Pack(request.ConsumerGroup, StringPrefixEncoding.Int16)) { + if (request.ApiVersion >= 1) { + message.Pack(request.GenerationId) + .Pack(request.MemberId, StringPrefixEncoding.Int16); + } + if (request.ApiVersion >= 2) { + if (request.OffsetRetention.HasValue) { + message.Pack((long) request.OffsetRetention.Value.TotalMilliseconds); + } else { + message.Pack(-1L); + } + } + var topicGroups = request.OffsetCommits.GroupBy(x => x.Topic).ToList(); message.Pack(topicGroups.Count); @@ -45,9 +60,11 @@ private KafkaDataPayload EncodeOffsetCommitRequest(OffsetCommitRequest request) foreach (var commit in partition) { message.Pack(partition.Key) - .Pack(commit.Offset) - .Pack(commit.TimeStamp) - .Pack(commit.Metadata, StringPrefixEncoding.Int16); + .Pack(commit.Offset); + if (ApiVersion == 1) { + message.Pack(commit.TimeStamp); + } + message.Pack(commit.Metadata, StringPrefixEncoding.Int16); } } } diff --git a/src/kafka-net/Protocol/ProduceRequest.cs b/src/kafka-net/Protocol/ProduceRequest.cs index 63a94a22..69015160 100644 --- a/src/kafka-net/Protocol/ProduceRequest.cs +++ b/src/kafka-net/Protocol/ProduceRequest.cs @@ -38,11 +38,11 @@ public KafkaDataPayload Encode() public IEnumerable Decode(byte[] payload) { - return DecodeProduceResponse(payload); + return DecodeProduceResponse(ApiVersion, payload); } #region Protocol... 
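The OffsetCommitRequest changes above are the clearest picture of how the new ApiVersion property gates the wire format: v1 adds GenerationId and MemberId, v2 drops the per-partition TimeStamp in favor of a single RetentionTime. A construction sketch, with illustrative group/topic values and assuming the repo's existing OffsetCommit payload type:

    var request = new OffsetCommitRequest {
        ApiVersion = 2,
        ConsumerGroup = "example-group",          // illustrative
        GenerationId = 3,                         // from the coordinator
        MemberId = "consumer-1",                  // assigned by the coordinator
        OffsetRetention = TimeSpan.FromHours(24), // packed as int64 ms; null packs -1
        OffsetCommits = new List<OffsetCommit> {
            new OffsetCommit { Topic = "example-topic", PartitionId = 0, Offset = 42, Metadata = "" }
        }
    };
    KafkaDataPayload payload = request.Encode();  // TimeStamp is written only when ApiVersion == 1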
- private KafkaDataPayload EncodeProduceRequest(ProduceRequest request) + private static KafkaDataPayload EncodeProduceRequest(ProduceRequest request) { int totalCompressedBytes = 0; if (request.Payload == null) request.Payload = new List(); @@ -95,7 +95,7 @@ private KafkaDataPayload EncodeProduceRequest(ProduceRequest request) } } - private CompressedMessageResult CreateGzipCompressedMessage(IEnumerable messages) + private static CompressedMessageResult CreateGzipCompressedMessage(IEnumerable messages) { var messageSet = Message.EncodeMessageSet(messages); @@ -114,7 +114,7 @@ private CompressedMessageResult CreateGzipCompressedMessage(IEnumerable }; } - private IEnumerable DecodeProduceResponse(byte[] data) + private static IEnumerable DecodeProduceResponse(int version, byte[] data) { using (var stream = new BigEndianBinaryReader(data)) { @@ -136,9 +136,20 @@ private IEnumerable DecodeProduceResponse(byte[] data) Offset = stream.ReadInt64() }; + if (version >= 2) { + var milliseconds = stream.ReadInt64(); + if (milliseconds >= 0) { + response.Timestamp = milliseconds.FromUnixEpochMilliseconds(); + } + } + yield return response; } } + + if (version >= 2) { + var throttleTime = stream.ReadInt32(); + } } } #endregion @@ -168,6 +179,13 @@ public class ProduceResponse /// The offset number to commit as completed. /// public long Offset { get; set; } + /// + /// If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set. + /// All the messages in the message set have the same timestamp. + /// If CreateTime is used, this field is always -1. The producer can assume the timestamp of the messages in the + /// produce request has been accepted by the broker if there is no error code returned. + /// + public DateTime? Timestamp { get; set; } public override bool Equals(object obj) { diff --git a/src/kafka-net/Protocol/Protocol.cs b/src/kafka-net/Protocol/Protocol.cs index 84b913fe..dd1b752e 100644 --- a/src/kafka-net/Protocol/Protocol.cs +++ b/src/kafka-net/Protocol/Protocol.cs @@ -124,6 +124,11 @@ public enum ErrorResponseCode : short /// OffsetMetadataTooLargeCode = 12, + /// + /// The server disconnected before a response was received. + /// + NetworkException = 13, + /// /// The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition). /// @@ -137,7 +142,103 @@ public enum ErrorResponseCode : short /// /// The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for. /// - NotCoordinatorForConsumerCode = 16 + NotCoordinatorForConsumerCode = 16, + + /// + /// The request attempted to perform an operation on an invalid topic. + /// + InvalidTopic = 17, + + /// + /// The request included message batch larger than the configured segment size on the server. + /// + RecordListTooLarge = 18, + + /// + /// Messages are rejected since there are fewer in-sync replicas than required. + /// + NotEnoughReplicas = 19, + + /// + /// Messages are written to the log, but to fewer in-sync replicas than required. + /// + NotEnoughReplicasAfterAppend = 20, + + /// + /// Produce request specified an invalid value for required acks. + /// + InvalidRequiredAcks = 21, + + /// + /// Specified group generation id is not valid. + /// + IllegalGeneration = 22, + + /// + /// The group member's supported protocols are incompatible with those of existing members. 
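These error-code additions extend the enum through the 0.9/0.10 broker codes (13 and 17-35). How a caller reacts to them is out of scope for this diff; the classification below is purely illustrative, not a policy the library defines.

    // Illustrative: a few of the new codes that typically warrant a retry.
    static bool IsRetriable(ErrorResponseCode code) {
        switch (code) {
            case ErrorResponseCode.NetworkException:             // 13: disconnected before a response
            case ErrorResponseCode.NotEnoughReplicas:            // 19: transient ISR shortage
            case ErrorResponseCode.NotEnoughReplicasAfterAppend: // 20: written, but under-replicated
                return true;
            default:
                return false;
        }
    }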
+ /// + InconsistentGroupProtocol = 23, + + /// + /// The configured groupId is invalid. + /// + InvalidGroupId = 24, + + /// + /// The coordinator is not aware of this member. + /// + UnknownMemberId = 25, + + /// + /// The session timeout is not within the range allowed by the broker (as configured + /// by group.min.session.timeout.ms and group.max.session.timeout.ms). + /// + InvalidSessionTimeout = 26, + + /// + /// The group is rebalancing, so a rejoin is needed. + /// + RebalanceInProgress = 27, + + /// + /// The committing offset data size is not valid + /// + InvalidCommitOffsetSize = 28, + + /// + /// Not authorized to access topic. + /// + TopicAuthorizationFailed = 29, + + /// + /// Not authorized to access group. + /// + GroupAuthorizationFailed = 30, + + /// + /// Cluster authorization failed. + /// + ClusterAuthorizationFailed = 31, + + /// + /// The timestamp of the message is out of acceptable range. + /// + InvalidTimestamp = 32, + + /// + /// The broker does not support the requested SASL mechanism. + /// + UnsupportedSaslMechanism = 33, + + /// + /// Request is not valid given the current SASL state. + /// + IllegalSaslState = 34, + + /// + /// The version of API is not supported. + /// + UnsupportedVersion = 35 } /// diff --git a/src/kafka-tests/Unit/ProtocolAssertionExtensions.cs b/src/kafka-tests/Unit/ProtocolAssertionExtensions.cs new file mode 100644 index 00000000..8deafa35 --- /dev/null +++ b/src/kafka-tests/Unit/ProtocolAssertionExtensions.cs @@ -0,0 +1,619 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using KafkaNet; +using KafkaNet.Common; +using KafkaNet.Protocol; +using NUnit.Framework; + +namespace kafka_tests.Unit +{ + public static class ProtocolAssertionExtensions + { + /// + /// GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort + /// ErrorCode => int16 -- The error code. + /// CoordinatorId => int32 -- The broker id. + /// CoordinatorHost => string -- The hostname of the broker. + /// CoordinatorPort => int32 -- The port on which the broker accepts requests. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertGroupCoordinatorResponse(this BigEndianBinaryReader reader, ConsumerMetadataResponse response) + { + Assert.That(reader.ReadInt16(), Is.EqualTo(response.Error), "ErrorCode"); + Assert.That(reader.ReadInt32(), Is.EqualTo(response.CoordinatorId), "CoordinatorId"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(response.CoordinatorHost), "CoordinatorHost"); + Assert.That(reader.ReadInt32(), Is.EqualTo(response.CoordinatorPort), "CoordinatorPort"); + } + + /// + /// GroupCoordinatorRequest => GroupId + /// GroupId => string -- The consumer group id. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertGroupCoordinatorRequest(this BigEndianBinaryReader reader, ConsumerMetadataRequest request) + { + Assert.That(reader.ReadInt16String(), Is.EqualTo(request.ConsumerGroup), "ConsumerGroup"); + } + + /// + /// OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]] + /// TopicName => string -- The name of the topic. + /// Partition => int32 -- The id of the partition. + /// Offset => int64 -- The offset, or -1 if none exists. + /// Metadata => string -- The metadata associated with the topic and partition. 
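The assertion extensions in this new test file are meant to chain inside a single AssertProtocol walk (defined near the end of the file). A minimal composition sketch against the group coordinator API, with an illustrative group name:

    var request = new ConsumerMetadataRequest { ConsumerGroup = "example-group" };
    var data = request.Encode();
    data.Buffer.AssertProtocol(reader => {
        reader.AssertRequestHeader(request);           // api_key, api_version, correlation_id, client_id
        reader.AssertGroupCoordinatorRequest(request); // GroupId
    });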
+ /// ErrorCode => int16 -- The error code for the partition, if any. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertOffsetFetchResponse(this BigEndianBinaryReader reader, IEnumerable response) + { + var responses = response.GroupBy(r => r.Topic).ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(responses.Count), "[TopicName]"); + foreach (var payload in responses) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Key), "TopicName"); + var partitions = payload.ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(partitions.Count), "[Partition]"); + foreach (var partition in partitions) { + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.PartitionId), "Partition"); + Assert.That(reader.ReadInt64(), Is.EqualTo(partition.Offset), "Offset"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(partition.MetaData), "Metadata"); + Assert.That(reader.ReadInt16(), Is.EqualTo((short)partition.Error), "ErrorCode"); + } + } + } + + /// + /// OffsetFetchRequest => ConsumerGroup [TopicName [Partition]] + /// ConsumerGroup => string -- The consumer group id. + /// TopicName => string -- The topic to commit. + /// Partition => int32 -- The partition id. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertOffsetFetchRequest(this BigEndianBinaryReader reader, OffsetFetchRequest request) + { + Assert.That(reader.ReadInt16String(), Is.EqualTo(request.ConsumerGroup), "ConsumerGroup"); + + Assert.That(reader.ReadInt32(), Is.EqualTo(request.Topics.Count), "[TopicName]"); + foreach (var payload in request.Topics) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + } + } + + /// + /// OffsetCommitResponse => [TopicName [Partition ErrorCode]]] + /// TopicName => string -- The name of the topic. + /// Partition => int32 -- The id of the partition. + /// ErrorCode => int16 -- The error code for the partition, if any. 
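The recurring "[Partition] is 1" assertion encodes a quirk that is easy to miss: the object model carries one partition per payload entry, so a request touching N partitions of the same topic serializes as N single-partition topic entries rather than one topic entry with an N-element partition array. A sketch, assuming the repo's OffsetFetch payload type:

    var request = new OffsetFetchRequest {
        ConsumerGroup = "example-group",
        Topics = new List<OffsetFetch> {
            new OffsetFetch { Topic = "example-topic", PartitionId = 0 },
            new OffsetFetch { Topic = "example-topic", PartitionId = 1 } // same topic, second wire entry
        }
    };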
+ /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertOffsetCommitResponse(this BigEndianBinaryReader reader, IEnumerable response) + { + var responses = response.GroupBy(r => r.Topic).ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(responses.Count), "[TopicName]"); + foreach (var payload in responses) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Key), "TopicName"); + var partitions = payload.ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(partitions.Count), "[Partition]"); + foreach (var partition in partitions) { + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.PartitionId), "Partition"); + Assert.That(reader.ReadInt16(), Is.EqualTo((short)partition.Error), "ErrorCode"); + } + } + } + + /// + /// OffsetCommitRequest => ConsumerGroup *ConsumerGroupGenerationId *MemberId *RetentionTime [TopicName [Partition Offset *TimeStamp Metadata]] + /// *ConsumerGroupGenerationId, MemberId is only version 1 (0.8.2) and above + /// *TimeStamp is only version 1 (0.8.2) + /// *RetentionTime is only version 2 (0.9.0) and above + /// ConsumerGroupId => string -- The consumer group id. + /// ConsumerGroupGenerationId => int32 -- The generation of the consumer group. + /// MemberId => string -- The consumer id assigned by the group coordinator. + /// RetentionTime => int64 -- Time period in ms to retain the offset. + /// TopicName => string -- The topic to commit. + /// Partition => int32 -- The partition id. + /// Offset => int64 -- message offset to be committed. + /// Timestamp => int64 -- Commit timestamp. + /// Metadata => string -- Any associated metadata the client wants to keep + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + /// + public static void AssertOffsetCommitRequest(this BigEndianBinaryReader reader, OffsetCommitRequest request) + { + Assert.That(reader.ReadInt16String(), Is.EqualTo(request.ConsumerGroup), "ConsumerGroup"); + + if (request.ApiVersion >= 1) { + Assert.That(reader.ReadInt32(), Is.EqualTo(request.GenerationId), "ConsumerGroupGenerationId"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(request.MemberId), "MemberId"); + } + if (request.ApiVersion >= 2) { + var expectedRetention = request.OffsetRetention.HasValue + ? 
(long) request.OffsetRetention.Value.TotalMilliseconds + : -1L; + Assert.That(reader.ReadInt64(), Is.EqualTo(expectedRetention), "RetentionTime"); + } + + Assert.That(reader.ReadInt32(), Is.EqualTo(request.OffsetCommits.Count), "[TopicName]"); + foreach (var payload in request.OffsetCommits) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.Offset), "Offset"); + + if (request.ApiVersion == 1) { + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.TimeStamp), "TimeStamp"); + } + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Metadata), "Metadata"); + } + } + + /// + /// MetadataResponse => [Broker][TopicMetadata] + /// Broker => NodeId Host Port (any number of brokers may be returned) + /// -- The node id, hostname, and port information for a kafka broker + /// NodeId => int32 + /// Host => string + /// Port => int32 + /// TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] + /// TopicErrorCode => int16 + /// PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr + /// PartitionErrorCode => int16 + /// PartitionId => int32 + /// Leader => int32 -- The node id for the kafka broker currently acting as leader for this partition. + /// If no leader exists because we are in the middle of a leader election this id will be -1. + /// Replicas => [int32] -- The set of alive nodes that currently acts as slaves for the leader for this partition. + /// Isr => [int32] -- The set subset of the replicas that are "caught up" to the leader + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + /// + public static void AssertMetadataResponse(this BigEndianBinaryReader reader, MetadataResponse response) + { + Assert.That(reader.ReadInt32(), Is.EqualTo(response.Brokers.Count), "[Broker]"); + foreach (var payload in response.Brokers) { + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.BrokerId), "NodeId"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Host), "Host"); + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.Port), "Port"); + } + Assert.That(reader.ReadInt32(), Is.EqualTo(response.Topics.Count), "[TopicMetadata]"); + foreach (var payload in response.Topics) { + Assert.That(reader.ReadInt16(), Is.EqualTo((short)payload.ErrorCode), "TopicErrorCode"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Name), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.Partitions.Count), "[PartitionMetadata]"); + foreach (var partition in payload.Partitions) { + Assert.That(reader.ReadInt16(), Is.EqualTo((short) partition.ErrorCode), "PartitionErrorCode"); + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.PartitionId), "PartitionId"); + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.LeaderId), "Leader"); + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.Replicas.Count), "[Replicas]"); + foreach (var replica in partition.Replicas) { + Assert.That(reader.ReadInt32(), Is.EqualTo(replica), "Replicas"); + } + Assert.That(reader.ReadInt32(), Is.EqualTo(partition.Isrs.Count), "[Isr]"); + foreach (var isr in partition.Isrs) { + Assert.That(reader.ReadInt32(), Is.EqualTo(isr), "Isr"); + } + } + } + } + + /// + /// TopicMetadataRequest => [TopicName] + /// 
TopicName => string -- The topics to produce metadata for. If empty the request will yield metadata for all topics. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + /// + public static void AssertMetadataRequest(this BigEndianBinaryReader reader, MetadataRequest request) + { + Assert.That(reader.ReadInt32(), Is.EqualTo(request.Topics.Count), "[TopicName]"); + foreach (var payload in request.Topics) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload), "TopicName"); + } + } + + /// + /// OffsetResponse => [TopicName [PartitionOffsets]] + /// PartitionOffsets => Partition ErrorCode [Offset] + /// TopicName => string -- The name of the topic. + /// Partition => int32 -- The id of the partition the fetch is for. + /// ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may + /// be unavailable or maintained on a different host, while others may have successfully accepted the produce request. + /// Offset => int64 + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) + /// + public static void AssertOffsetResponse(this BigEndianBinaryReader reader, IEnumerable response) + { + var responses = response.ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(responses.Count), "[TopicName]"); + foreach (var payload in responses) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + Assert.That(reader.ReadInt16(), Is.EqualTo(payload.Error), "ErrorCode"); + + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.Offsets.Count), "[Offset]"); + foreach (var offset in payload.Offsets) { + Assert.That(reader.ReadInt64(), Is.EqualTo(offset), "Offset"); + } + } + } + + /// + /// OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] + /// ReplicaId => int32 -- The replica id indicates the node id of the replica initiating this request. Normal client consumers should always + /// specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted + /// to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. + /// TopicName => string -- The name of the topic. + /// Partition => int32 -- The id of the partition the fetch is for. + /// Time => int64 -- Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the + /// latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. Note + /// that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element. 
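The special Time values described above are worth a concrete request. A sketch asking for the latest offset of a single partition; names are illustrative and the payload type is the repo's existing Offset class:

    var request = new OffsetRequest {
        ClientId = "example-client",
        Offsets = new List<Offset> {
            new Offset {
                Topic = "example-topic",
                PartitionId = 0,
                Time = -1,      // -1 = latest offset; -2 = earliest available
                MaxOffsets = 1
            }
        }
    };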
+ /// MaxNumberOfOffsets => int32 + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) + /// + public static void AssertOffsetRequest(this BigEndianBinaryReader reader, OffsetRequest request) + { + Assert.That(reader.ReadInt32(), Is.EqualTo(-1), "ReplicaId"); + + Assert.That(reader.ReadInt32(), Is.EqualTo(request.Offsets.Count), "[TopicName]"); + foreach (var payload in request.Offsets) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.Time), "Time"); + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.MaxOffsets), "MaxNumberOfOffsets"); + } + } + + /// + /// FetchResponse => *ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] + /// *ThrottleTime is only version 1 (0.9.0) and above + /// ThrottleTime => int32 -- Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not + /// violate any quota.) + /// TopicName => string -- The topic this response entry corresponds to. + /// Partition => int32 -- The partition this response entry corresponds to. + /// ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may + /// be unavailable or maintained on a different host, while others may have successfully accepted the produce request. + /// HighwaterMarkOffset => int64 -- The offset at the end of the log for this partition. This can be used by the client to determine how many messages + /// behind the end of the log they are. + /// MessageSetSize => int32 -- The size in bytes of the message set for this partition + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse + /// + public static void AssertFetchResponse(this BigEndianBinaryReader reader, int version, int throttleTime, IEnumerable response) + { + var responses = response.ToList(); + if (version >= 1) { + Assert.That(reader.ReadInt32(), Is.EqualTo(throttleTime), "ThrottleTime"); + } + Assert.That(reader.ReadInt32(), Is.EqualTo(responses.Count), "[TopicName]"); + foreach (var payload in responses) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + Assert.That(reader.ReadInt16(), Is.EqualTo(payload.Error), "Error"); + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.HighWaterMark), "HighwaterMarkOffset"); + + var finalPosition = reader.ReadInt32() + reader.Position; + reader.AssertMessageSet(version, payload.Messages); + Assert.That(reader.Position, Is.EqualTo(finalPosition), + string.Format("MessageSetSize was {0} but ended in a different spot.", finalPosition - 4)); + } + } + + /// + /// FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]] + /// ReplicaId => int32 -- The replica id indicates the node id of the replica initiating this request. 
Normal client consumers should always + /// specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted + /// to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. + /// MaxWaitTime => int32 -- The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available + /// at the time the request is issued. + /// MinBytes => int32 -- This is the minimum number of bytes of messages that must be available to give a response. If the client sets this + /// to 0 the server will always respond immediately, however if there is no new data since their last request they will + /// just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has + /// at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the + /// consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. + /// setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate + /// 64k of data before responding). + /// TopicName => string -- The name of the topic. + /// Partition => int32 -- The id of the partition the fetch is for. + /// FetchOffset => int64 -- The offset to begin this fetch from. + /// MaxBytes => int32 -- The maximum bytes to include in the message set for this partition. This helps bound the size of the response. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI + /// + public static void AssertFetchRequest(this BigEndianBinaryReader reader, FetchRequest request) + { + Assert.That(reader.ReadInt32(), Is.EqualTo(-1), "ReplicaId"); + Assert.That(reader.ReadInt32(), Is.EqualTo(request.MaxWaitTime), "MaxWaitTime"); + Assert.That(reader.ReadInt32(), Is.EqualTo(request.MinBytes), "MinBytes"); + + Assert.That(reader.ReadInt32(), Is.EqualTo(request.Fetches.Count), "[TopicName]"); + foreach (var payload in request.Fetches) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.Offset), "FetchOffset"); + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.MaxBytes), "MaxBytes"); + } + } + + /// + /// ProduceResponse => [TopicName [Partition ErrorCode Offset *Timestamp]] *ThrottleTime + /// *ThrottleTime is only version 1 (0.9.0) and above + /// *Timestamp is only version 2 (0.10.0) and above + /// TopicName => string -- The topic this response entry corresponds to. + /// Partition => int32 -- The partition this response entry corresponds to. + /// ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be + /// unavailable or maintained on a different host, while others may have successfully accepted the produce request. + /// Offset => int64 -- The offset assigned to the first message in the message set appended to this partition. + /// Timestamp => int64 -- If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set. 
+ /// All the messages in the message set have the same timestamp. + /// If CreateTime is used, this field is always -1. The producer can assume the timestamp of the messages in the + /// produce request has been accepted by the broker if there is no error code returned. + /// Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)). + /// ThrottleTime => int32 -- Duration in milliseconds for which the request was throttled due to quota violation. + /// (Zero if the request did not violate any quota). + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + public static void AssertProduceResponse(this BigEndianBinaryReader reader, int version, int throttleTime, IEnumerable response) + { + var responses = response.ToList(); + Assert.That(reader.ReadInt32(), Is.EqualTo(responses.Count), "[TopicName]"); + foreach (var payload in responses) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.PartitionId), "Partition"); + Assert.That(reader.ReadInt16(), Is.EqualTo(payload.Error), "Error"); + Assert.That(reader.ReadInt64(), Is.EqualTo(payload.Offset), "Offset"); + if (version >= 2) { + var timestamp = reader.ReadInt64(); + if (timestamp == -1L) { + Assert.That(payload.Timestamp, Is.Null, "Timestamp"); + } else { + Assert.That(payload.Timestamp, Is.Not.Null, "Timestamp"); + Assert.That(payload.Timestamp.Value, Is.EqualTo(timestamp.FromUnixEpochMilliseconds()), "Timestamp"); + } + } + } + if (version >= 1) { + Assert.That(reader.ReadInt32(), Is.EqualTo(throttleTime), "ThrottleTime"); + } + } + + /// + /// ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] + /// RequiredAcks => int16 -- This field indicates how many acknowledgements the servers should receive before responding to the request. + /// If it is 0 the server will not send any response (this is the only case where the server will not reply to + /// a request). If it is 1, the server will wait the data is written to the local log before sending a response. + /// If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. + /// Timeout => int32 -- This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements + /// in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include + /// network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are + /// queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if + /// the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client + /// should use the socket timeout. + /// TopicName => string -- The topic that data is being published to. + /// Partition => int32 -- The partition that data is being published to. + /// MessageSetSize => int32 -- The size, in bytes, of the message set that follows. 
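The -1 handling in AssertProduceResponse above mirrors what DecodeProduceResponse does. Factored out, the mapping between the wire sentinel and the nullable model property is just:

    // -1 on the wire means the topic uses CreateTime; the model surfaces that as null.
    static DateTime? DecodeBrokerTimestamp(long wireValue) {
        return wireValue >= 0 ? wireValue.FromUnixEpochMilliseconds() : (DateTime?)null;
    }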
+ /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + public static void AssertProduceRequest(this BigEndianBinaryReader reader, ProduceRequest request) + { + Assert.That(reader.ReadInt16(), Is.EqualTo(request.Acks), "acks"); + Assert.That(reader.ReadInt32(), Is.EqualTo(request.TimeoutMS), "timeout"); + + Assert.That(reader.ReadInt32(), Is.EqualTo(request.Payload.Count), "[topic_data]"); + foreach (var payload in request.Payload) { + Assert.That(reader.ReadInt16String(), Is.EqualTo(payload.Topic), "TopicName"); + Assert.That(reader.ReadInt32(), Is.EqualTo(1), "[Partition]"); // this is a mismatch between the protocol and the object model + Assert.That(reader.ReadInt32(), Is.EqualTo(payload.Partition), "Partition"); + + var finalPosition = reader.ReadInt32() + reader.Position; + reader.AssertMessageSet(request.ApiVersion, payload.Messages); + Assert.That(reader.Position, Is.EqualTo(finalPosition), + string.Format("MessageSetSize was {0} but ended in a different spot.", finalPosition - 4)); + } + } + + /// + /// MessageSet => [Offset MessageSize Message] + /// Offset => int64 -- This is the offset used in kafka as the log sequence number. When the producer is sending non compressed messages, + /// it can set the offsets to anything. When the producer is sending compressed messages, to avoid server side recompression, + /// each compressed message should have offset starting from 0 and increasing by one for each inner message in the compressed message. + /// MessageSize => int32 + /// + /// NB. MessageSets are not preceded by an int32 like other array elements in the protocol: + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + public static void AssertMessageSet(this BigEndianBinaryReader reader, int version, IEnumerable messages) + { + foreach (var message in messages) { + var offset = reader.ReadInt64(); + if (message.Attribute != (byte)MessageCodec.CodecNone) { + // TODO: assert offset? + } + var finalPosition = reader.ReadInt32() + reader.Position; + reader.AssertMessage(version, message); + Assert.That(reader.Position, Is.EqualTo(finalPosition), + string.Format("MessageSize was {0} but ended in a different spot.", finalPosition - 4)); + } + } + + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + /// Message => Crc MagicByte Attributes *Timestamp Key Value + /// *Timestamp is only version 2 (0.10) and above + /// Crc => int32 -- The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. + /// MagicByte => int8 -- This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 1. + /// Attributes => int8 -- This byte holds metadata attributes about the message. + /// The lowest 3 bits contain the compression codec used for the message. + /// The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0) + /// All other bits should be set to 0. + /// Timestamp => int64 -- This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)). 
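As the NB in AssertMessageSet points out, a message set carries no element count, so readers walk offset/size pairs until the enclosing MessageSetSize window is exhausted. A decoding sketch using only reader members that appear in this diff (HasData stands in for the window check when the buffer holds exactly one set):

    while (reader.HasData) {
        long offset = reader.ReadInt64();   // log sequence number
        int messageSize = reader.ReadInt32();
        byte[] messageBytes = reader.ReadBytes(messageSize);
        // hand off to Message.DecodeMessage(offset, messageBytes) ...
    }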
+ /// Key => bytes -- The key is an optional message key that was used for partition assignment. The key can be null. + /// Value => bytes -- The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null. + /// + public static void AssertMessage(this BigEndianBinaryReader reader, int version, Message message) + { + var crc = (uint)reader.ReadInt32(); + var positionAfterCrc = reader.Position; + Assert.That(reader.ReadByte(), Is.EqualTo(message.MagicNumber), "MagicByte"); + Assert.That(reader.ReadByte(), Is.EqualTo(message.Attribute), "Attributes"); + if (version == 2) { + Assert.That(reader.ReadInt64(), Is.EqualTo(message.Timestamp.ToUnixEpochMilliseconds()), "Timestamp"); + } + Assert.That(reader.ReadBytes(), Is.EqualTo(message.Key), "Key"); + Assert.That(reader.ReadBytes(), Is.EqualTo(message.Value), "Value"); + + var positionAfterMessage = reader.Position; + reader.Position = positionAfterCrc; + var crcBytes = reader.ReadBytes((int) (positionAfterMessage - positionAfterCrc)); + Assert.That(Crc32Provider.Compute(crcBytes), Is.EqualTo(crc)); + reader.Position = positionAfterMessage; + } + + /// + /// From http://kafka.apache.org/protocol.html#protocol_messages + /// + /// Request Header => api_key api_version correlation_id client_id + /// api_key => INT16 -- The id of the request type. + /// api_version => INT16 -- The version of the API. + /// correlation_id => INT32 -- A user-supplied integer value that will be passed back with the response. + /// client_id => NULLABLE_STRING -- A user specified identifier for the client making the request. + /// + public static void AssertRequestHeader(this BigEndianBinaryReader reader, IKafkaRequest request) + { + reader.AssertRequestHeader(request.ApiKey, request.ApiVersion, request.CorrelationId, request.ClientId); + } + + /// + /// MessageSet => [Offset MessageSize Message] + /// Offset => int64 -- This is the offset used in kafka as the log sequence number. When the producer is sending non compressed messages, + /// it can set the offsets to anything. When the producer is sending compressed messages, to avoid server side recompression, + /// each compressed message should have offset starting from 0 and increasing by one for each inner message in the compressed message. + /// MessageSize => int32 + /// + /// NB. MessageSets are not preceded by an int32 like other array elements in the protocol: + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + public static void AssertMessageSet(this BigEndianBinaryReader reader, int version, IEnumerable> messageValues) + { + var messages = messageValues.Select( + t => + new Message { + Attribute = t.Item1, + Timestamp = t.Item2, + Key = t.Item3, + Value = t.Item4, + MagicNumber = 1 + }); + reader.AssertMessageSet(version, messages); + } + + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + /// version 0: + /// Message => Crc MagicByte Attributes Key Value + /// version 1: + /// Message => Crc MagicByte Attributes Timestamp Key Value + /// Crc => int32 -- The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. + /// MagicByte => int8 -- This is a version id used to allow backwards compatible evolution of the message binary format. 
The current value is 1. + /// Attributes => int8 -- This byte holds metadata attributes about the message. + /// The lowest 3 bits contain the compression codec used for the message. + /// The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0) + /// All other bits should be set to 0. + /// Timestamp => int64 -- This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)). + /// Key => bytes -- The key is an optional message key that was used for partition assignment. The key can be null. + /// Value => bytes -- The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null. + /// + public static void AssertMessage(this BigEndianBinaryReader reader, int version, byte attributes, DateTime timestamp, byte[] key, byte[] value) + { + reader.AssertMessage(version, new Message { Attribute = attributes, Timestamp = timestamp, Key = key, Value = value, MagicNumber = 1}); + } + + /// + /// From http://kafka.apache.org/protocol.html#protocol_messages + /// + /// Request Header => api_key api_version correlation_id client_id + /// api_key => INT16 -- The id of the request type. + /// api_version => INT16 -- The version of the API. + /// correlation_id => INT32 -- A user-supplied integer value that will be passed back with the response. + /// client_id => NULLABLE_STRING -- A user specified identifier for the client making the request. + /// + public static void AssertRequestHeader(this BigEndianBinaryReader reader, ApiKeyRequestType apiKey, short version, int correlationId, string clientId) + { + Assert.That(reader.ReadInt16(), Is.EqualTo((short) apiKey), "api_key"); + Assert.That(reader.ReadInt16(), Is.EqualTo(version), "api_version"); + Assert.That(reader.ReadInt32(), Is.EqualTo(correlationId), "correlation_id"); + Assert.That(reader.ReadInt16String(), Is.EqualTo(clientId), "client_id"); + } + + /// + /// From http://kafka.apache.org/protocol.html#protocol_messages + /// + /// Response Header => correlation_id + /// correlation_id => INT32 -- The user-supplied value passed in with the request + /// + public static void AssertResponseHeader(this BigEndianBinaryReader reader, int correlationId) + { + Assert.That(reader.ReadInt32(), Is.EqualTo(correlationId)); + } + + /// + /// RequestOrResponse => Size (RequestMessage | ResponseMessage) + /// Size => int32 -- The Size field gives the size of the subsequent request or response message in bytes. + /// The client can read requests by first reading this 4 byte size as an integer N, and + /// then reading and parsing the subsequent N bytes of the request. 
+ /// + /// From: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-CommonRequestandResponseStructure + /// + public static void AssertProtocol(this byte[] bytes, Action assertions) + { + using (var reader = new BigEndianBinaryReader(bytes)) { + var finalPosition = reader.ReadInt32() + reader.Position; + assertions(reader); + Assert.That(finalPosition, Is.EqualTo(reader.Position), + string.Format("Size was {0} but ended in a different spot.", finalPosition - 4)); + } + } + + public static byte[] PrefixWithInt32Length(this byte[] source) + { + var destination = new byte[source.Length + 4]; + using (var stream = new MemoryStream(destination)) { + using (var writer = new BigEndianBinaryWriter(stream)) { + writer.Write(source.Length); + } + } + Buffer.BlockCopy(source, 0, destination, 4, source.Length); + return destination; + } + + /// + /// From http://kafka.apache.org/protocol.html#protocol_messages + /// + /// Response Header => correlation_id + /// correlation_id => INT32 -- The user-supplied value passed in with the request + /// + public static void WriteResponseHeader(this BigEndianBinaryWriter writer, int correlationId) + { + writer.Write(correlationId); + } + + } +} \ No newline at end of file diff --git a/src/kafka-tests/Unit/ProtocolByteTests.cs b/src/kafka-tests/Unit/ProtocolByteTests.cs new file mode 100644 index 00000000..bfc6db7c --- /dev/null +++ b/src/kafka-tests/Unit/ProtocolByteTests.cs @@ -0,0 +1,836 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using KafkaNet.Common; +using KafkaNet.Protocol; +using NUnit.Framework; + +namespace kafka_tests.Unit +{ + /// + /// From http://kafka.apache.org/protocol.html#protocol_types + /// The protocol is built out of the following primitive types. + /// + /// Fixed Width Primitives: + /// int8, int16, int32, int64 - Signed integers with the given precision (in bits) stored in big endian order. + /// + /// Variable Length Primitives: + /// bytes, string - These types consist of a signed integer giving a length N followed by N bytes of content. + /// A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. + /// + /// Arrays: + /// This is a notation for handling repeated structures. These will always be encoded as an int32 size containing + /// the length N followed by N repetitions of the structure which can itself be made up of other primitive types. + /// In the BNF grammars below we will show an array of a structure foo as [foo]. + /// + /// Message formats are from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-CommonRequestandResponseStructure + /// + /// RequestOrResponse => Size (RequestMessage | ResponseMessage) + /// Size => int32 : The Size field gives the size of the subsequent request or response message in bytes. + /// The client can read requests by first reading this 4 byte size as an integer N, and + /// then reading and parsing the subsequent N bytes of the request. + /// + /// Request Header => api_key api_version correlation_id client_id + /// api_key => INT16 -- The id of the request type. + /// api_version => INT16 -- The version of the API. + /// correlation_id => INT32 -- A user-supplied integer value that will be passed back with the response. + /// client_id => NULLABLE_STRING -- A user specified identifier for the client making the request. 
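PrefixWithInt32Length exists to bridge the two halves of this harness: Decode receives a frame whose four-byte size the transport layer has already consumed, while AssertProtocol expects the full RequestOrResponse frame. Hence the re-prefix step, abbreviated here with an illustrative responseBytes buffer:

    // responseBytes: hand-built broker response, without the leading size field.
    var responses = request.Decode(responseBytes);
    responseBytes.PrefixWithInt32Length()   // restore the int32 Size header
        .AssertProtocol(reader => reader.AssertResponseHeader(request.CorrelationId));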
+ /// + /// Response Header => correlation_id + /// correlation_id => INT32 -- The user-supplied value passed in with the request + /// + [TestFixture] + [Category("Unit")] + public class ProtocolByteTests + { + /// + /// ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] + /// RequiredAcks => int16 -- This field indicates how many acknowledgements the servers should receive before responding to the request. + /// If it is 0 the server will not send any response (this is the only case where the server will not reply to + /// a request). If it is 1, the server will wait the data is written to the local log before sending a response. + /// If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. + /// Timeout => int32 -- This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements + /// in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include + /// network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are + /// queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if + /// the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client + /// should use the socket timeout. + /// TopicName => string -- The topic that data is being published to. + /// Partition => int32 -- The partition that data is being published to. + /// MessageSetSize => int32 -- The size, in bytes, of the message set that follows. + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + [Test] + public void ProduceApiRequest( + [Values(0, 1, 2)] short version, + [Values(0, 2, -1)] short acks, + [Values(0, 1000)] int timeoutMilliseconds, + [Values("test", "a really long name, with spaces and punctuation!")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values(3)] int messagesPerSet) + { + var clientId = "ProduceApiRequest"; + + var request = new ProduceRequest { + Acks = acks, + ClientId = clientId, + CorrelationId = clientId.GetHashCode(), + TimeoutMS = timeoutMilliseconds, + Payload = new List(), + ApiVersion = version + }; + + for (var t = 0; t < topicsPerRequest; t++) { + request.Payload.Add( + new Payload { + Topic = topic + t, + Partition = t % totalPartitions, + Codec = MessageCodec.CodecNone, + Messages = GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0)) + }); + } + + var data = request.Encode(); + + data.Buffer.AssertProtocol( + reader => { + reader.AssertRequestHeader(request); + reader.AssertProduceRequest(request); + }); + } + + /// + /// ProduceResponse => [TopicName [Partition ErrorCode Offset *Timestamp]] *ThrottleTime + /// *ThrottleTime is only version 1 (0.9.0) and above + /// *Timestamp is only version 2 (0.10.0) and above + /// TopicName => string -- The topic this response entry corresponds to. + /// Partition => int32 -- The partition this response entry corresponds to. + /// ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be + /// unavailable or maintained on a different host, while others may have successfully accepted the produce request. 
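GenerateMessages is defined further down this test file, outside the visible hunk. A plausible reconstruction of what these tests need from it — the body below is a hypothetical sketch, not the committed helper:

    // Hypothetical: the real GenerateMessages lives outside this hunk.
    private static List<Message> GenerateMessages(int count, byte magicByte) {
        var messages = new List<Message>();
        for (var i = 0; i < count; i++) {
            messages.Add(new Message {
                MagicNumber = magicByte,      // 1 => v1 format, which encodes Timestamp
                Timestamp = DateTime.UtcNow,  // only written when MagicNumber >= 1
                Key = null,
                Value = new byte[] { (byte)i }
            });
        }
        return messages;
    }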
+ /// Offset => int64 -- The offset assigned to the first message in the message set appended to this partition. + /// Timestamp => int64 -- If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set. + /// All the messages in the message set have the same timestamp. + /// If CreateTime is used, this field is always -1. The producer can assume the timestamp of the messages in the + /// produce request has been accepted by the broker if there is no error code returned. + /// Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)). + /// ThrottleTime => int32 -- Duration in milliseconds for which the request was throttled due to quota violation. + /// (Zero if the request did not violate any quota). + /// + /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + /// + [Test] + public void ProduceApiResponse( + [Values(0, 1, 2)] short version, + [Values(-1, 0, 10000000)] long timestampMilliseconds, + [Values("test", "a really long name, with spaces and punctuation!")] string topic, + [Values(1, 10)] int topicsPerRequest, + [Values(1, 5)] int totalPartitions, + [Values( + ErrorResponseCode.NoError, + ErrorResponseCode.InvalidMessage + )] ErrorResponseCode errorCode, + [Values(0, 100000)] int throttleTime) + { + var randomizer = new Randomizer(); + var clientId = "ProduceApiResponse"; + var correlationId = clientId.GetHashCode(); + + byte[] data = null; + using (var stream = new MemoryStream()) { + var writer = new BigEndianBinaryWriter(stream); + writer.WriteResponseHeader(correlationId); + + writer.Write(topicsPerRequest); + for (var t = 0; t < topicsPerRequest; t++) { + writer.Write(topic + t, StringPrefixEncoding.Int16); + writer.Write(1); // partitionsPerTopic + writer.Write(t % totalPartitions); + writer.Write((short)errorCode); + writer.Write((long)randomizer.Next()); + if (version >= 2) { + writer.Write(timestampMilliseconds); + } + } + if (version >= 1) { + writer.Write(throttleTime); + } + data = new byte[stream.Position]; + Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length); + } + + var request = new ProduceRequest { ApiVersion = version }; + var responses = request.Decode(data); // doesn't include the size in the decode -- the framework deals with it, I'd assume + data.PrefixWithInt32Length().AssertProtocol( + reader => + { + reader.AssertResponseHeader(correlationId); + reader.AssertProduceResponse(version, throttleTime, responses); + }); + } + + /// + /// FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]] + /// ReplicaId => int32 -- The replica id indicates the node id of the replica initiating this request. Normal client consumers should always + /// specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted + /// to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. + /// MaxWaitTime => int32 -- The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available + /// at the time the request is issued. + /// MinBytes => int32 -- This is the minimum number of bytes of messages that must be available to give a response. If the client sets this + /// to 0 the server will always respond immediately, however if there is no new data since their last request they will + /// just get back empty message sets. 
+        /// <summary>
+        /// FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
+        ///  ReplicaId => int32 -- The replica id indicates the node id of the replica initiating this request. Normal client consumers should always
+        ///                        specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted
+        ///                        to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes.
+        ///  MaxWaitTime => int32 -- The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available
+        ///                          at the time the request is issued.
+        ///  MinBytes => int32 -- This is the minimum number of bytes of messages that must be available to give a response. If the client sets this
+        ///                       to 0, the server will always respond immediately; however, if there is no new data since their last request they will
+        ///                       just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has
+        ///                       at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the
+        ///                       consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g.
+        ///                       setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100 ms to try to
+        ///                       accumulate 64k of data before responding).
+        ///  TopicName => string -- The name of the topic.
+        ///  Partition => int32 -- The id of the partition the fetch is for.
+        ///  FetchOffset => int64 -- The offset to begin this fetch from.
+        ///  MaxBytes => int32 -- The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
+        /// </summary>
+        [Test]
+        public void FetchApiRequest(
+            [Values(0, 1, 2)] short version,
+            [Values(0, 100)] int timeoutMilliseconds,
+            [Values(0, 64000)] int minBytes,
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int totalPartitions,
+            [Values(25600000)] int maxBytes)
+        {
+            var randomizer = new Randomizer();
+            var clientId = "FetchApiRequest";
+
+            var request = new FetchRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                MaxWaitTime = timeoutMilliseconds,
+                MinBytes = minBytes,
+                Fetches = new List<Fetch>(),
+                ApiVersion = version
+            };
+
+            for (var t = 0; t < topicsPerRequest; t++) {
+                var payload = new Fetch {
+                    Topic = topic + t,
+                    PartitionId = t % totalPartitions,
+                    Offset = randomizer.Next(0, int.MaxValue),
+                    MaxBytes = maxBytes
+                };
+                request.Fetches.Add(payload);
+            }
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertFetchRequest(request);
+                });
+        }
+
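+        // A usage sketch, not part of the original test file: the throughput/latency example from the
+        // summary above -- let the broker wait up to 100 ms to accumulate ~64k before responding. This
+        // assumes FetchRequest exposes settable MaxWaitTime and MinBytes members, as the test above uses.
+        private static FetchRequest BuildBatchingFetchRequest(string topic, long offset)
+        {
+            return new FetchRequest {
+                MaxWaitTime = 100, // ms the broker may block waiting for MinBytes
+                MinBytes = 64000,  // don't respond until ~64k is available (or the wait expires)
+                Fetches = new List<Fetch> {
+                    new Fetch { Topic = topic, PartitionId = 0, Offset = offset, MaxBytes = 25600000 }
+                }
+            };
+        }
+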
+        /// <summary>
+        /// FetchResponse => *ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
+        ///  *ThrottleTime is only version 1 (0.9.0) and above
+        ///  ThrottleTime => int32 -- Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not
+        ///                           violate any quota.)
+        ///  TopicName => string -- The topic this response entry corresponds to.
+        ///  Partition => int32 -- The partition this response entry corresponds to.
+        ///  ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may
+        ///                        be unavailable or maintained on a different host, while others may have successfully accepted the produce request.
+        ///  HighwaterMarkOffset => int64 -- The offset at the end of the log for this partition. This can be used by the client to determine how many
+        ///                                  messages behind the end of the log they are.
+        ///  MessageSetSize => int32 -- The size in bytes of the message set for this partition.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
+        /// </summary>
+        [Test]
+        public void FetchApiResponse(
+            [Values(0, 1, 2)] short version,
+            [Values(0, 1234)] int throttleTime,
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int totalPartitions,
+            [Values(
+                ErrorResponseCode.NoError,
+                ErrorResponseCode.OffsetOutOfRange
+            )] ErrorResponseCode errorCode,
+            [Values(3)] int messagesPerSet)
+        {
+            var randomizer = new Randomizer();
+            var clientId = "FetchApiResponse";
+            var correlationId = clientId.GetHashCode();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+
+                if (version >= 1) {
+                    writer.Write(throttleTime);
+                }
+                writer.Write(topicsPerRequest);
+                for (var t = 0; t < topicsPerRequest; t++) {
+                    writer.Write(topic + t, StringPrefixEncoding.Int16);
+                    writer.Write(1); // partitionsPerTopic
+                    writer.Write(t % totalPartitions);
+                    writer.Write((short) errorCode);
+                    writer.Write((long) randomizer.Next());
+
+                    var messageSet = Message.EncodeMessageSet(GenerateMessages(messagesPerSet, (byte) (version >= 2 ? 1 : 0)));
+                    writer.Write(messageSet.Length);
+                    writer.Write(messageSet);
+                }
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new FetchRequest { ApiVersion = version };
+            var responses = request.Decode(data); // Decode expects the payload without the int32 size prefix; the transport framing strips it
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertFetchResponse(version, throttleTime, responses);
+                });
+        }
+
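+        // A decoding sketch, not part of the original test file: each fetch-response partition carries its
+        // message set as a size-framed blob (MessageSetSize, then that many bytes), exactly as the test
+        // above writes it.
+        private static byte[] ReadSizeFramedMessageSet(BigEndianBinaryReader reader)
+        {
+            var messageSetSize = reader.ReadInt32();
+            return reader.ReadBytes(messageSetSize);
+        }
+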
+        /// <summary>
+        /// OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
+        ///  ReplicaId => int32 -- The replica id indicates the node id of the replica initiating this request. Normal client consumers should always
+        ///                        specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted
+        ///                        to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes.
+        ///  TopicName => string -- The name of the topic.
+        ///  Partition => int32 -- The id of the partition the fetch is for.
+        ///  Time => int64 -- Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the
+        ///                   latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. Note
+        ///                   that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element.
+        ///  MaxNumberOfOffsets => int32 -- The maximum number of offsets to return.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
+        /// </summary>
+        [Test]
+        public void OffsetsApiRequest(
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int totalPartitions,
+            [Values(-2, -1, 123456, 10000000)] long time,
+            [Values(1, 10)] int maxOffsets)
+        {
+            var clientId = "OffsetsApiRequest";
+
+            var request = new OffsetRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                Offsets = new List<Offset>(),
+                ApiVersion = 0
+            };
+
+            for (var t = 0; t < topicsPerRequest; t++) {
+                var payload = new Offset {
+                    Topic = topic + t,
+                    PartitionId = t % totalPartitions,
+                    Time = time,
+                    MaxOffsets = maxOffsets
+                };
+                request.Offsets.Add(payload);
+            }
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertOffsetRequest(request);
+                });
+        }
+
+        /// <summary>
+        /// OffsetResponse => [TopicName [PartitionOffsets]]
+        ///  PartitionOffsets => Partition ErrorCode [Offset]
+        ///  TopicName => string -- The name of the topic.
+        ///  Partition => int32 -- The id of the partition the fetch is for.
+        ///  ErrorCode => int16 -- The error from this partition, if any. Errors are given on a per-partition basis because a given partition may
+        ///                        be unavailable or maintained on a different host, while others may have successfully accepted the produce request.
+        ///  Offset => int64
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)
+        /// </summary>
+        [Test]
+        public void OffsetsApiResponse(
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(5)] int totalPartitions,
+            [Values(
+                ErrorResponseCode.UnknownTopicOrPartition,
+                ErrorResponseCode.NotLeaderForPartition,
+                ErrorResponseCode.Unknown
+            )] ErrorResponseCode errorCode,
+            [Values(1, 5)] int offsetsPerPartition)
+        {
+            var randomizer = new Randomizer();
+            var clientId = "OffsetsApiResponse";
+            var correlationId = clientId.GetHashCode();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+
+                writer.Write(topicsPerRequest);
+                for (var t = 0; t < topicsPerRequest; t++) {
+                    writer.Write(topic + t, StringPrefixEncoding.Int16);
+                    writer.Write(1); // partitionsPerTopic
+                    writer.Write(t % totalPartitions);
+                    writer.Write((short) errorCode);
+                    writer.Write(offsetsPerPartition);
+                    for (var o = 0; o < offsetsPerPartition; o++) {
+                        writer.Write((long) randomizer.Next());
+                    }
+                }
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new OffsetRequest { ApiVersion = 0 };
+            var responses = request.Decode(data); // Decode expects the payload without the int32 size prefix; the transport framing strips it
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertOffsetResponse(responses);
+                });
+        }
+
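+        // A usage sketch, not part of the original test file: the two special Time values from the offset
+        // API notes above. -1 asks for the latest offset (the next offset to be written); -2 asks for the
+        // earliest available offset.
+        private static OffsetRequest BuildEarliestOffsetRequest(string topic)
+        {
+            return new OffsetRequest {
+                Offsets = new List<Offset> {
+                    new Offset { Topic = topic, PartitionId = 0, Time = -2, MaxOffsets = 1 }
+                }
+            };
+        }
+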
+        /// <summary>
+        /// TopicMetadataRequest => [TopicName]
+        ///  TopicName => string -- The topics to produce metadata for. If no topics are specified fetch metadata for all topics.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+        /// </summary>
+        [Test]
+        public void MetadataApiRequest(
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(0, 1, 10)] int topicsPerRequest)
+        {
+            var clientId = "MetadataApiRequest";
+
+            var request = new MetadataRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                Topics = new List<string>(),
+                ApiVersion = 0
+            };
+
+            for (var t = 0; t < topicsPerRequest; t++) {
+                request.Topics.Add(topic + t);
+            }
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertMetadataRequest(request);
+                });
+        }
+
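+        // A usage sketch, not part of the original test file: per the metadata notes above, an empty topic
+        // list asks the broker for metadata about every topic, which the topicsPerRequest == 0 case also exercises.
+        private static MetadataRequest BuildAllTopicsMetadataRequest()
+        {
+            return new MetadataRequest { Topics = new List<string>() };
+        }
+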
+        /// <summary>
+        /// MetadataResponse => [Broker][TopicMetadata]
+        ///  Broker => NodeId Host Port (any number of brokers may be returned)
+        ///                             -- The node id, hostname, and port information for a kafka broker.
+        ///   NodeId => int32 -- The broker id.
+        ///   Host => string -- The hostname of the broker.
+        ///   Port => int32 -- The port on which the broker accepts requests.
+        ///  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
+        ///   TopicErrorCode => int16 -- The error code for the given topic.
+        ///   TopicName => string -- The name of the topic.
+        ///  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
+        ///   PartitionErrorCode => int16 -- The error code for the partition, if any.
+        ///   PartitionId => int32 -- The id of the partition.
+        ///   Leader => int32 -- The id of the broker acting as leader for this partition.
+        ///                      If no leader exists because we are in the middle of a leader election this id will be -1.
+        ///   Replicas => [int32] -- The set of all nodes that host this partition.
+        ///   Isr => [int32] -- The set of nodes that are in sync with the leader for this partition.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+        /// </summary>
+        [Test]
+        public void MetadataApiResponse(
+            [Values(1, 5)] int brokersPerRequest,
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int partitionsPerTopic,
+            [Values(
+                ErrorResponseCode.NoError,
+                ErrorResponseCode.UnknownTopicOrPartition
+            )] ErrorResponseCode errorCode)
+        {
+            var randomizer = new Randomizer();
+            var clientId = "MetadataApiResponse";
+            var correlationId = clientId.GetHashCode();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+
+                writer.Write(brokersPerRequest);
+                for (var b = 0; b < brokersPerRequest; b++) {
+                    writer.Write(b);
+                    writer.Write("broker-" + b, StringPrefixEncoding.Int16);
+                    writer.Write(9092 + b);
+                }
+
+                writer.Write(topicsPerRequest);
+                for (var t = 0; t < topicsPerRequest; t++) {
+                    writer.Write((short) errorCode);
+                    writer.Write(topic + t, StringPrefixEncoding.Int16);
+                    writer.Write(partitionsPerTopic);
+                    for (var p = 0; p < partitionsPerTopic; p++) {
+                        writer.Write((short) errorCode);
+                        writer.Write(p);
+                        var leader = randomizer.Next(0, brokersPerRequest - 1);
+                        writer.Write(leader);
+                        var replicas = randomizer.Next(0, brokersPerRequest - 1);
+                        writer.Write(replicas);
+                        for (var r = 0; r < replicas; r++) {
+                            writer.Write(r);
+                        }
+                        var isr = randomizer.Next(0, replicas);
+                        writer.Write(isr);
+                        for (var i = 0; i < isr; i++) {
+                            writer.Write(i);
+                        }
+                    }
+                }
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new MetadataRequest { ApiVersion = 0 };
+            var responses = request.Decode(data).Single(); // Decode yields exactly one MetadataResponse for the whole payload
+            // Decode expects the payload without the int32 size prefix; the transport framing strips it.
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertMetadataResponse(responses);
+                });
+        }
+
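+        // A decoding sketch, not part of the original test file: Replicas and Isr are wire arrays -- an
+        // int32 count followed by that many int32 broker ids, exactly as the test above writes them.
+        private static int[] ReadInt32Array(BigEndianBinaryReader reader)
+        {
+            var count = reader.ReadInt32();
+            var values = new int[count];
+            for (var i = 0; i < count; i++) {
+                values[i] = reader.ReadInt32();
+            }
+            return values;
+        }
+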
+        /// <summary>
+        /// OffsetCommitRequest => ConsumerGroup *ConsumerGroupGenerationId *MemberId *RetentionTime [TopicName [Partition Offset *TimeStamp Metadata]]
+        ///  *ConsumerGroupGenerationId and MemberId are only version 1 (0.8.2) and above
+        ///  *TimeStamp is only version 1 (0.8.2)
+        ///  *RetentionTime is only version 2 (0.9.0) and above
+        ///  ConsumerGroupId => string -- The consumer group id.
+        ///  ConsumerGroupGenerationId => int32 -- The generation of the consumer group.
+        ///  MemberId => string -- The consumer id assigned by the group coordinator.
+        ///  RetentionTime => int64 -- Time period in ms to retain the offset.
+        ///  TopicName => string -- The topic to commit.
+        ///  Partition => int32 -- The partition id.
+        ///  Offset => int64 -- The message offset to be committed.
+        ///  Timestamp => int64 -- The commit timestamp.
+        ///  Metadata => string -- Any associated metadata the client wants to keep.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void OffsetCommitApiRequest(
+            [Values(0, 1, 2)] short version,
+            [Values("group1", "group2")] string groupId,
+            [Values(0, 5)] int generation,
+            [Values(-1, 20000)] int retentionTime,
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(5)] int maxPartitions,
+            [Values(null, "something useful for the client")] string metadata)
+        {
+            var clientId = "OffsetCommitApiRequest";
+            var randomizer = new Randomizer();
+
+            var request = new OffsetCommitRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                ConsumerGroup = groupId,
+                OffsetCommits = new List<OffsetCommit>(),
+                ApiVersion = version,
+                GenerationId = generation,
+                MemberId = "member" + generation
+            };
+
+            if (retentionTime >= 0) {
+                request.OffsetRetention = TimeSpan.FromMilliseconds(retentionTime);
+            }
+            for (var t = 0; t < topicsPerRequest; t++) {
+                var payload = new OffsetCommit {
+                    Topic = topic + t,
+                    PartitionId = t % maxPartitions,
+                    Offset = randomizer.Next(0, int.MaxValue),
+                    Metadata = metadata,
+                    TimeStamp = retentionTime
+                };
+                request.OffsetCommits.Add(payload);
+            }
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertOffsetCommitRequest(request);
+                });
+        }
+
+        /// <summary>
+        /// OffsetCommitResponse => [TopicName [Partition ErrorCode]]
+        ///  TopicName => string -- The name of the topic.
+        ///  Partition => int32 -- The id of the partition.
+        ///  ErrorCode => int16 -- The error code for the partition, if any.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void OffsetCommitApiResponse(
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int partitionsPerTopic,
+            [Values(
+                ErrorResponseCode.NoError,
+                ErrorResponseCode.OffsetMetadataTooLargeCode
+            )] ErrorResponseCode errorCode)
+        {
+            var clientId = "OffsetCommitApiResponse";
+            var correlationId = clientId.GetHashCode();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+
+                writer.Write(topicsPerRequest);
+                for (var t = 0; t < topicsPerRequest; t++) {
+                    writer.Write(topic + t, StringPrefixEncoding.Int16);
+                    writer.Write(partitionsPerTopic);
+                    for (var p = 0; p < partitionsPerTopic; p++) {
+                        writer.Write(p);
+                        writer.Write((short) errorCode);
+                    }
+                }
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new OffsetCommitRequest { ApiVersion = 0 };
+            var responses = request.Decode(data); // Decode expects the payload without the int32 size prefix; the transport framing strips it
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertOffsetCommitResponse(responses);
+                });
+        }
+
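+        // An encoding sketch, not part of the original test file, of the version gates in the offset-commit
+        // notes above: v1+ adds the group generation and member id, v1 (only) carries a per-partition
+        // timestamp, and v2+ replaces it with a group-level retention time. Names are local to this sketch.
+        private static void WriteOffsetCommitGroupHeader(BigEndianBinaryWriter writer, short version,
+            string group, int generation, string member, long retentionMs)
+        {
+            writer.Write(group, StringPrefixEncoding.Int16);
+            if (version >= 1) {
+                writer.Write(generation);
+                writer.Write(member, StringPrefixEncoding.Int16);
+            }
+            if (version >= 2) {
+                writer.Write(retentionMs);
+            }
+        }
+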
+        /// <summary>
+        /// OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
+        ///  ConsumerGroup => string -- The consumer group id.
+        ///  TopicName => string -- The topic to fetch committed offsets for.
+        ///  Partition => int32 -- The partition id.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void OffsetFetchApiRequest(
+            [Values("group1", "group2")] string groupId,
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(5)] int maxPartitions)
+        {
+            var clientId = "OffsetFetchApiRequest";
+
+            var request = new OffsetFetchRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                ConsumerGroup = groupId,
+                Topics = new List<OffsetFetch>(),
+                ApiVersion = 0
+            };
+
+            for (var t = 0; t < topicsPerRequest; t++) {
+                var payload = new OffsetFetch {
+                    Topic = topic + t,
+                    PartitionId = t % maxPartitions
+                };
+                request.Topics.Add(payload);
+            }
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertOffsetFetchRequest(request);
+                });
+        }
+
+        /// <summary>
+        /// OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
+        ///  TopicName => string -- The name of the topic.
+        ///  Partition => int32 -- The id of the partition.
+        ///  Offset => int64 -- The offset, or -1 if none exists.
+        ///  Metadata => string -- The metadata associated with the topic and partition.
+        ///  ErrorCode => int16 -- The error code for the partition, if any.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void OffsetFetchApiResponse(
+            [Values("test", "a really long name, with spaces and punctuation!")] string topic,
+            [Values(1, 10)] int topicsPerRequest,
+            [Values(1, 5)] int partitionsPerTopic,
+            [Values(
+                ErrorResponseCode.NoError,
+                ErrorResponseCode.UnknownTopicOrPartition,
+                ErrorResponseCode.OffsetsLoadInProgressCode,
+                ErrorResponseCode.NotCoordinatorForConsumerCode,
+                ErrorResponseCode.IllegalGeneration,
+                ErrorResponseCode.UnknownMemberId,
+                ErrorResponseCode.TopicAuthorizationFailed,
+                ErrorResponseCode.GroupAuthorizationFailed
+            )] ErrorResponseCode errorCode)
+        {
+            var clientId = "OffsetFetchApiResponse";
+            var correlationId = clientId.GetHashCode();
+            var randomizer = new Randomizer();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+
+                writer.Write(topicsPerRequest);
+                for (var t = 0; t < topicsPerRequest; t++) {
+                    writer.Write(topic + t, StringPrefixEncoding.Int16);
+                    writer.Write(partitionsPerTopic);
+                    for (var p = 0; p < partitionsPerTopic; p++) {
+                        writer.Write(p);
+                        var offset = (long) randomizer.Next(int.MinValue, int.MaxValue);
+                        writer.Write(offset);
+                        writer.Write(offset >= 0 ? topic : string.Empty, StringPrefixEncoding.Int16);
+                        writer.Write((short) errorCode);
+                    }
+                }
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new OffsetFetchRequest { ApiVersion = 0 };
+            var responses = request.Decode(data); // Decode expects the payload without the int32 size prefix; the transport framing strips it
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertOffsetFetchResponse(responses);
+                });
+        }
+
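+        // A decoding sketch, not part of the original test file: one partition entry of an offset-fetch
+        // response, laid out as the test above writes it. An Offset of -1 means no committed offset exists,
+        // so a consumer should fall back to its own starting-offset policy.
+        private static bool TryReadCommittedOffset(BigEndianBinaryReader reader, out long offset)
+        {
+            var partitionId = reader.ReadInt32();
+            offset = reader.ReadInt64();
+            var metadata = reader.ReadInt16String();
+            var errorCode = reader.ReadInt16();
+            return offset >= 0;
+        }
+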
+        /// <summary>
+        /// GroupCoordinatorRequest => GroupId
+        ///  GroupId => string -- The consumer group id.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void GroupCoordinatorApiRequest([Values("group1", "group2")] string groupId)
+        {
+            var clientId = "GroupCoordinatorApiRequest";
+
+            var request = new ConsumerMetadataRequest {
+                ClientId = clientId,
+                CorrelationId = clientId.GetHashCode(),
+                ConsumerGroup = groupId,
+                ApiVersion = 0
+            };
+
+            var data = request.Encode();
+
+            data.Buffer.AssertProtocol(
+                reader => {
+                    reader.AssertRequestHeader(request);
+                    reader.AssertGroupCoordinatorRequest(request);
+                });
+        }
+
+        /// <summary>
+        /// GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
+        ///  ErrorCode => int16 -- The error code.
+        ///  CoordinatorId => int32 -- The broker id.
+        ///  CoordinatorHost => string -- The hostname of the broker.
+        ///  CoordinatorPort => int32 -- The port on which the broker accepts requests.
+        ///
+        /// From https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        /// </summary>
+        [Test]
+        public void GroupCoordinatorApiResponse(
+            [Values(
+                ErrorResponseCode.NoError,
+                ErrorResponseCode.ConsumerCoordinatorNotAvailableCode,
+                ErrorResponseCode.GroupAuthorizationFailed
+            )] ErrorResponseCode errorCode,
+            [Values(0, 1)] int coordinatorId)
+        {
+            var clientId = "GroupCoordinatorApiResponse";
+            var correlationId = clientId.GetHashCode();
+
+            byte[] data = null;
+            using (var stream = new MemoryStream()) {
+                var writer = new BigEndianBinaryWriter(stream);
+                writer.WriteResponseHeader(correlationId);
+                writer.Write((short) errorCode);
+                writer.Write(coordinatorId);
+                writer.Write("broker-" + coordinatorId, StringPrefixEncoding.Int16);
+                writer.Write(9092 + coordinatorId);
+
+                data = new byte[stream.Position];
+                Buffer.BlockCopy(stream.GetBuffer(), 0, data, 0, data.Length);
+            }
+
+            var request = new ConsumerMetadataRequest { ApiVersion = 0 };
+            var responses = request.Decode(data).Single(); // Decode yields exactly one response; the size prefix is handled by the transport framing
+            data.PrefixWithInt32Length().AssertProtocol(
+                reader => {
+                    reader.AssertResponseHeader(correlationId);
+                    reader.AssertGroupCoordinatorResponse(responses);
+                });
+        }
+
+        private List<Message> GenerateMessages(int count, byte version)
+        {
+            var randomizer = new Randomizer();
+            var messages = new List<Message>();
+            for (var m = 0; m < count; m++) {
+                var message = new Message {
+                    MagicNumber = version,
+                    Timestamp = DateTime.UtcNow,
+                    Key = m > 0 ? new byte[8] : null, // the first message in each set gets a null key
+                    Value = new byte[8 * (m + 1)]
+                };
+                if (message.Key != null) {
+                    randomizer.NextBytes(message.Key);
+                }
+                randomizer.NextBytes(message.Value);
+                messages.Add(message);
+            }
+            return messages;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/kafka-tests/kafka-tests.csproj b/src/kafka-tests/kafka-tests.csproj
index 5665176b..cf701291 100644
--- a/src/kafka-tests/kafka-tests.csproj
+++ b/src/kafka-tests/kafka-tests.csproj
@@ -90,6 +90,8 @@
+    <Compile Include="Unit\ProtocolByteTests.cs" />
+