Skip to content

Commit

Permalink
Remove cancellation tracking
Browse files Browse the repository at this point in the history
This way, cancelled subscribers do not pile up on Completions.

This is important for allocator switching, as their completions get bound to the shutdown completion, and they cancel their subscriptions when they complete.
If their nodes cannot be reused, they would pile up over the lifetime of the pool.
  • Loading branch information
chrisvest committed Oct 12, 2024
1 parent ced730f commit cb926da
Showing 1 changed file with 7 additions and 17 deletions.
24 changes: 7 additions & 17 deletions src/main/java/stormpot/internal/StackCompletion.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void complete() {
if (obj instanceof Thread th) {
LockSupport.unpark(th);
} else if (obj instanceof Flow.Subscriber<?> subscriber &&
ns.compareAndSetObj(subscriber, new CancelledSubscription(subscriber))) {
ns.compareAndSetObj(subscriber, null)) {
subscriber.onComplete();
}
ns = next;
Expand All @@ -124,12 +124,10 @@ public void subscribe(Flow.Subscriber<? super Void> subscriber) {
while (ns != END && ns != DONE) {
Node next = loadNext(ns);
Object obj = ns.obj;
if (obj instanceof CancelledSubscription cs && cs.subscriber == subscriber) {
if (ns.compareAndSetObj(cs, subscriber)) {
subscriber.onSubscribe(ns);
if (isCompleted() && ns.compareAndSetObj(subscriber, new CancelledSubscription(subscriber))) {
subscriber.onComplete();
}
if (obj == null && ns.compareAndSetObj(null, subscriber)) {
subscriber.onSubscribe(ns);
if (isCompleted() && ns.compareAndSetObj(subscriber, null)) {
subscriber.onComplete();
}
return;
}
Expand Down Expand Up @@ -322,26 +320,18 @@ public void request(long n) {
@SuppressWarnings("unchecked")
@Override
public void cancel() {
// todo we should CAS obj to null, and void accumulating cancelled subscribers
Flow.Subscriber<Void> subscriber;
CancelledSubscription cancelled;
do {
Object curr = obj;
if (curr instanceof Flow.Subscriber<?>) {
subscriber = (Flow.Subscriber<Void>) curr;
cancelled = new CancelledSubscription(subscriber);
} else if (curr instanceof CancelledSubscription) {
return; // Already cancelled
} else {
throw new IllegalStateException("Not a subscription node");
return;
}
} while (!compareAndSetObj(subscriber, cancelled));
} while (!compareAndSetObj(subscriber, null));
}
}

private record CancelledSubscription(Flow.Subscriber<?> subscriber) {
}

/**
* A callback that will be notified when threads block on a given completion.
*/
Expand Down

0 comments on commit cb926da

Please sign in to comment.