You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
For some reason, I need to delay the creation of an app's broker in an on_startup hook. This is mainly because my application has a pretty heavy warmup bootstrap that loads a lot of data into memory so I need to delay this bootstrap (which actually includes broker creation) after the parent process forked into worker subprocesses, otherwise the parent process has an unnecessary high memory footprint (that only worker processes need to have).
E.g.
fromfaststreamimportFastStreamfromfaststream.kafkaimportKafkaBrokerapp=FastStream()
@app.on_startupdefbootstrap():
app.set_broker(KafkaBroker(["localhost:9092"]))
# some warmup of the application pre-loading data into memory
...
For now, this isn't working because of this assert
assertself.broker, "You should setup a broker"# nosec B101
which occurs at the very begining of the workers run, before on_startup hooks are executed.
The example above is overly simplified and one could argue that broker could have been created outside the bootstrap() function, but, as per example at https://faststream.airt.ai/latest/getting-started/lifespan/hooks/#lifespan, it could be legitimate to create it based on environment setting specifying not only the broker host but also the kind of broker to use (nats, kafka, ...).
I currently solve the problem by providing an emptyKafkaBroker at app instanciation (app = FastStream(broker=KafkaBroker())) before overriding it using app.set_broker() in the bootstrap() function.
However, as per the base Application constructor at
For some reason, I need to delay the creation of an app's broker in an
on_startup
hook. This is mainly because my application has a pretty heavy warmup bootstrap that loads a lot of data into memory so I need to delay this bootstrap (which actually includes broker creation) after the parent process forked into worker subprocesses, otherwise the parent process has an unnecessary high memory footprint (that only worker processes need to have).E.g.
For now, this isn't working because of this assert
faststream/faststream/app.py
Line 40 in 9b7c33e
on_startup
hooks are executed.The example above is overly simplified and one could argue that broker could have been created outside the
bootstrap()
function, but, as per example at https://faststream.airt.ai/latest/getting-started/lifespan/hooks/#lifespan, it could be legitimate to create it based on environment setting specifying not only the broker host but also the kind of broker to use (nats, kafka, ...).I currently solve the problem by providing an empty
KafkaBroker
at app instanciation (app = FastStream(broker=KafkaBroker())
) before overriding it usingapp.set_broker()
in thebootstrap()
function.However, as per the base
Application
constructor atfaststream/faststream/_internal/application.py
Lines 48 to 70 in 9b7c33e
Application.set_broker()
faststream/faststream/_internal/application.py
Lines 117 to 120 in 9b7c33e
on_startup
, I would have expected my use case to be legitimate.As a consequence, should this assert be removed, moved after startup hooks execution (at
faststream/faststream/_internal/application.py
Lines 168 to 175 in 9b7c33e
None
atfaststream/faststream/_internal/application.py
Lines 176 to 177 in 9b7c33e
The text was updated successfully, but these errors were encountered: