[Bug]: RouterQueryEngine.aquery with multiple async engines raise Detected nested async error #17349

Open
bkfox opened this issue Dec 22, 2024 · 5 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@bkfox

bkfox commented Dec 22, 2024

Bug Description

I'm using RouterQueryEngine with two engines that both run asynchronously. When I call aquery from an async function (in a pytest context), I get a RuntimeError: Detected nested async.

After some investigation, it seems that the call to run_async_tasks from RouterQueryEngine._aquery is what raises the error:

  • _aquery is an async function; when it is awaited, the event loop is already running;
  • run_async_tasks is a sync function that tries to get the loop. If the loop is already running (e.g. because the caller was awaited), it raises the error;
  • running the RouterQueryEngine with only one engine works since run_async_tasks is not called.
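The failure mode described in these bullets can be reproduced with plain asyncio. This is only a minimal sketch: `run_tasks_sync` and `engine_query` are hypothetical stand-ins written for illustration, not the library's run_async_tasks or any real query engine, and the error message is copied from the report rather than from the library source.

```python
import asyncio


async def engine_query(name: str) -> str:
    # Hypothetical stand-in for an async query engine call.
    await asyncio.sleep(0)
    return f"answer from {name}"


def run_tasks_sync(coros):
    # Hypothetical sync helper that wants to drive the coroutines itself.
    async def _gather():
        return await asyncio.gather(*coros)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: safe to start one and block until done.
        return asyncio.run(_gather())

    # A loop is already running (we were reached through an awaited
    # coroutine), so a blocking run is impossible. Close the coroutines
    # to avoid "never awaited" warnings, then fail.
    for coro in coros:
        coro.close()
    raise RuntimeError("Detected nested async")


async def aquery_nested():
    # Sync helper called from inside a coroutine: this is the failing path.
    return run_tasks_sync([engine_query("engine_1"), engine_query("engine_2")])


def nested_error_message():
    try:
        asyncio.run(aquery_nested())
    except RuntimeError as exc:
        return str(exc)
    return None
```

Called from ordinary synchronous code, `run_tasks_sync` succeeds; the same call made from inside a coroutine fails, which matches the behavior reported above.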

Unfortunately, I can't share sample code here since I'm bound by a non-disclosure agreement.

Version

0.10.50

Steps to Reproduce

  • Assume two or more async-capable query engines (engine_N), used in pytest tests
  • Assume a RouterQueryEngine, router_engine, initialized with query_engine_tools=[engine_1, engine_2]
import pytest

@pytest.mark.asyncio
async def test_query(router_engine):
    # router_engine is provided as a fixture (not shown here)
    query = await router_engine.aquery("here is my prompt")

Relevant Logs/Tracebacks

No response

@bkfox bkfox added bug Something isn't working triage Issue needs to be triaged/prioritized labels Dec 22, 2024

dosubot bot commented Dec 22, 2024

Hey @bkfox! 👋 I'm here to help you with any bugs, questions, or contributions you have regarding this project. Let's work together to solve this issue!

The RuntimeError: Detected nested async error occurs because the run_async_tasks function is being called within an already running asynchronous context. This function is not designed to handle nested async calls, which leads to the error when using multiple async engines in RouterQueryEngine.aquery [1][2].

To work around this issue, you can try the following:

  1. Use nest_asyncio: Apply nest_asyncio to allow nested event loops. This is particularly useful in environments like Jupyter notebooks or pytest where an event loop might already be running. You can do this by adding import nest_asyncio; nest_asyncio.apply() at the beginning of your test or script.

  2. Restructure Code: Consider restructuring your code to avoid nested async calls. This might involve refactoring how and where you call asynchronous functions to ensure they are not nested within other async contexts.

These approaches should help mitigate the issue until a more permanent fix is implemented in the library.


@logan-markewich
Collaborator

Hmm, I'll take a look. But the workaround here is just nest_asyncio.apply()

@logan-markewich
Collaborator

It should also be an easy PR to use asyncio.gather

@bkfox
Author

bkfox commented Dec 22, 2024

Easier would be: responses = await asyncio.gather(*tasks).
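A minimal sketch of that approach, assuming the selected engines each expose an awaitable query call. `engine_query` and `aquery_all` are hypothetical names used for illustration and are not part of the library.

```python
import asyncio


async def engine_query(name: str, delay: float) -> str:
    # Hypothetical stand-in for awaiting one selected engine.
    await asyncio.sleep(delay)
    return f"answer from {name}"


async def aquery_all(selected):
    # Build one coroutine per selected engine and await them together
    # on the loop that is already running, instead of handing them to
    # a sync helper that would try to start or re-enter a loop.
    tasks = [engine_query(name, delay) for name, delay in selected]
    # gather returns results in the order the awaitables were passed,
    # regardless of which one finishes first.
    return await asyncio.gather(*tasks)
```

Because the coroutines are awaited from within the caller's own event loop, no nested loop is needed.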

It seems that a similar issue also exists in acombine_responses, line 79 of the same file:

# this calls:  AsyncStreamingResponse.__str__ => asyncio_run(AsyncStreamingResponse._async_str)
response_strs.append(str(response))

Workaround: response_strs.append(await response._async_str())

I just tested both and they seem to work (though other issues came up further along the flow).
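The str() problem and its workaround can be sketched as follows. `FakeAsyncStreamingResponse` is a hypothetical stand-in whose `__str__` blocks on an async method; it only imitates the call chain described in the comment above and is not the library's class.

```python
import asyncio


class FakeAsyncStreamingResponse:
    # Hypothetical stand-in, not the real AsyncStreamingResponse.
    def __init__(self, chunks):
        self._chunks = chunks

    async def _async_str(self) -> str:
        parts = []
        for chunk in self._chunks:
            await asyncio.sleep(0)  # pretend each chunk arrives asynchronously
            parts.append(chunk)
        return "".join(parts)

    def __str__(self) -> str:
        # Sync entry point: it has to start its own event loop, which
        # is not allowed while another loop is running in this thread.
        coro = self._async_str()
        try:
            return asyncio.run(coro)
        except RuntimeError:
            coro.close()  # avoid a "never awaited" warning
            raise


async def combine_with_str(responses):
    # Fails inside a coroutine: str() tries to run a nested loop.
    return [str(r) for r in responses]


async def combine_with_await(responses):
    # Workaround: await the async method directly.
    return [await r._async_str() for r in responses]
```

In this sketch, `str(response)` still works from synchronous code; only the call made from inside a running loop fails.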

BTW, acombine_responses returns a StreamingResponse instead of an AsyncStreamingResponse, which seems problematic: RouterQueryEngine._aquery then returns a StreamingResponse, which is put in a QueryEndEvent (that only accepts AsyncStreamingResponse, Response, response_gen or PydanticResponse).

@bkfox
Author

bkfox commented Dec 22, 2024

Okay, after lots of tries and tests, it seems that:

  • this happens only in a pytest context; with an ASGI django-channels async consumer there is no problem;

  • the async/await calls work when they are made within the same async generator function, such as:

    async def get_generator():
        agent = await self.get_query_engine()
        query = await agent.aquery("hi everyone")
        async for chunk in query.response_gen:
            yield chunk
