Skip to content

Commit

Permalink
chore: fix pydantic 2.10 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Nov 7, 2024
1 parent af70c5b commit aa1562d
Show file tree
Hide file tree
Showing 43 changed files with 527 additions and 448 deletions.
35 changes: 30 additions & 5 deletions faststream/_internal/broker/abc_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Optional,
)

from faststream._internal.state import BrokerState, Pointer
from faststream._internal.types import BrokerMiddleware, CustomCallable, MsgType

if TYPE_CHECKING:
Expand All @@ -29,6 +30,7 @@ def __init__(
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
include_in_schema: Optional[bool],
state: "BrokerState",
) -> None:
self.prefix = prefix
self.include_in_schema = include_in_schema
Expand All @@ -41,6 +43,8 @@ def __init__(
self._parser = parser
self._decoder = decoder

self._state = Pointer(state)

def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
"""Append BrokerMiddleware to the end of middlewares list.
Expand All @@ -58,20 +62,37 @@ def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
def subscriber(
self,
subscriber: "SubscriberProto[MsgType]",
is_running: bool = False,
) -> "SubscriberProto[MsgType]":
subscriber.add_prefix(self.prefix)
self._subscribers.append(subscriber)
if not is_running:
self._subscribers.append(subscriber)
return subscriber

@abstractmethod
def publisher(
self,
publisher: "PublisherProto[MsgType]",
is_running: bool = False,
) -> "PublisherProto[MsgType]":
publisher.add_prefix(self.prefix)
self._publishers.append(publisher)

if not is_running:
self._publishers.append(publisher)

return publisher

def setup_publisher(
self,
publisher: "PublisherProto[MsgType]",
**kwargs: Any,
) -> None:
"""Setup the Publisher to prepare it to starting."""
publisher._setup(**kwargs, state=self._state)

def _setup(self, state: "Pointer[BrokerState]") -> None:
self._state.set(state)

def include_router(
self,
router: "ABCBroker[Any]",
Expand All @@ -82,6 +103,8 @@ def include_router(
include_in_schema: Optional[bool] = None,
) -> None:
"""Includes a router in the current object."""
router._setup(self._state)

for h in router._subscribers:
h.add_prefix(f"{self.prefix}{prefix}")

Expand Down Expand Up @@ -126,6 +149,8 @@ def include_routers(
self.include_router(r)

def _solve_include_in_schema(self, include_in_schema: bool) -> bool:
if self.include_in_schema is None or self.include_in_schema:
return include_in_schema
return self.include_in_schema
# should be `is False` to pass `None` case
if self.include_in_schema is False:
return False

return include_in_schema
82 changes: 34 additions & 48 deletions faststream/_internal/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
SetupAble,
)
from faststream._internal.state.broker import (
BrokerState,
InitialBrokerState,
)
from faststream._internal.state.producer import ProducerUnset
Expand Down Expand Up @@ -137,6 +136,20 @@ def __init__(
],
**connection_kwargs: Any,
) -> None:
state = InitialBrokerState(
di_state=DIState(
use_fastdepends=apply_types,
get_dependent=_get_dependant,
call_decorators=_call_decorators,
serializer=serializer,
provider=Provider(),
context=ContextRepo(),
),
logger_state=logger_state,
graceful_timeout=graceful_timeout,
producer=ProducerUnset(),
)

super().__init__(
middlewares=middlewares,
dependencies=dependencies,
Expand All @@ -151,6 +164,7 @@ def __init__(
# Broker is a root router
include_in_schema=True,
prefix="",
state=state,
)

self.running = False
Expand All @@ -163,20 +177,6 @@ def __init__(
*self.middlewares,
)

self._state: BrokerState = InitialBrokerState(
di_state=DIState(
use_fastdepends=apply_types,
get_dependent=_get_dependant,
call_decorators=_call_decorators,
serializer=serializer,
provider=Provider(),
context=ContextRepo(),
),
logger_state=logger_state,
graceful_timeout=graceful_timeout,
producer=ProducerUnset(),
)

# AsyncAPI information
self.url = specification_url
self.protocol = protocol
Expand All @@ -187,15 +187,15 @@ def __init__(

@property
def _producer(self) -> "ProducerProto":
return self._state.producer
return self._state.get().producer

@property
def context(self) -> "ContextRepo":
return self._state.di_state.context
return self._state.get().di_state.context

@property
def provider(self) -> Provider:
return self._state.di_state.provider
return self._state.get().di_state.provider

async def __aenter__(self) -> "Self":
await self.connect()
Expand All @@ -213,20 +213,25 @@ async def __aexit__(
async def start(self) -> None:
"""Start the broker async use case."""
# TODO: filter by already running handlers after TestClient refactor
state = self._state.get()

for subscriber in self._subscribers:
log_context = subscriber.get_log_context(None)
log_context.pop("message_id", None)
self._state.logger_state.params_storage.setup_log_contest(log_context)
state.logger_state.params_storage.setup_log_contest(log_context)

self._state._setup_logger_state()
state._setup_logger_state()

for subscriber in self._subscribers:
self._state.logger_state.log(
state.logger_state.log(
f"`{subscriber.call_name}` waiting for messages",
extra=subscriber.get_log_context(None),
)
await subscriber.start()

if not self.running:
self.running = True

async def connect(self, **kwargs: Any) -> ConnectionType:
"""Connect to a remote server."""
if self._connection is None:
Expand All @@ -246,13 +251,15 @@ def _setup(self, di_state: Optional[DIState] = None) -> None:
Method should be idempotent due could be called twice
"""
broker_serializer = self._state.di_state.serializer
broker_state = self._state.get()
current_di_state = broker_state.di_state
broker_serializer = current_di_state.serializer

if di_state is not None:
if broker_serializer is EMPTY:
broker_serializer = di_state.serializer

self._state.di_state.update(
current_di_state.update(
serializer=broker_serializer,
provider=di_state.provider,
context=di_state.context,
Expand All @@ -266,15 +273,11 @@ def _setup(self, di_state: Optional[DIState] = None) -> None:

broker_serializer = PydanticSerializer()

self._state.di_state.update(
current_di_state.update(
serializer=broker_serializer,
)

self._state._setup()

# TODO: move to start
if not self.running:
self.running = True
broker_state._setup()

# TODO: move setup to object creation
for h in self._subscribers:
Expand All @@ -293,16 +296,6 @@ def setup_subscriber(
data.update(kwargs)
subscriber._setup(**data, state=self._state)

def setup_publisher(
self,
publisher: "PublisherProto[MsgType]",
**kwargs: Any,
) -> None:
"""Setup the Publisher to prepare it to starting."""
data = self._publisher_setup_extra.copy()
data.update(kwargs)
publisher._setup(**data, state=self._state)

@property
def _subscriber_setup_extra(self) -> "AnyDict":
return {
Expand All @@ -314,16 +307,9 @@ def _subscriber_setup_extra(self) -> "AnyDict":
"broker_decoder": self._decoder,
}

@property
def _publisher_setup_extra(self) -> "AnyDict":
return {
"producer": self._producer,
}

def publisher(self, *args: Any, **kwargs: Any) -> "PublisherProto[MsgType]":
pub = super().publisher(*args, **kwargs)
if self.running:
self.setup_publisher(pub)
pub = super().publisher(*args, **kwargs, is_running=self.running)
self.setup_publisher(pub)
return pub

async def close(
Expand Down
25 changes: 13 additions & 12 deletions faststream/_internal/broker/pub_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import abstractmethod
from collections.abc import Iterable
from functools import partial
from typing import TYPE_CHECKING, Any, Generic, Optional
from typing import TYPE_CHECKING, Any, Generic

from faststream._internal.subscriber.utils import process_msg
from faststream._internal.types import MsgType
Expand All @@ -25,19 +25,20 @@ async def publish(
message: "SendableMessage",
queue: str,
/,
) -> None:
) -> Any:
raise NotImplementedError

async def _basic_publish(
self,
cmd: "PublishCommand",
*,
producer: "ProducerProto",
) -> Optional[Any]:
) -> Any:
publish = producer.publish
context = self.context # caches property

for m in self.middlewares:
publish = partial(m(None, context=self.context).publish_scope, publish)
publish = partial(m(None, context=context).publish_scope, publish)

return await publish(cmd)

Expand All @@ -46,21 +47,22 @@ async def publish_batch(
self,
*messages: "SendableMessage",
queue: str,
) -> None:
) -> Any:
raise NotImplementedError

async def _basic_publish_batch(
self,
cmd: "PublishCommand",
*,
producer: "ProducerProto",
) -> None:
) -> Any:
publish = producer.publish_batch
context = self.context # caches property

for m in self.middlewares:
publish = partial(m(None, context=self.context).publish_scope, publish)
publish = partial(m(None, context=context).publish_scope, publish)

await publish(cmd)
return await publish(cmd)

@abstractmethod
async def request(
Expand All @@ -79,17 +81,16 @@ async def _basic_request(
producer: "ProducerProto",
) -> Any:
request = producer.request
context = self.context # caches property

for m in self.middlewares:
request = partial(m(None, context=self.context).publish_scope, request)
request = partial(m(None, context=context).publish_scope, request)

published_msg = await request(cmd)

response_msg: Any = await process_msg(
msg=published_msg,
middlewares=(
m(published_msg, context=self.context) for m in self.middlewares
),
middlewares=(m(published_msg, context=context) for m in self.middlewares),
parser=producer._parser,
decoder=producer._decoder,
source_type=SourceType.RESPONSE,
Expand Down
2 changes: 2 additions & 0 deletions faststream/_internal/broker/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Optional,
)

from faststream._internal.state.broker import EmptyBrokerState
from faststream._internal.types import (
BrokerMiddleware,
CustomCallable,
Expand Down Expand Up @@ -81,6 +82,7 @@ def __init__(
parser=parser,
decoder=decoder,
include_in_schema=include_in_schema,
state=EmptyBrokerState("You should include router to any broker."),
)

for h in handlers:
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/cli/utils/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ def set_log_level(level: int, app: "FastStream") -> None:
if app.logger and getattr(app.logger, "setLevel", None):
app.logger.setLevel(level) # type: ignore[attr-defined]

app.broker._state.logger_state.set_level(level)
app.broker._state.get().logger_state.set_level(level)
4 changes: 2 additions & 2 deletions faststream/_internal/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage
from faststream._internal.state import BrokerState
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.types import (
AsyncCallable,
BrokerMiddleware,
Expand Down Expand Up @@ -109,7 +109,7 @@ def _setup( # type: ignore[override]
self,
*,
producer: Optional["ProducerProto"],
state: "BrokerState",
state: "Pointer[BrokerState]",
) -> None: ...

@abstractmethod
Expand Down
Loading

0 comments on commit aa1562d

Please sign in to comment.