From a949f8dfdbc7c6836e9a299c2f4a3d602bab222c Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Wed, 10 Jul 2024 23:57:35 +0530 Subject: [PATCH 1/8] #208 - add update priority logic, add promise started event --- source/index.ts | 18 ++++++++++++++---- source/priority-queue.ts | 6 ++++++ source/queue.ts | 1 + 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/source/index.ts b/source/index.ts index faf2c9e..eb58d27 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'started'; /** Promise queue with concurrency control. @@ -43,6 +43,8 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; - async add(function_: Task, options?: Partial): Promise; - async add(function_: Task, options: Partial = {}): Promise { + async add(function_: Task, options: { throwOnTimeout: true } & Exclude, uid?: string): Promise; + async add(function_: Task, options?: Partial, uid?: string): Promise; + async add(function_: Task, options: Partial = {}, uid?: string): Promise { + // incase uid is not defined + uid = (this.#uidAssigner++).toString(); options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, + uid, ...options, }; @@ -258,6 +267,7 @@ export default class PQueue { @@ -32,6 +33,11 @@ export default class PriorityQueue implements Queue) => element.uid === uid); + item && (item.priority = priority || ((item.priority || 0) + 1)); + } + dequeue(): RunFunction | undefined { const item = this.#queue.shift(); return item?.run; diff --git a/source/queue.ts b/source/queue.ts index be3316c..cd325ee 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,4 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; + prioritize: (uid: string, priority: number) => void; }; From 4cdcbf72b3966adfd66c368c9ed91fdf8f169974 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Thu, 11 Jul 2024 00:05:19 +0530 Subject: [PATCH 2/8] #208 - update for uidAssigner --- source/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/index.ts b/source/index.ts index eb58d27..aecf859 100644 --- a/source/index.ts +++ b/source/index.ts @@ -43,6 +43,7 @@ export default class PQueue(function_: Task, options?: Partial, uid?: string): Promise; async add(function_: Task, options: Partial = {}, uid?: string): Promise { // incase uid is not defined - uid = (this.#uidAssigner++).toString(); + !uid && (uid = (this.#uidAssigner++).toString()); options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, From 8383aacb2a984f7f2850caf73963c446ab1515d2 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Thu, 11 Jul 2024 23:56:17 +0530 Subject: [PATCH 3/8] #208 - add test cases, add linting fixes --- source/index.ts | 15 +++++++------ source/priority-queue.ts | 20 +++++++++++++++-- test/test.ts | 46 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/source/index.ts b/source/index.ts index aecf859..bb2071d 100644 --- a/source/index.ts +++ b/source/index.ts @@ -43,8 +43,8 @@ export default class PQueue(function_: Task, options: { throwOnTimeout: true } & Exclude, uid?: string): Promise; + async add(function_: Task, options: {throwOnTimeout: true} & Exclude, uid?: string): Promise; async add(function_: Task, options?: Partial, uid?: string): Promise; async add(function_: Task, options: Partial = {}, uid?: string): Promise { - // incase uid is not defined - !uid && (uid = (this.#uidAssigner++).toString()); + // Incase uid is not defined + if (uid === undefined) { + uid = (this.#uidAssigner++).toString(); + } + options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, @@ -268,7 +271,7 @@ export default class PQueue) => element.uid === uid); - item && (item.priority = priority || ((item.priority || 0) + 1)); + const queueIndex: number = this.#queue.findIndex((element: Readonly) => element.uid === uid); + const [item] = this.#queue.splice(queueIndex, 1); + if (item === undefined) { + return; + } + + item.priority = priority ?? ((item.priority ?? 0) + 1); + if (this.size && this.#queue[this.size - 1]!.priority! >= priority!) { + this.#queue.push(item); + return; + } + + const index = lowerBound( + this.#queue, item, + (a: Readonly, b: Readonly) => b.priority! - a.priority!, + ); + + this.#queue.splice(index, 0, item); } dequeue(): RunFunction | undefined { diff --git a/test/test.ts b/test/test.ts index d511eae..ad54b68 100644 --- a/test/test.ts +++ b/test/test.ts @@ -6,7 +6,7 @@ import inRange from 'in-range'; import timeSpan from 'time-span'; import randomInt from 'random-int'; import pDefer from 'p-defer'; -import PQueue, {AbortError} from '../source/index.js'; +import PQueue from '../source/index.js'; const fixture = Symbol('fixture'); @@ -1134,3 +1134,47 @@ test('aborting multiple jobs at the same time', async t => { await t.throwsAsync(task2, {instanceOf: DOMException}); t.like(queue, {size: 0, pending: 0}); }); + +test('.setPriority() - execute a promise before planned', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {}, 'snail'); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {}, 'duck'); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {}, 'turtle'); + queue.setPriority('turtle', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); +}); + +test('started event to check when promise function is called', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {}, '🐌'); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {}, '🦆'); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {}, '🐢'); + queue.on('started', uid => { + if (uid === '🦆') { + t.deepEqual(result, ['🐌', '🐢']); + } + }); + queue.setPriority('🐢', 1); + await queue.onIdle(); +}); From a60bfc19a32c1c5a9601e6b64ba54c1ee5f578ef Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Fri, 12 Jul 2024 09:33:03 +0530 Subject: [PATCH 4/8] #208 - review fixes, remove started event changes --- source/index.ts | 22 ++++++++++------------ source/options.ts | 1 + source/priority-queue.ts | 7 +++---- source/queue.ts | 2 +- test/test.ts | 32 ++++---------------------------- 5 files changed, 19 insertions(+), 45 deletions(-) diff --git a/source/index.ts b/source/index.ts index bb2071d..3459f98 100644 --- a/source/index.ts +++ b/source/index.ts @@ -8,7 +8,7 @@ type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'started'; +type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; /** Promise queue with concurrency control. @@ -44,7 +44,7 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude, uid?: string): Promise; - async add(function_: Task, options?: Partial, uid?: string): Promise; - async add(function_: Task, options: Partial = {}, uid?: string): Promise { - // Incase uid is not defined - if (uid === undefined) { - uid = (this.#uidAssigner++).toString(); + async add(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; + async add(function_: Task, options?: Partial): Promise; + async add(function_: Task, options: Partial = {}): Promise { + // Incase id is not defined + if (options.id === undefined) { + options.id = (this.#idAssigner++).toString(); } options = { timeout: this.timeout, throwOnTimeout: this.#throwOnTimeout, - uid, ...options, }; @@ -271,7 +270,6 @@ export default class PQueue { @@ -33,9 +32,9 @@ export default class PriorityQueue implements Queue) => element.uid === uid); - const [item] = this.#queue.splice(queueIndex, 1); + setPriority(id: string, priority?: number) { + const existingIndex: number = this.#queue.findIndex((element: Readonly) => element.id === id); + const [item] = this.#queue.splice(existingIndex, 1); if (item === undefined) { return; } diff --git a/source/queue.ts b/source/queue.ts index cd325ee..459d29b 100644 --- a/source/queue.ts +++ b/source/queue.ts @@ -5,5 +5,5 @@ export type Queue = { filter: (options: Readonly>) => Element[]; dequeue: () => Element | undefined; enqueue: (run: Element, options?: Partial) => void; - prioritize: (uid: string, priority: number) => void; + setPriority: (id: string, priority: number) => void; }; diff --git a/test/test.ts b/test/test.ts index ad54b68..da828e0 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1141,40 +1141,16 @@ test('.setPriority() - execute a promise before planned', async t => { queue.add(async () => { await delay(400); result.push('🐌'); - }, {}, 'snail'); + }, {id: '🐌'}); queue.add(async () => { await delay(400); result.push('🦆'); - }, {}, 'duck'); + }, {id: '🦆'}); queue.add(async () => { await delay(400); result.push('🐢'); - }, {}, 'turtle'); - queue.setPriority('turtle', 1); - await queue.onIdle(); - t.deepEqual(result, ['🐌', '🐢', '🦆']); -}); - -test('started event to check when promise function is called', async t => { - const result: string[] = []; - const queue = new PQueue({concurrency: 1}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {}, '🐌'); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {}, '🦆'); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {}, '🐢'); - queue.on('started', uid => { - if (uid === '🦆') { - t.deepEqual(result, ['🐌', '🐢']); - } - }); + }, {id: '🐢'}); queue.setPriority('🐢', 1); await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); }); From cafe886eada509b899c39bcb7e09846fffb1bce3 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Thu, 18 Jul 2024 18:55:26 +0530 Subject: [PATCH 5/8] #208 - review fixes - add test case, update setPriority method --- source/priority-queue.ts | 13 +++++++++---- test/test.ts | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/source/priority-queue.ts b/source/priority-queue.ts index f1115aa..86d1baf 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue= options.priority!) { + if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) { this.#queue.push(element); return; } @@ -32,15 +33,19 @@ export default class PriorityQueue implements Queue) => element.id === id); + if (existingIndex === -1) { + throw new Error('Invalid Index - No promise function of specified id available in the queue.'); + } + const [item] = this.#queue.splice(existingIndex, 1); if (item === undefined) { return; } - item.priority = priority ?? ((item.priority ?? 0) + 1); - if (this.size && this.#queue[this.size - 1]!.priority! >= priority!) { + item.priority = priority; + if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= priority) { this.#queue.push(item); return; } diff --git a/test/test.ts b/test/test.ts index da828e0..b88efab 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1154,3 +1154,23 @@ test('.setPriority() - execute a promise before planned', async t => { await queue.onIdle(); t.deepEqual(result, ['🐌', '🐢', '🦆']); }); + +test.failing('.setPriority() - with invalid "id"', async t => { + const result: string[] = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => { + await delay(400); + result.push('🐌'); + }, {id: '🐌'}); + queue.add(async () => { + await delay(400); + result.push('🦆'); + }, {id: '🦆'}); + queue.add(async () => { + await delay(400); + result.push('🐢'); + }, {id: '🐢'}); + queue.setPriority('⚡️', 1); + await queue.onIdle(); + t.deepEqual(result, ['🐌', '🐢', '🦆']); +}); From 6a370fe71e9b911ae3b8406df3b3e7d8eaf84b36 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Fri, 19 Jul 2024 07:23:22 +0530 Subject: [PATCH 6/8] #208 - throw error if undefined item in queue while setting priority --- source/priority-queue.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/priority-queue.ts b/source/priority-queue.ts index 86d1baf..9fa824d 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -35,13 +35,13 @@ export default class PriorityQueue implements Queue) => element.id === id); - if (existingIndex === -1) { + if (existingIndex === -1 ) { throw new Error('Invalid Index - No promise function of specified id available in the queue.'); } const [item] = this.#queue.splice(existingIndex, 1); if (item === undefined) { - return; + throw new Error('Undefined Item - No promise function of specified id available in the queue.'); } item.priority = priority; From ccb7a5398619216e6610d21f22ef8066edea2276 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Fri, 19 Jul 2024 07:25:27 +0530 Subject: [PATCH 7/8] #208 - update spacing --- source/priority-queue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/priority-queue.ts b/source/priority-queue.ts index 9fa824d..dd427ea 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -35,7 +35,7 @@ export default class PriorityQueue implements Queue) => element.id === id); - if (existingIndex === -1 ) { + if (existingIndex === -1) { throw new Error('Invalid Index - No promise function of specified id available in the queue.'); } From 686f0d8e21efc1cae0b4c855d6a1eab58258d436 Mon Sep 17 00:00:00 2001 From: Raishav Hanspal Date: Fri, 19 Jul 2024 07:38:37 +0530 Subject: [PATCH 8/8] #208 - use enqueue function in setPriority --- source/priority-queue.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/source/priority-queue.ts b/source/priority-queue.ts index dd427ea..cc46b88 100644 --- a/source/priority-queue.ts +++ b/source/priority-queue.ts @@ -44,18 +44,7 @@ export default class PriorityQueue implements Queue= priority) { - this.#queue.push(item); - return; - } - - const index = lowerBound( - this.#queue, item, - (a: Readonly, b: Readonly) => b.priority! - a.priority!, - ); - - this.#queue.splice(index, 0, item); + this.enqueue(item.run, {priority, id}); } dequeue(): RunFunction | undefined {