Feature: Add ability to specify on_assign, on_revoke, and on_lost callbacks for a Confluent subscriber #1676

Open
andreaimprovised opened this issue Aug 12, 2024 · 0 comments
Assignees
Labels
Confluent Issues related to `faststream.confluent` module enhancement New feature or request good first issue Good for newcomers

Comments

@andreaimprovised
Contributor

Is your feature request related to a problem? Please describe.
Yes, I want to know when my confluent Consumer gets topic partitions assigned and removed.

Currently, I reach through FastStream into confluent_kafka.Consumer.assignment() every time my k8s liveness probe runs, but it's noisy and, most notably, it doesn't report a change at the moment it happens.

I may even, at times, want to do something with the information beyond logging: potentially clear some cached state, cancel some running threads/processes, etc.
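To make the "clear some cached state" case concrete, here is a minimal sketch of what such callbacks might do. `PartitionCache` and `FakeTopicPartition` are hypothetical names for illustration; the stand-in tuple only assumes that the real partition objects expose `.topic` and `.partition`, as the logging code further down also does.

```python
from collections import namedtuple

# Hypothetical stand-in for the partition objects passed to rebalance
# callbacks; only the .topic and .partition attributes are assumed.
FakeTopicPartition = namedtuple("FakeTopicPartition", ["topic", "partition"])


class PartitionCache:
    """Keeps per-partition state and discards it when a partition is revoked."""

    def __init__(self):
        self._state = {}

    def on_assign(self, consumer, partitions):
        # Create an empty state slot for every newly assigned partition.
        for tp in partitions:
            self._state.setdefault((tp.topic, tp.partition), {})

    def on_revoke(self, consumer, partitions):
        # Drop cached state for partitions this consumer no longer owns.
        for tp in partitions:
            self._state.pop((tp.topic, tp.partition), None)

    def owned(self):
        return sorted(self._state)


cache = PartitionCache()
cache.on_assign(None, [FakeTopicPartition("my-topic", 0), FakeTopicPartition("my-topic", 1)])
cache.on_revoke(None, [FakeTopicPartition("my-topic", 0)])
print(cache.owned())
```

Keying the state on `(topic, partition)` tuples avoids depending on how the client's partition objects hash or compare.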

Describe the solution you'd like

I want to specify, at the subscriber registration level, the callbacks that I want called, and for FastStream to pass them into the confluent_kafka.Consumer.subscribe() call inside AsyncConfluentConsumer.
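For illustration, the forwarding step could look roughly like the sketch below. `subscribe_with_callbacks` and `RecordingConsumer` are hypothetical names, not FastStream or confluent_kafka APIs; the only thing assumed about the real client is that `Consumer.subscribe()` accepts `on_assign`, `on_revoke`, and `on_lost` keyword arguments. Unset callbacks are left out of the call rather than passed as `None`, so the sketch does not rely on how the client treats an explicit `None`.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

# Rebalance callbacks receive (consumer, partitions).
RebalanceCallback = Callable[[Any, List[Any]], None]


def subscribe_with_callbacks(
    consumer: Any,
    topics: Iterable[str],
    on_assign: Optional[RebalanceCallback] = None,
    on_revoke: Optional[RebalanceCallback] = None,
    on_lost: Optional[RebalanceCallback] = None,
) -> Dict[str, RebalanceCallback]:
    """Subscribe, forwarding only the callbacks the user actually provided."""
    candidates = {"on_assign": on_assign, "on_revoke": on_revoke, "on_lost": on_lost}
    kwargs = {name: cb for name, cb in candidates.items() if cb is not None}
    consumer.subscribe(list(topics), **kwargs)
    return kwargs


class RecordingConsumer:
    """Hypothetical stand-in that records subscribe() calls instead of talking to Kafka."""

    def __init__(self) -> None:
        self.calls = []

    def subscribe(self, topics, **kwargs) -> None:
        self.calls.append((topics, kwargs))


def log_assignment(consumer, partitions):
    print("assigned:", partitions)


fake = RecordingConsumer()
subscribe_with_callbacks(fake, ["my-topic"], on_assign=log_assignment)
```

With something like this in place, a subscriber that sets no callbacks would subscribe exactly as it does today.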

Feature code example

from faststream import FastStream
from faststream.confluent import KafkaBroker

...

broker = KafkaBroker(...)

@broker.subscriber(
    "my-topic",
    on_assign=lambda consumer, partitions: ...,
    on_revoke=lambda consumer, partitions: ...,
    on_lost=lambda consumer, partitions: ...,
)
def my_handler(body: str):
    print(body)

Describe alternatives you've considered

As a workaround, I monkey-patch FastStream's AsyncConfluentConsumer at import time:

import faststream.confluent.broker.broker
from faststream.confluent.broker.broker import AsyncConfluentConsumer
from observing.observing import logger


class PatchedAsyncConfluentConsumer(AsyncConfluentConsumer):
    """A patched version of the AsyncConfluentConsumer class."""

    def __init__(self, *topics, **kwargs):
        super().__init__(*topics, **kwargs)
        self.topic_partitions = set()

    def on_revoke(self, consumer, partitions):
        """Removes revoked partitions from the tracked set and logs the rebalance."""
        self.topic_partitions -= set(partitions)
        logger.info(
            "Consumer rebalance event: partitions revoked.",
            topic_partitions=dict(
                n_revoked=len(partitions),
                revoked=[
                    dict(topic=tp.topic, partition=tp.partition) for tp in partitions
                ],
                n_current=len(self.topic_partitions),
                current=[
                    dict(topic=tp.topic, partition=tp.partition)
                    for tp in self.topic_partitions
                ],
            ),
            memberid=self.consumer.memberid(),
            topics=self.topics,
            config=dict(
                group_id=self.config.get("group.id"),
                group_instance_id=self.config.get("group.instance.id"),
            ),
        )

    def on_assign(self, consumer, partitions):
        """Adds assigned partitions to the tracked set and logs the rebalance."""
        self.topic_partitions |= set(partitions)
        logger.info(
            "Consumer rebalance event: partitions assigned.",
            topic_partitions=dict(
                n_assigned=len(partitions),
                assigned=[
                    dict(topic=tp.topic, partition=tp.partition) for tp in partitions
                ],
                n_current=len(self.topic_partitions),
                current=[
                    dict(topic=tp.topic, partition=tp.partition)
                    for tp in self.topic_partitions
                ],
            ),
            memberid=self.consumer.memberid(),
            topics=self.topics,
            config=dict(
                group_id=self.config.get("group.id"),
                group_instance_id=self.config.get("group.instance.id"),
            ),
        )

    async def start(self) -> None:
        """Starts the Kafka consumer and subscribes to the specified topics."""
        self.consumer.subscribe(
            self.topics, on_revoke=self.on_revoke, on_assign=self.on_assign
        )


def patch_async_confluent_consumer():
    logger.info("Patching AsyncConfluentConsumer.")
    faststream.confluent.broker.broker.AsyncConfluentConsumer = (
        PatchedAsyncConfluentConsumer
    )

Obviously, this is ideal for no one.
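One reason the patch is fragile: rebinding a module attribute only affects code that looks the name up through the module afterwards. The sketch below shows this with a hypothetical stand-in namespace (`lib`, `Consumer`, and `make_consumer` are illustrative names, not FastStream's), which is why the patch has to run before anything else grabs a reference to the original class.

```python
import types

# Hypothetical stand-in for a library module that defines and uses a class.
lib = types.SimpleNamespace()


class Consumer:
    kind = "original"


lib.Consumer = Consumer
# The "library" resolves the class through its namespace at call time.
lib.make_consumer = lambda: lib.Consumer()

# A reference captured before the patch, like `from lib import Consumer`.
early_reference = lib.Consumer


class PatchedConsumer(Consumer):
    kind = "patched"


# The monkey patch: rebind the name inside the library's namespace.
lib.Consumer = PatchedConsumer

created_by_library = lib.make_consumer().kind
created_by_early_reference = early_reference().kind
print(created_by_library, created_by_early_reference)
```

Code that resolves the name at call time sees the patched class, while anything that captured the class earlier keeps using the original.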

Additional context

@andreaimprovised andreaimprovised added the enhancement New feature or request label Aug 12, 2024
@Lancetnik Lancetnik added the Confluent Issues related to `faststream.confluent` module label Aug 13, 2024
@kumaranvpl kumaranvpl self-assigned this Aug 13, 2024
@Lancetnik Lancetnik added the good first issue Good for newcomers label Aug 21, 2024
Projects
Status: Backlog