From cb926dadd5dacbb44cc5364ffc2e0bcce2d030eb Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 11 Oct 2024 17:01:00 -0700 Subject: [PATCH] Remove cancellation tracking This way, cancelled subscribers do not pile up on Completions. This is important for allocator switching, as their completions get bound to the shutdown completion, and they cancel their subscriptions when they complete. If their nodes cannot be reused, they would pile up over the lifetime of the pool. --- .../stormpot/internal/StackCompletion.java | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/src/main/java/stormpot/internal/StackCompletion.java b/src/main/java/stormpot/internal/StackCompletion.java index 56fdb2e6..ef164765 100644 --- a/src/main/java/stormpot/internal/StackCompletion.java +++ b/src/main/java/stormpot/internal/StackCompletion.java @@ -105,7 +105,7 @@ public void complete() { if (obj instanceof Thread th) { LockSupport.unpark(th); } else if (obj instanceof Flow.Subscriber subscriber && - ns.compareAndSetObj(subscriber, new CancelledSubscription(subscriber))) { + ns.compareAndSetObj(subscriber, null)) { subscriber.onComplete(); } ns = next; @@ -124,12 +124,10 @@ public void subscribe(Flow.Subscriber subscriber) { while (ns != END && ns != DONE) { Node next = loadNext(ns); Object obj = ns.obj; - if (obj instanceof CancelledSubscription cs && cs.subscriber == subscriber) { - if (ns.compareAndSetObj(cs, subscriber)) { - subscriber.onSubscribe(ns); - if (isCompleted() && ns.compareAndSetObj(subscriber, new CancelledSubscription(subscriber))) { - subscriber.onComplete(); - } + if (obj == null && ns.compareAndSetObj(null, subscriber)) { + subscriber.onSubscribe(ns); + if (isCompleted() && ns.compareAndSetObj(subscriber, null)) { + subscriber.onComplete(); } return; } @@ -322,26 +320,18 @@ public void request(long n) { @SuppressWarnings("unchecked") @Override public void cancel() { - // todo we should CAS obj to null, and void accumulating cancelled subscribers Flow.Subscriber subscriber; - CancelledSubscription cancelled; do { Object curr = obj; if (curr instanceof Flow.Subscriber) { subscriber = (Flow.Subscriber) curr; - cancelled = new CancelledSubscription(subscriber); - } else if (curr instanceof CancelledSubscription) { - return; // Already cancelled } else { - throw new IllegalStateException("Not a subscription node"); + return; } - } while (!compareAndSetObj(subscriber, cancelled)); + } while (!compareAndSetObj(subscriber, null)); } } - private record CancelledSubscription(Flow.Subscriber subscriber) { - } - /** * A callback that will be notified when threads block on a given completion. */