Skip to content

Commit

Permalink
fix(effects): concurrent outdated abort handling
Browse files Browse the repository at this point in the history
  • Loading branch information
artalar committed Jul 16, 2024
1 parent 3c1585c commit 23f177e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 76 deletions.
33 changes: 18 additions & 15 deletions packages/effects/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ test('await transaction', async () => {
})

test('withAbortableSchedule', async () => {
const asyncAction = <I extends any[], O>(
cb: Fn<[Ctx, ...I], O>,
name: string,
): Action<I, O> =>
const asyncAction = <I extends any[], O>(cb: Fn<[Ctx, ...I], O>, name: string): Action<I, O> =>
action((ctx, ...a) => cb(withAbortableSchedule(ctx), ...a), name)

const track = mockFn()
Expand Down Expand Up @@ -156,8 +153,8 @@ test('concurrent', async () => {
try {
await ctx.schedule(noop)
results.push(count)
} catch (error) {
results.push(error)
} catch (error: any) {
results.push(error?.name)
}
}),
)
Expand All @@ -166,16 +163,23 @@ test('concurrent', async () => {
countAtom(ctx, 1)
countAtom(ctx, 2)
await sleep()
assert.is(results.length, 2)
assert.is(results[0]?.name, 'AbortError')
assert.is(results[1], 2)
assert.equal(results, ['AbortError', 2])

const anAtom = atom(null)
onConnect(anAtom, (ctx) => countAtom(ctx, 3))
ctx.subscribeTrack(anAtom).unsubscribe()
const anAtom1 = atom(null)
onConnect(anAtom1, (ctx) => countAtom(ctx, 3))
ctx.subscribeTrack(anAtom1).unsubscribe()
await sleep()
assert.is(results.length, 3)
assert.is(results.at(-1).name, 'AbortError')
assert.equal(results, ['AbortError', 2, 'AbortError'])

const anAtom2 = atom(null)
onConnect(anAtom2, async (ctx) => {
await null
countAtom(ctx, 4)
})
ctx.subscribeTrack(anAtom2).unsubscribe()
await sleep()
// there was `ReferenceError: Cannot access 'controller' before initialization` previously
assert.equal(results, ['AbortError', 2, 'AbortError', 'AbortError'])
})

test('spawn', async () => {
Expand Down Expand Up @@ -252,7 +256,6 @@ test('reaction parameters usage', async () => {
assert.is(track.calls.length, 1)
assert.is(track.lastInput(), '1a')


a(ctx, 'b')
someReaction(ctx, '2')
a(ctx, 'c')
Expand Down
82 changes: 21 additions & 61 deletions packages/effects/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@ import {
throwReatomError,
Unsubscribe,
} from '@reatom/core'
import {
AbortError,
isAbort,
merge,
noop,
throwIfAborted,
toAbortError,
} from '@reatom/utils'
import { AbortError, isAbort, merge, noop, throwIfAborted, toAbortError } from '@reatom/utils'

export class CauseContext<T> extends WeakMap<AtomCache, T> {
has(cause: AtomCache): boolean {
Expand All @@ -36,30 +29,22 @@ export class CauseContext<T> extends WeakMap<AtomCache, T> {

export const abortCauseContext = new CauseContext<AbortController | undefined>()

export const getTopController = (
patch: AtomCache & { controller?: AbortController },
): null | AbortController => abortCauseContext.get(patch) ?? null
export const getTopController = (patch: AtomCache & { controller?: AbortController }): null | AbortController =>
abortCauseContext.get(patch) ?? null

/** Handle abort signal from a cause */
export const onCtxAbort = (
ctx: Ctx,
cb: Fn<[AbortError]>,
): undefined | Unsubscribe => {
export const onCtxAbort = (ctx: Ctx, cb: Fn<[AbortError]>): undefined | Unsubscribe => {
const controller = getTopController(ctx.cause)

if (controller) {
const handler = () => cb(toAbortError(controller.signal.reason))
const cleanup = () =>
controller.signal.removeEventListener('abort', handler)
const cleanup = () => controller.signal.removeEventListener('abort', handler)

// TODO schedule
if (controller.signal.aborted) handler()
else {
controller.signal.addEventListener('abort', handler)
ctx.schedule(
() => controller.signal.removeEventListener('abort', handler),
-1,
)
ctx.schedule(() => controller.signal.removeEventListener('abort', handler), -1)
return cleanup
}
}
Expand Down Expand Up @@ -89,15 +74,11 @@ export const __thenReatomed = <T>(
if (!chain) {
const promise = origin.then(
(value: any) => {
ctx.get((read, actualize) =>
chain!.then.forEach((cb) => cb(value, read, actualize)),
)
ctx.get((read, actualize) => chain!.then.forEach((cb) => cb(value, read, actualize)))
return value
},
(error: any) => {
ctx.get((read, actualize) =>
chain!.catch.forEach((cb) => cb(error, read, actualize)),
)
ctx.get((read, actualize) => chain!.catch.forEach((cb) => cb(error, read, actualize)))
// prevent Uncaught DOMException for aborts
if (isAbort(error)) promise.catch(noop)
throw error
Expand Down Expand Up @@ -166,8 +147,7 @@ type skip = typeof skip
export const take = <T extends Atom, Res = AtomReturn<T>>(
ctx: Ctx,
anAtom: T,
mapper: Fn<[Ctx, Awaited<AtomReturn<T>>, skip], Res | skip> = (ctx, v: any) =>
v,
mapper: Fn<[Ctx, Awaited<AtomReturn<T>>, skip], Res | skip> = (ctx, v: any) => v,
): Promise<Awaited<Res>> => {
const cleanups: Array<Fn> = []
return new Promise<Awaited<Res>>((res: Fn, rej) => {
Expand All @@ -190,11 +170,7 @@ export const take = <T extends Atom, Res = AtomReturn<T>>(
}).finally(() => cleanups.forEach((cb) => cb()))
}

export const takeNested = <I extends any[]>(
ctx: Ctx,
cb: Fn<[Ctx, ...I]>,
...params: I
): Promise<void> =>
export const takeNested = <I extends any[]>(ctx: Ctx, cb: Fn<[Ctx, ...I]>, ...params: I): Promise<void> =>
new Promise<void>((resolve, reject) => {
const unabort = onCtxAbort(ctx, reject)

Expand Down Expand Up @@ -236,27 +212,16 @@ export const takeNested = <I extends any[]>(
})

const _isCausedBy = (cause: AtomCache, proto: AtomProto): boolean =>
cause.cause !== null &&
(cause.cause.proto === proto || isCausedBy(cause.cause, proto))

export const isCausedBy = (
caused: Ctx | AtomCache,
by: Atom | AtomProto,
): boolean =>
_isCausedBy(
'subscribe' in caused ? caused.cause : caused,
'__reatom' in by ? by.__reatom : by,
)
cause.cause !== null && (cause.cause.proto === proto || isCausedBy(cause.cause, proto))

export const isCausedBy = (caused: Ctx | AtomCache, by: Atom | AtomProto): boolean =>
_isCausedBy('subscribe' in caused ? caused.cause : caused, '__reatom' in by ? by.__reatom : by)

export const withAbortableSchedule = <T extends Ctx>(ctx: T): T => {
const { schedule } = ctx

return merge(ctx, {
schedule(
this: Ctx,
cb: Parameters<typeof schedule>[0],
step: Parameters<typeof schedule>[1] = 1,
) {
schedule(this: Ctx, cb: Parameters<typeof schedule>[0], step: Parameters<typeof schedule>[1] = 1) {
if (step < 1) return schedule.call(this, cb, step)

let resolve: Fn
Expand Down Expand Up @@ -298,18 +263,12 @@ export const withAbortableSchedule = <T extends Ctx>(ctx: T): T => {
})
}

export const concurrentControllers = new WeakMap<
Fn,
Atom<null | AbortController>
>()
export const concurrentControllers = new WeakMap<Fn, Atom<null | AbortController>>()
export const concurrent: {
<T extends Fn<[CtxSpy, ...any[]]>>(fn: T): T
<T extends Fn<[Ctx, ...any[]]>>(fn: T): T
} = (fn: Fn<[Ctx, ...any[]]>) => {
const abortControllerAtom = atom<null | AbortController>(
null,
`${__count('_concurrent')}.abortControllerAtom`,
)
const abortControllerAtom = atom<null | AbortController>(null, `${__count('_concurrent')}.abortControllerAtom`)

const result = Object.assign(
(ctx: Ctx, ...a: any[]) => {
Expand All @@ -319,19 +278,21 @@ export const concurrent: {
// TODO it is better to do it sync?
if (prevController) ctx.schedule(() => prevController.abort(abort))

const controller = abortControllerAtom(ctx, new AbortController())!
const unabort = onCtxAbort(ctx, (error) => {
// prevent unhandled error for abort
if (res instanceof Promise) res.catch(noop)
controller.abort(error)
})
const controller = abortControllerAtom(ctx, new AbortController())!
ctx = { ...ctx, cause: { ...ctx.cause } }
abortCauseContext.set(ctx.cause, controller)

var res = fn(withAbortableSchedule(ctx), ...a)
if (res instanceof Promise) {
res = res.finally(() => {
unabort?.()
// prevent uncaught rejection for the abort
if (controller.signal.aborted) res.catch(noop)
throwIfAborted(controller)
})
// prevent uncaught rejection for the abort
Expand Down Expand Up @@ -367,8 +328,7 @@ export interface ReactionAtom<Payload> extends Atom<Payload> {
unsubscribe: Unsubscribe
}

export interface Reaction<Params extends any[], Payload>
extends Action<Params, ReactionAtom<Payload>> {}
export interface Reaction<Params extends any[], Payload> extends Action<Params, ReactionAtom<Payload>> {}

export const reaction = <Params extends any[], Payload>(
cb: (ctx: CtxSpy, ...params: Params) => Payload,
Expand Down

0 comments on commit 23f177e

Please sign in to comment.