Skip to content

Commit

Permalink
Limited concurrency (#8)
Browse files Browse the repository at this point in the history
* Adding limited concurrency.

* Adding the ability to control the delay

* Added some tests which verify concurrency limits.

* Getting ready to release.
  • Loading branch information
manchicken authored Nov 11, 2022
1 parent cfdf0a3 commit 1a6ed7f
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 9 deletions.
6 changes: 6 additions & 0 deletions .markdownlint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"MD013": false,
"MD024": {
"siblings_only": true
}
}
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## 0.0.2 (2022-11-10)

### Added

- Added `limitedConcurrency()` to allow for limiting the number of concurrent promises running at a time.

## 0.0.1 (2022-10-01)

### Added
Expand Down
52 changes: 48 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,35 @@ In large data processing scenarios where you use Promises to help manage asynchr

This can help you reduce code complexity, and can result in a more fault-tolerant data processing program.

## Protected Promises
## Protected Promises with

If you use `Promise.all()` for a bunch of Promises, and one of them rejects, you get a rejection from the `Promise.all()` call. This can lead some to add complexity, and all together it can reduce the reliability of large-ish data processing tasks. A Protected Promise is a Promise which has been protected using the `protectPromise()` function. This adds fault tolerance to the individual Promise so that it can be run in a way which still holds on to the rejection context but which allows the list of Promises to all be run.

## Usage
### Usage

### `
This function will make the `Promise` you pass in to it always resolve and never reject, but in the event of a caught rejection the resolved value will be `PromiseProtectedRejected` which is an `Error` type.

### `coalescePromises()`
In this example, you can see that I have a promise which resolves.

```javascript
// A function which resolves a Promise with a value of 1
const resolvePromise = () => Promise.resolve(1)

// A function which rejects a Promise with an Error
const rejectPromise = () => Promise.reject(new Error('Rejected'))

// Let's protect the resolvePromise
expect(protectPromise(resolvePromise)).resolves.toBe(1)

// Let's protect the rejectPromise
expect(promiseProtectedRejected(protectPromise(rejectPromise))).resolves.toBeInstanceOf(PromiseProtectedRejected)
```

## Coalescing Promises with `coalescePromises()`

When you've got a bunch of promises, and you want to run them all and track which ones fail or succeed, `coalescePromises()` can help you do this.

### Usage

In this example, you can use `coalescePromises()` to track which promises resolved successfully, and which were rejected.

Expand All @@ -42,3 +62,27 @@ const deleteAllFilesInFolder = (folderPath) =>

At the end of this, you'll see the `resolved` entries with all of the `unlink()` results, and you'll also see the `rejected` entries with all of the ones which failed to be deleted.

## Limiting Concurrent Promise Execution with `limitedConcurrency()`

This function provides the ability to run a bunch of promises, but limit how many are running concurrently. This can be useful if you have a bunch of promises which are IO bound, and you want to limit the number of concurrent IO operations. It's also super helpful if you are operating in an environment, such as AWS Lambda or other such serverless implementation, where resource constraints get in the way of your ability to run a bunch of promises concurrently.

### Usage

```javascript
const { limitedConcurrency } = require('@manchicken/promise-regulation')

const allRegions = someFunctionWhichReturnsABunchOfRegions() // Fake function for demonstration purposes

const operationsToRun = allRegions.map((region) => () => someFunctionWhichReturnsAPromise(region))
await limitConcurrency(operationToRun, 5)
```

**Important:** To limit concurrency, promises passed into this function should not yet be executing. In order to make sure that concurrency is in fact limited, the promises you pass to this function must be wrapped in a function. The function should take zero arguments, and should run and return the promise when executed.

## Author and Contributors

- Mike Stemle [(@manchicken)](https://github.com/manchicken)

## License

This project is licensed under the BSD-3 Clause License - see the [LICENSE](LICENSE) file for details.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
module.exports = {
...require('./src/protected-promise'),
...require('./src/coalesce-promises'),
...require('./src/limited-concurrency'),
}
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
},
"name": "@manchicken/promise-regulation",
"description": "This is a simple Node library to help you take a bit more control over your Promises.",
"version": "0.0.1",
"version": "0.0.2",
"main": "index.js",
"scripts": {
"test": "jest"
Expand Down
58 changes: 58 additions & 0 deletions src/__tests__/limited-concurrency.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Test the limitedConcurrency function
const { limitedConcurrency } = require('../limited-concurrency')

// This function waits three seconds and then resolves with `true`
const waitThreeSeconds = () =>
new Promise((resolve) => setTimeout(resolve, 3000, true))

// Let's have things take 30 seconds for the timeout.
jest.setTimeout(30000)

describe('limitedConcurrency', () => {
test('One promise at a time', async () => {
const promises = [
waitThreeSeconds,
waitThreeSeconds,
waitThreeSeconds,
waitThreeSeconds,
waitThreeSeconds,
]
const results = await limitedConcurrency(promises, 3)
expect(results).toHaveLength(5)
expect(promises).toHaveLength(5)
})

test('Verify that limitedConcurrency() actually limits concurrency', async () => {
let totalRunCount = 0

// This function gives us a closure which returns a promise that increments the counter.
const trackablePromise = () => () =>
Promise.resolve()
.then(() => (totalRunCount += 1))
.then(waitThreeSeconds)

// Make a list of five promises
const promises = [
trackablePromise(),
trackablePromise(),
trackablePromise(),
trackablePromise(),
trackablePromise(),
]

// Run those promises with a concurrency limit of 3
const results = limitedConcurrency(promises, 3)

// Wait a brief period of time to let the promises get started
await new Promise((resolve) => setTimeout(resolve, 50))

// We've been less than 3 seconds since we started the promises, so we should have exactly 3 promises running, and the `totalRunCount` should now equal 3.
expect(totalRunCount).toBe(3)

// Wrap up the rest of the promises
await results

// Make sure that all of the promises did eventually run.
expect(totalRunCount).toBe(5)
})
})
44 changes: 44 additions & 0 deletions src/limited-concurrency.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const { resolve } = require('bluebird')
const { coalescePromises } = require('./coalesce-promises')
const { protectPromise } = require('./protected-promise')

const SleepyTimeDelay = 250
const sleepyTime = (sleepDelay) =>
new Promise((resolve) => setTimeout(resolve, sleepDelay, true))

/**
* A function which allows for concurrent running of promises, but limits the number of concurrent promises.
* @param {Array<Function>} promiseFunctions - An array of functions to run, all of which return a promise.
* @param {Number} limit - The number of promises to run concurrently.
* @param {Number} sleepDelay - The number of milliseconds to wait between limit checks.
* @returns {Promise} - A promise which resolves when all promises have resolved.
*/
const limitedConcurrency = async (
promises = [],
limit = 1,
sleepDelay = SleepyTimeDelay,
) => {
let currentlyRunning = 0
const leftToRun = [...promises]
const finished = []

while (leftToRun.length > 0) {
if (currentlyRunning < limit) {
currentlyRunning += 1
finished.push(
protectPromise(leftToRun.shift()()).then((result) => {
currentlyRunning -= 1
return Promise.resolve(result)
}),
)
} else {
await sleepyTime(sleepDelay)
}
}

return Promise.all(finished)
}

module.exports = {
limitedConcurrency,
}

0 comments on commit 1a6ed7f

Please sign in to comment.