
[Enhancement] For Kafka 0.8.2+ Provide Way to Cache and Work with Offset Coordinators Similar to BrokerRouter #40

Open
chadsowald opened this issue Jan 16, 2015 · 9 comments

Comments

@chadsowald

Hi. I'm using this library and it has been very helpful. Thank you!

I'm using it and testing against both Kafka 0.8.1.1 and 0.8.2-beta with a 3-node Kafka cluster. While I'm able to get offset management working in both cases, I have to do so differently depending on the Kafka version I'm targeting. Specifically, the new Offset Coordinator concept (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) means we can no longer issue every request type the same way.

In 0.8.1.1, for any request I can do:

BrokerRouter.SelectBrokerRoute(....).SendAsync(...request)

But in 0.8.2-beta I can do the same for everything except OffsetFetch and OffsetCommit, where I have to do:

OffsetCoordinatorConnection.Send(...request)

where I had to create and manage the OffsetCoordinatorConnection (IKafkaConnection), basically repeating what the BrokerRouter already does.

I couldn't use the BrokerRouter to discover the connection for those request types because the leader for a group/topic/partition and the offset coordinator for that same group/topic/partition are not necessarily the same broker, but BrokerRouter.SelectBrokerRoute only ever selects the leader.

As such, I will have to implement my own caching (and refreshing) for these offset coordinators, which is essentially the same caching the BrokerRouter already does.
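For illustration, the lookup I'd want the BrokerRouter to cache looks roughly like this (a sketch only, assuming kafka-net's ConsumerMetadataRequest/ConsumerMetadataResponse types; property names may differ by version, and anyBrokerConnection stands in for whichever IKafkaConnection you already have open):

        // Ask any broker which node coordinates offsets for this consumer group.
        var request = new ConsumerMetadataRequest { ConsumerGroup = consumerGroup };
        var response = (await anyBrokerConnection.SendAsync(request)).First();
        // The response identifies the coordinator broker (id/host/port), which is
        // not necessarily the partition leader that SelectBrokerRoute returns,
        // so a separate cached connection is needed for OffsetFetch/OffsetCommit.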

I wanted to share my experience because I suspect you are already aware of these differences, but I wasn't sure what plans you might have to address them. I see that some of your offset management integration tests are ignored for likely these reasons.

Thank you!

@Jroland
Owner

Jroland commented Jan 20, 2015

Good to know. I have not done much testing against 0.8.2 and was not aware of this particular issue yet. If you see any more things not working quite right, please do post more issues.

@chadsowald
Author

Sounds good and thanks again!

@bschmidtbauer

Do you still have this problem with 0.8.2? I have been using 0.8.1 & 0.8.2 clusters with 3 brokers and have been able to use the offset API without issue.

I'm currently just doing something simple like the snippet below, using an extension method on the ConsumerOptions class:

        // Extension method on ConsumerOptions; CreateOffsetCommitRequest is a
        // private helper (not shown) that builds the OffsetCommitRequest.
        public static Task<List<OffsetCommitResponse>> SaveConsumerGroupOffsetAsync(this ConsumerOptions options, string consumerGroup, int partitionId, long offset)
        {
            var commitRequest = CreateOffsetCommitRequest(options.Topic, consumerGroup, partitionId, offset);
            // Note: SelectBrokerRoute returns the partition *leader*, which is
            // not necessarily the offset coordinator for the consumer group.
            var route = options.Router.SelectBrokerRoute(options.Topic, partitionId);
            return route.Connection.SendAsync(commitRequest);
        }

It's certainly possible I just haven't exercised this enough to find where it falls down.

The problem I'm running into is support for v1 of that API - currently everything is fixed at v0, which for this API means ZooKeeper is the storage mechanism. I would like to use v1 of this API, which stores the offsets in a Kafka topic instead. I want to be sure that we aren't talking about the same thing before opening another issue.

I would also like to say thanks to all involved as this has been a very helpful client.

Thanks

@nrandell

nrandell commented Apr 2, 2015

I've just been looking at this as I'm interested in using the new Kafka version to store offsets internally rather than in ZooKeeper. As far as I can tell, the current implementation only supports version 0 of the offset commit request, so information is only stored in ZooKeeper.

@bschmidtbauer

Correct - all APIs are currently set at v0:
protected const Int16 ApiVersion = 0;

The v1 of that API doesn't look like much of a change, but I don't know whether the parameters below mean that you need to participate in the Group Management capabilities, or whether you can just send them as null/default:

GroupGenerationId
ConsumerId
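
For reference, the wire-format difference between v0 and v1 (per the protocol guide linked earlier in this thread) is small; whether a simple consumer can send a default generation id and an empty consumer id without joining a group is exactly the open question:

        OffsetCommitRequest (v0) => ConsumerGroup [TopicName [Partition Offset Metadata]]
        OffsetCommitRequest (v1) => ConsumerGroup GroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]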

You can see how they are used in what is out there for the 0.9 release and the corresponding Java client redesign:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

I'm not sure what the overall plan is for supporting different versions of the individual APIs over time in this project. Maybe it's not necessary, but I suspect there are cases where people will care and have differing opinions on how the project should go about it.

Are there current plans for how these individual API versions will be handled? I don't want to add an issue if this is already in the works.

I can certainly try to help if something is needed.

@nrandell

nrandell commented Apr 2, 2015

I've got a version currently working using the version 1 api. Just going through looking at the tests to make sure that they all pass. I'm also going to create version 0 tests as well just to make sure nothing appears to be broken.

@Jroland
Owner

Jroland commented Apr 2, 2015

@nrandell great, if you have something working send in a PR. The next thing on my list was to do this upgrade, but I found a couple problems with the last release that I need to fix first. So it would be great if you could do this one. Send in a PR early even if you are not done quite yet, if you want me to look at it.

@nrandell

nrandell commented Apr 2, 2015

I've just created pull request #48 for this. However, one unit test in ProducerConsumerIntegrationTests seems to fail occasionally and I'm not sure why.

@bschmidtbauer

Awesome! I pulled it down and was able to run it successfully.

The offset reads are very fast, but my commits are still slow. I suspect this is because I'm committing through the broker that leads the topic/partition (via BrokerRouter) rather than the broker that actually manages the offset commit partition.

It looks like the ConsumerMetadataRequest now works after an offset has been saved, so I suspect we can now properly build the coordinator for offsets (which is what this issue originally started as).
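In other words, the flow would be roughly as follows (a sketch, not the library's actual API surface; GetCoordinatorConnectionAsync is a hypothetical helper that issues the ConsumerMetadataRequest and caches the resulting IKafkaConnection, mirroring what BrokerRouter does for leaders):

        // Hypothetical: resolve and cache the group's offset coordinator instead
        // of routing commits to the partition leader.
        IKafkaConnection coordinator = await GetCoordinatorConnectionAsync(router, consumerGroup);
        // Commits sent to the coordinator broker should be fast, since it owns
        // the offsets partition for this consumer group.
        var results = await coordinator.SendAsync(commitRequest);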
