-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Drain SinkManyEmitterProcessor buffer after cancel #3789
Conversation
@@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) { | |||
} | |||
|
|||
final void drain() { | |||
if (WIP.getAndIncrement(this) != 0) { | |||
if (WIP.get(this) != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the cases where more than 1 thread calling the drain function, all the threads are incrementing the WIP counter while only one of them proceeds to clear the queue and decrements the counter. This leaves the WIP in unclean state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By separating the atomic operation into two operations you break the exclusive access to the critical section below. Please start with creating JCStress tests if you're considering to spend more time on this. These drain methods are quite a critical piece and ensuring proper lock-free coordination between different actors is essential. Another aspect is performance so that we make as little volatile accesses as we can.
@@ -398,6 +400,7 @@ final void drain() { | |||
boolean empty = q == null || q.isEmpty(); | |||
|
|||
if (checkTerminated(d, empty)) { | |||
WIP.addAndGet(this, -missed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, these returns also leave the WIP in an unclean state.
3ad4a0b
to
251d759
Compare
@@ -383,10 +383,12 @@ public Object scanUnsafe(Attr key) { | |||
} | |||
|
|||
final void drain() { | |||
if (WIP.getAndIncrement(this) != 0) { | |||
if (WIP.get(this) != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By separating the atomic operation into two operations you break the exclusive access to the critical section below. Please start with creating JCStress tests if you're considering to spend more time on this. These drain methods are quite a critical piece and ensuring proper lock-free coordination between different actors is essential. Another aspect is performance so that we make as little volatile accesses as we can.
251d759
to
d7e3d7d
Compare
@chemicL I addressed your comments and also added a JCStress test. But I am not sure whether the test case is correct. |
Hey. Thank you for the effort. Your PR has allowed to unravel some hidden complexities and is pushing the research into this space forward. Unfortunately, I'm not able to accept this in the current form. There are multiple issues here and it would require quite an effort on my part to help correct these. The issues I can see are:
In general, I think the |
The internal buffer/queue in SinkManyEmitterProcessor will be drained after all the subscriptions are canceled.
As explained here the queue/buffer in SinkManyEmitterProcessor is not drained properly after the last subscriber canceled the subscription. This was happening due to the WIP marker is left in an unclean state. This PR fixes the issue by updating the WIP marker. I am not sure if this is the ideal approach though.
Fixes #3715