From 43acb64cdae882e3d32c17db1fbe9e8980e2a6f0 Mon Sep 17 00:00:00 2001 From: Martin Jensen Date: Mon, 7 Sep 2015 08:52:40 +0200 Subject: [PATCH 1/5] Fixed set offset bug. --- src/kafka-net/Consumer.cs | 22 +++++++++---------- .../ProducerConsumerIntegrationTests.cs | 8 +++---- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 99b9afcb..07d32600 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -21,7 +21,7 @@ public class Consumer : IMetadataQueries, IDisposable private readonly BlockingCollection _fetchResponseQueue; private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); private readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary(); + private readonly ConcurrentDictionary> _partitionOffsetIndex = new ConcurrentDictionary>(); private readonly IMetadataQueries _metadataQueries; private int _disposeCount; @@ -33,7 +33,7 @@ public Consumer(ConsumerOptions options, params OffsetPosition[] positions) _options = options; _fetchResponseQueue = new BlockingCollection(_options.ConsumerBufferSize); _metadataQueries = new MetadataQueries(_options.Router); - + SetOffsetPosition(positions); } @@ -62,7 +62,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions) foreach (var position in positions) { var temp = position; - _partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => temp.Offset, (i, l) => temp.Offset); + _partitionOffsetIndex.AddOrUpdate(position.PartitionId, i => new Tuple(temp.Offset, true), (i, l) => new Tuple(temp.Offset, true)); } } @@ -73,7 +73,7 @@ public void SetOffsetPosition(params OffsetPosition[] positions) /// Will only return data if the consumer is actively being consumed. public List GetOffsetPosition() { - return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList(); + return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value.Item1 }).ToList(); } private void EnsurePartitionPollingThreads() @@ -125,7 +125,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) { //get the current offset, or default to zero if not there. long offset = 0; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => offset, (i, currentOffset) => { offset = currentOffset; return currentOffset; }); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(offset, false), (i, currentOffset) => { offset = currentOffset.Item1; return currentOffset; }); //build a fetch request for partition at offset var fetch = new Fetch @@ -139,11 +139,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) var fetches = new List { fetch }; var fetchRequest = new FetchRequest - { - MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), - MinBytes = _options.MinimumBytes, - Fetches = fetches - }; + { + MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), + MinBytes = _options.MinimumBytes, + Fetches = fetches + }; //make request and post to queue var route = _options.Router.SelectBrokerRoute(topic, partitionId); @@ -166,7 +166,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(nextOffset, false), (i, l) => l.Item2 ? l : new Tuple(nextOffset, false)); // sleep is not needed if responses were received continue; diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index 9a88d9d0..cb2e3bca 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -88,7 +88,7 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets)) { - for (int i = 0; i < 20; i++) + for (int i = 0; i < 200; i++) { producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new[] { new Message(i.ToString(), testId) }).Wait(); } @@ -105,10 +105,10 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() //seek back to initial offset consumer.SetOffsetPosition(offsets); - var resetPositionMessages = consumer.Consume().Take(20).ToList(); + var resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); //ensure all produced messages arrive again - Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value).ToList())); + Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList())); Assert.That(resetPositionMessages.Count, Is.EqualTo(20)); Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected)); @@ -198,7 +198,7 @@ public async void ConsumerShouldMoveToNextAvailableOffsetWhenQueryingForNextMess offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray())) { Console.WriteLine("Sending {0} test messages", expectedCount); - var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, + var response = await producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, Enumerable.Range(0, expectedCount).Select(x => new Message(x.ToString()))); Assert.That(response.Any(x => x.Error != (int)ErrorResponseCode.NoError), Is.False, "Error occured sending test messages to server."); From 5b5b46802df486a2f6b1d4a30ee313ade732d058 Mon Sep 17 00:00:00 2001 From: Martin Jensen Date: Mon, 7 Sep 2015 09:02:19 +0200 Subject: [PATCH 2/5] Faster running integration test. --- .../Integration/ProducerConsumerIntegrationTests.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index cb2e3bca..444b97cc 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -88,10 +88,8 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets)) { - for (int i = 0; i < 200; i++) - { - producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new[] { new Message(i.ToString(), testId) }).Wait(); - } + int iter = 0; + producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new int[1000].Select(i => new Message(iter++.ToString(), testId))).Wait(); var sentMessages = consumer.Consume().Take(20).ToList(); From 6de1f8d2763bc190d97bf00b28d4c04f2bd6bbfe Mon Sep 17 00:00:00 2001 From: Martin Jensen Date: Tue, 8 Sep 2015 15:34:55 +0200 Subject: [PATCH 3/5] Fixed introduced bug. --- src/kafka-net/Consumer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 07d32600..00e0b0ff 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -166,7 +166,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(nextOffset, false), (i, l) => l.Item2 ? l : new Tuple(nextOffset, false)); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(nextOffset, false), (i, l) => l.Item2 ? new Tuple(l.Item1, false) : new Tuple(nextOffset, false)); // sleep is not needed if responses were received continue; From 57d7d8781c1160010d66506fb29e485b5686b926 Mon Sep 17 00:00:00 2001 From: Martin Jensen Date: Tue, 8 Sep 2015 15:58:13 +0200 Subject: [PATCH 4/5] Fixed another introduced bug. --- src/kafka-net/Consumer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 00e0b0ff..05037442 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -125,7 +125,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) { //get the current offset, or default to zero if not there. long offset = 0; - _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(offset, false), (i, currentOffset) => { offset = currentOffset.Item1; return currentOffset; }); + _partitionOffsetIndex.AddOrUpdate(partitionId, i => new Tuple(offset, false), (i, currentOffset) => { offset = currentOffset.Item1; return new Tuple(currentOffset.Item1, false); }); //build a fetch request for partition at offset var fetch = new Fetch From 599f699440295321edf33d6519685d5f3e5655d3 Mon Sep 17 00:00:00 2001 From: Martin Jensen Date: Wed, 9 Sep 2015 09:37:04 +0200 Subject: [PATCH 5/5] More in-depth set offset test. --- .../ProducerConsumerIntegrationTests.cs | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index 444b97cc..58a1493d 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -77,7 +77,9 @@ public void ConsumerShouldConsumeInSameOrderAsProduced() [Test] public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() { - var expected = new List { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" }; + var expectedFrom0 = new List { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" }; + var expectedFrom250 = new List { "250", "251", "252", "253", "254", "255", "256", "257", "258", "259", "260", "261", "262", "263", "264", "265", "266", "267", "268", "269" }; + var expectedFrom500 = new List { "500", "501", "502", "503", "504", "505", "506", "507", "508", "509", "510", "511", "512", "513", "514", "515", "516", "517", "518", "519" }; var testId = Guid.NewGuid().ToString(); using (var router = new BrokerRouter(new KafkaOptions(IntegrationConfig.IntegrationUri))) @@ -86,21 +88,29 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() var offsets = producer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray(); + var initialOffset = offsets.First().Offset; + using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets)) { int iter = 0; producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new int[1000].Select(i => new Message(iter++.ToString(), testId))).Wait(); - var sentMessages = consumer.Consume().Take(20).ToList(); + consumer.Consume().Take(900).ToList(); + + consumer.SetOffsetPosition(offsets); + var sentMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); //ensure the produced messages arrived Console.WriteLine("Message order: {0}", string.Join(", ", sentMessages.Select(x => x.Value.ToUtf8String()).ToList())); Assert.That(sentMessages.Count, Is.EqualTo(20)); - Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected)); + Assert.That(sentMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom0)); Assert.That(sentMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); + consumer.Consume().Take(900).ToList(); + //seek back to initial offset + offsets.First().Offset = initialOffset + 500; consumer.SetOffsetPosition(offsets); var resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); @@ -109,7 +119,23 @@ public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList())); Assert.That(resetPositionMessages.Count, Is.EqualTo(20)); - Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected)); + var actual = resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(); + Assert.That(actual, Is.EqualTo(expectedFrom500)); + Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); + + consumer.Consume().Take(400).ToList(); + + //seek back to initial offset + offsets.First().Offset = initialOffset + 250; + consumer.SetOffsetPosition(offsets); + + resetPositionMessages = consumer.Consume().SkipWhile(x => x.Meta.Offset != offsets.First().Offset).Take(20).ToList(); + + //ensure all produced messages arrive again + Console.WriteLine("Message order: {0}", string.Join(", ", resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList())); + + Assert.That(resetPositionMessages.Count, Is.EqualTo(20)); + Assert.That(resetPositionMessages.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expectedFrom250)); Assert.That(resetPositionMessages.Any(x => x.Key.ToUtf8String() != testId), Is.False); } }