Stream output from subgraphs while in-progress for sync stream #1749
nfcampos
commented
Sep 17, 2024
- previously implemented only for async stream
chunks: list[tuple[float, Any]] = []
config = {"configurable": {"thread_id": "2"}}
for c in app.stream({"my_key": ""}, config, subgraphs=True):
    chunks.append((round(time.perf_counter() - start, 1), c))
Why do we need to subtract from start, only to subtract from index 0 afterwards?
Seemed to make the test less flaky, but still a little flaky I think
Testing times is inherently flawed
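For reference, the timing pattern discussed in this thread can be sketched roughly as below. This is only an illustration, not the test from the PR: `stamp`, `relative_to_first`, and `min_gap` are hypothetical helper names. Stamping against a start time and then re-basing on the first chunk removes startup cost from the comparison, and asserting on gaps between chunks is usually less brittle than asserting on absolute times.

```python
import time
from typing import Any, Iterable

Stamped = list[tuple[float, Any]]


def stamp(chunks: Iterable[Any]) -> Stamped:
    """Record seconds elapsed since iteration began for each chunk."""
    start = time.perf_counter()
    return [(time.perf_counter() - start, c) for c in chunks]


def relative_to_first(stamped: Stamped) -> Stamped:
    """Re-base timestamps so the first chunk sits at 0.0."""
    if not stamped:
        return []
    first = stamped[0][0]
    return [(t - first, c) for t, c in stamped]


def min_gap(stamped: Stamped) -> float:
    """Smallest gap between consecutive chunks (0.0 if fewer than two)."""
    return min((b[0] - a[0] for a, b in zip(stamped, stamped[1:])), default=0.0)
```

A test could then assert that `min_gap(...)` exceeds some small threshold to show chunks arrived incrementally, although any wall-clock assertion can still be flaky on a loaded machine.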
def get_waiter() -> asyncio.Task[None]:
    nonlocal waiter
    if waiter is None or waiter.done():
        return (waiter := loop.submit(stream.wait))
Feel like this is harder to read than:
waiter = loop.submit(stream.wait)
return waiter
Less fun, but you are right
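A minimal, self-contained sketch of the "single live waiter" idea with the more readable assignment form suggested above. It assumes a plain `ThreadPoolExecutor` in place of `loop.submit` and an arbitrary blocking callable in place of `stream.wait`; `make_get_waiter` is a hypothetical name and not part of the PR.

```python
import concurrent.futures
from typing import Callable, Optional


def make_get_waiter(
    executor: concurrent.futures.Executor,
    wait: Callable[[], object],
) -> Callable[[], concurrent.futures.Future]:
    """Return a function that hands out at most one pending waiter future."""
    waiter: Optional[concurrent.futures.Future] = None

    def get_waiter() -> concurrent.futures.Future:
        nonlocal waiter
        # Only submit a new waiter when there is none, or the last one finished.
        if waiter is None or waiter.done():
            waiter = executor.submit(wait)
        return waiter

    return get_waiter
```

Calling `get_waiter()` repeatedly while the previous waiter is still blocked returns the same future, so at most one blocked waiter exists at any time.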
@@ -1228,6 +1234,22 @@ def output() -> Iterator:
    # enable subgraph streaming
    if subgraphs:
        loop.config["configurable"][CONFIG_KEY_STREAM] = loop.stream
        # we are careful to have a single waiter live at any one time
        # because on exit we increment semaphore count by exactly 1
Naive question: how do we know put() is called exactly once before release? Is it just because we only use the waiter once at any given point in time (via the futures dict)?
I don't think that matters? Basically what we do on exit is put one more thing on the queue, so that if a waiter is waiting, it stops
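To make the exit behaviour described above concrete, here is a rough sketch under stated assumptions: a queue whose `wait()` blocks on a semaphore that `put()` increments. `NotifyingQueue` and `wake` are hypothetical names, not the PR's actual queue class. Because only one waiter is ever blocked, incrementing the semaphore once on exit is enough to release it.

```python
import threading
from collections import deque
from typing import Any


class NotifyingQueue:
    """Toy queue: put() signals a semaphore, wait() blocks until signalled."""

    def __init__(self) -> None:
        self._items: deque = deque()
        self._sem = threading.Semaphore(0)

    def put(self, item: Any) -> None:
        self._items.append(item)
        self._sem.release()  # one increment per item

    def wait(self) -> None:
        self._sem.acquire()  # blocks until something was put, or wake() ran

    def wake(self) -> None:
        # Called once on exit: unblocks at most one waiter, even with no item.
        self._sem.release()

    def drain(self) -> list:
        out = []
        while self._items:
            out.append(self._items.popleft())
        return out
```

If two waiters could be blocked at once, a single `wake()` would leave one of them stuck, which is the reason for keeping exactly one waiter live.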
-    while futures:
-        done, _ = concurrent.futures.wait(
+    while len(futures) > (1 if get_waiter is not None else 0):
+        done, inflight = concurrent.futures.wait(
Naive/stupid question: if we only have the waiter left, why do we need to wait for it? Just because it will check for in-flight items in the queue?
If we only have the waiter left we don't wait for it, right?
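A simplified sketch of the loop condition under discussion, assuming a thread pool and a semaphore-style wait. `run_until_only_waiter` is a hypothetical helper, not the code in this PR. The loop keeps going while anything other than the waiter is in flight, and stops as soon as the waiter is the only future left, so the waiter itself is never waited on alone.

```python
import concurrent.futures as cf
from typing import Any, Callable, Iterable


def run_until_only_waiter(
    executor: cf.Executor,
    tasks: Iterable[Callable[[], Any]],
    wait_for_output: Callable[[], Any],
    on_output: Callable[[], None],
) -> tuple[list, cf.Future]:
    """Run tasks alongside one waiter; return task results and the last waiter."""
    futures: dict = {executor.submit(t): "task" for t in tasks}
    waiter = executor.submit(wait_for_output)
    futures[waiter] = "waiter"
    results = []
    # Stop once the waiter is the only future left.
    while len(futures) > 1:
        done, _ = cf.wait(futures, return_when=cf.FIRST_COMPLETED)
        for fut in done:
            kind = futures.pop(fut)
            if kind == "waiter":
                on_output()  # e.g. flush streamed chunks to the caller
                waiter = executor.submit(wait_for_output)
                futures[waiter] = "waiter"
            else:
                results.append(fut.result())
    return results, waiter
```

The caller is still responsible for unblocking the returned waiter once (for example by incrementing the semaphore on exit) so that the executor can shut down cleanly.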