diff --git a/spec.bs b/spec.bs index 38b265a..6db9875 100644 --- a/spec.bs +++ b/spec.bs @@ -743,7 +743,76 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w
The forEach(|callback|, |options|) method steps are: - 1. TODO: Spec this and use |callback| and |options|. + 1. Let |p| [=a new promise=]. + + 1. Let |visitor callback controller| be a [=new=] {{AbortController}}. + + 1. Let |internal options| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is the + result of [=creating a dependent abort signal=] from the list + «|visitor callback controller|'s [=AbortController/signal=], |options|'s + {{SubscribeOptions/signal}} if non-null», using {{AbortSignal}}, and the [=current realm=]. + +
+

Many trivial [=internal observers=] act as pass-throughs, and do not control the + subscription to the {{Observable}} that they represent; that is, their [=internal + observer/error steps=] and [=internal observer/complete steps=] are called when the + subscription is terminated, and their [=internal observer/next steps=] simply pass some + version of the given value along the chain.

+ +

For this operator, however, the below |observer|'s [=internal observer/next steps=] are + responsible for actually aborting the underlying subscription to [=this=], in the event + that |callback| throws an exception. In that case, the {{SubscribeOptions}}'s + {{SubscribeOptions/signal}} we pass through to "Subscribe to an Observable", needs to be a [=creating a + dependent abort signal|dependent signal=] derived from |options|'s + {{SubscribeOptions/signal}}, **and** the {{AbortSignal}} of an {{AbortController}} that the + [=internal observer/next steps=] below has access to, and can [=AbortController/signal + abort=] when needed. +

+ + 1. If |internal options|'s {{SubscribeOptions/signal}} is [=AbortSignal/aborted=], then: + + 1. [=Reject=] |p| with |internal options|'s {{SubscribeOptions/signal}}'s + [=AbortSignal/abort reason=]. + + 1. Return |p|. + + 1. [=AbortSignal/add|Add the following abort algorithm=] to |internal options|'s + {{SubscribeOptions/signal}}: + + 1. [=Reject=] |p| with |internal options|'s {{SubscribeOptions/signal}}'s [=AbortSignal/abort + reason=]. + + Note: The fact that rejection of |p| is tied to |internal options|'s + {{SubscribeOptions/signal}}, and not |options|'s {{SubscribeOptions/signal}} means, that + any [=microtasks=] [=queue a microtask|queued=] during the firing of |options|'s + {{SubscribeOptions/signal}}'s {{AbortSignal/abort}} event will run before |p|'s + rejection handler runs. + + 1. Let |idx| be an {{unsigned long long}}, initially 0. + + 1. Let |observer| be a new [=internal observer=], initialized as follows: + + : [=internal observer/next steps=] + :: + 1. [=Invoke=] |callback| with the passed in value, and |idx|. + + If an exception |E| was thrown, then + [=reject=] |p| with |E|, and [=AbortController/signal abort=] |visitor callback + controller| with |E|. + + 1. Increment |idx|. + + : [=internal observer/error steps=] + :: [=Reject=] |p| with the passed in error. + + : [=internal observer/complete steps=] + :: [=Resolve=] |p| with {{undefined}}. + + 1. Subscribe to [=this=] given |observer| + and |internal options|. + + 1. Return |p|.