Note
This is one of 200 standalone projects, maintained as part of the @thi.ng/umbrella monorepo and anti-framework.
🚀 Please help me to work full-time on these projects by sponsoring me on GitHub. Thank you! ❤️
Primitives & operators for Communicating Sequential Processes based on async/await and async iterables.
Important
This package was temporarily deprecated (throughout most of 2023), but has since been revived as a complete rewrite, using a new, simpler and more modern approach afforded by contemporary ES language features (and widespread support for them).
This new/current implementation is in most cases NOT compatible with earlier versions.
The key construct of this package is a read/write channel primitive which can be customized with different buffer implementations to control blocking behaviors and backpressure handling (i.e. what happens when values are written to a channel faster than they are being read, essentially a memory management issue). Unbuffered CSP channels are blocking on both the reader and writer side.
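To illustrate what "blocking on both the reader and writer side" means, here is a minimal, self-contained sketch. It is not this package's implementation: the `RendezvousChannel` class, its fields and the `demo` helper are hypothetical names which only mimic the `read()`/`write()` semantics described above.

```typescript
// Hypothetical sketch (NOT the @thi.ng/csp implementation):
// an unbuffered channel where write() only completes once a reader took the value
class RendezvousChannel<T> {
    // parked writes: the value plus a callback to unblock the writer
    private writers: { value: T; done: () => void }[] = [];
    // parked reads: callbacks waiting for a value
    private readers: ((value: T) => void)[] = [];

    write(value: T): Promise<void> {
        return new Promise<void>((resolve) => {
            const reader = this.readers.shift();
            if (reader) {
                // a reader is already waiting: hand over the value right away
                reader(value);
                resolve();
            } else {
                // otherwise park the write until a reader arrives
                this.writers.push({ value, done: () => resolve() });
            }
        });
    }

    read(): Promise<T> {
        return new Promise<T>((resolve) => {
            const writer = this.writers.shift();
            if (writer) {
                // a parked write exists: take its value & unblock the writer
                resolve(writer.value);
                writer.done();
            } else {
                // otherwise park the read until a value is written
                this.readers.push((v) => resolve(v));
            }
        });
    }
}

// shows that a write stays pending until the matching read happens
const demo = async (): Promise<{
    doneBeforeRead: boolean;
    doneAfterRead: boolean;
    value: number;
}> => {
    const chan = new RendezvousChannel<number>();
    const state = { writeDone: false };
    const pending = chan.write(42).then(() => {
        state.writeDone = true;
    });
    // yield once: the write is still parked since nobody has read yet
    await Promise.resolve();
    const doneBeforeRead = state.writeDone;
    const value = await chan.read();
    await pending;
    return { doneBeforeRead, doneAfterRead: state.writeDone, value };
};
```

Calling `demo()` resolves to `{ doneBeforeRead: false, doneAfterRead: true, value: 42 }`. The sketch omits closing and buffering, both of which the real channel supports.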
The following channel buffer types/behaviors are included (from the thi.ng/buffers package), all accepting a max. capacity and all implementing the IReadWriteBuffer interface required by the channel:
- fifo: First in, first out ring buffer. Writes to the channel will start blocking once the buffer's capacity is reached, otherwise complete immediately. Likewise, channel reads are non-blocking whilst there're more buffered values available. Reads will only block if the buffer is empty.
- lifo: Last in, first out. Write behavior is the same as with fifo, reads are in reverse order (as the name indicates), i.e. the last value written will be the first value read (i.e. stack behavior).
- sliding: Sliding window ring buffer. Writes to the channel are never blocking! Whilst the buffer is at full capacity, new writes will first expunge the oldest buffered value (similar to LRU cache behavior). Read behavior is the same as for fifo.
- dropping: Dropping value ring buffer. Writes to the channel are never blocking! Whilst the buffer is at full capacity, new writes will be silently ignored. Read behavior is the same as for fifo.
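The differences between the four behaviors are easiest to see side by side. The following is a simplified, synchronous model, assuming a capacity of 2 and three consecutive writes. `BufferMode`, `SketchBuffer` and `run` are hypothetical names for this illustration only; they are not the thi.ng/buffers implementations and do not implement the IReadWriteBuffer interface.

```typescript
// Hypothetical, simplified model of the four buffer behaviors
// (NOT the thi.ng/buffers implementation; names are illustrative only)
type BufferMode = "fifo" | "lifo" | "sliding" | "dropping";

class SketchBuffer<T> {
    private items: T[] = [];
    private mode: BufferMode;
    private capacity: number;

    constructor(mode: BufferMode, capacity: number) {
        this.mode = mode;
        this.capacity = capacity;
    }

    // returns false if a channel writer would have to block
    write(x: T): boolean {
        if (this.items.length < this.capacity) {
            this.items.push(x);
            return true;
        }
        switch (this.mode) {
            case "sliding":
                // expunge the oldest value to make room for the new one
                this.items.shift();
                this.items.push(x);
                return true;
            case "dropping":
                // silently ignore the new value
                return true;
            default:
                // fifo & lifo: buffer is full, the writer would have to wait
                return false;
        }
    }

    // returns undefined if a channel reader would have to block
    read(): T | undefined {
        return this.mode === "lifo" ? this.items.pop() : this.items.shift();
    }
}

// writes 1, 2, 3 into a buffer of capacity 2, then drains it
const run = (mode: BufferMode): number[] => {
    const buf = new SketchBuffer<number>(mode, 2);
    for (const x of [1, 2, 3]) buf.write(x);
    const out: number[] = [];
    for (let x = buf.read(); x !== undefined; x = buf.read()) out.push(x);
    return out;
};

const fifoOut = run("fifo"); // [1, 2] (3rd write would block)
const lifoOut = run("lifo"); // [2, 1] (3rd write would block)
const slidingOut = run("sliding"); // [2, 3] (1 was expunged)
const droppingOut = run("dropping"); // [1, 2] (3 was ignored)
```

In this model a rejected write is simply discarded, whereas a real channel would keep the writer waiting until space becomes available.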
As mentioned previously, channels and their read, write and close operations are the key building blocks for CSP.
- Mult for channel multiplexing (aka one-to-many splitting) and dynamic add/removal of subscribers
- PubSub for topic-based subscriptions, each topic implemented as a Mult
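The relationship between the two can be sketched as follows. This is a conceptual model only, assuming plain arrays in place of subscription channels: `SketchMult` and `SketchPubSub` are hypothetical classes, not the package's `Mult` and `PubSub`, and only `subscribeTopic` mirrors a method name used in this README.

```typescript
// Hypothetical sketch of one-to-many splitting (NOT the package's Mult class).
// Subscribers are modeled as plain arrays instead of channels.
class SketchMult<T> {
    private taps = new Set<T[]>();

    // adds a new subscriber and returns its (array-based) queue
    subscribe(): T[] {
        const tap: T[] = [];
        this.taps.add(tap);
        return tap;
    }

    // removes a subscriber; returns true if it was registered
    unsubscribe(tap: T[]): boolean {
        return this.taps.delete(tap);
    }

    // delivers a value to every current subscriber
    write(x: T): void {
        this.taps.forEach((tap) => tap.push(x));
    }
}

// Topic-based routing: one SketchMult per topic, selected via a topic function
class SketchPubSub<T> {
    private topics = new Map<string, SketchMult<T>>();
    private topicFn: (x: T) => string;

    constructor(topicFn: (x: T) => string) {
        this.topicFn = topicFn;
    }

    subscribeTopic(topic: string): T[] {
        let m = this.topics.get(topic);
        if (!m) {
            m = new SketchMult<T>();
            this.topics.set(topic, m);
        }
        return m.subscribe();
    }

    // values for topics without subscribers are discarded in this sketch
    write(x: T): void {
        const m = this.topics.get(this.topicFn(x));
        if (m) m.write(x);
    }
}

// one-to-many with dynamic removal of a subscriber
const m = new SketchMult<number>();
const tapA = m.subscribe();
const tapB = m.subscribe();
m.write(1); // tapA = [1], tapB = [1]
m.unsubscribe(tapB);
m.write(2); // tapA = [1, 2], tapB = [1]

// topic = first character of each string
const pub = new SketchPubSub<string>((x) => x.charAt(0));
const topicA = pub.subscribeTopic("a");
const topicB = pub.subscribeTopic("b");
for (const user of ["alice", "bert", "bella", "arthur"]) pub.write(user);
// topicA = ["alice", "arthur"], topicB = ["bert", "bella"]
```

How the real implementation treats values for topics without subscribers is not covered here; the sketch simply drops them.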
Channel operators:
- broadcast()
- channel()
- concat()
- consume()
- consumeWith()
- drain()
- fromAsyncIterable()
- into()
- merge()
- mult()
- pipe()
- pubsub()
- select()
- timeout()
BETA - possibly breaking changes forthcoming
Search or submit any issues for this package
- @thi.ng/fibers - Process hierarchies & operators for cooperative multitasking
- @thi.ng/rstream - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines
- @thi.ng/transducers-async - Async versions of various highly composable transducers, reducers and iterators
yarn add @thi.ng/csp
ESM import:
import * as csp from "@thi.ng/csp";
Browser ESM import:
<script type="module" src="https://esm.run/@thi.ng/csp"></script>
For Node.js REPL:
const csp = await import("@thi.ng/csp");
Package sizes (brotli'd, pre-treeshake): ESM: 1.78 KB
Note: @thi.ng/api is in most cases a type-only import (not used at runtime)
One project in this repo's /examples directory is using this package:
Description | Live demo | Source |
---|---|---|
CSP channel-based event handling, async transducers & reactive UI components | Demo | Source |
import { channel } from "@thi.ng/csp";

// create CSP channel for bi-directional communication
const chan = channel<number>();

// create first async process (ping)
(async () => {
    while (true) {
        // this op will block until a value becomes available in the channel
        const x = await chan.read();
        // if the channel was closed meanwhile, read() will deliver `undefined`
        if (x === undefined || x > 5) {
            console.log("stopping...");
            // calling close() is idempotent
            // any in-flight writes will still be readable
            chan.close();
            break;
        }
        console.log("ping", x);
        // this op will also block until the other side is reading the value
        await chan.write(x + 1);
    }
    console.log("ping done");
})();

// create second async process (pong, almost identical to ping)
(async () => {
    while (true) {
        // wait until value can be read (or channel closed)
        const x = await chan.read();
        // exit loop if channel closed
        if (x === undefined) break;
        console.log("pong", x);
        // write next value & wait until other side read it
        await chan.write(x + 1);
    }
    console.log("pong done");
})();

// kickoff
chan.write(0);

// ping 0
// pong 1
// ping 2
// pong 3
// ping 4
// pong 5
// stopping...
// ping done
// pong done
import { channel, consumeWith, into, pubsub } from "@thi.ng/csp";

// input channel (optional)
const src = channel<string>({ id: "users" });

// publisher with a topic function
// (topic here is the first character of each received string)
const pub = pubsub<string>(src, (x) => x[0]);

// create topic subscriptions (channel & debug consumer)
// under the hood each topic is a Mult (multiplexed channel)
// subscription channels are automatically named:
// `<src-id>-<topic>-tap<tapid>` (see below)
for (let i of "abc") {
    consumeWith(pub.subscribeTopic(i), (x, ch) => console.log(ch.id, x));
}

// start processing by feeding an iterable of names
await into(src, ["alice", "bert", "bella", "charlie", "arthur"]);

// users-a-tap0 alice
// users-b-tap1 bert
// users-b-tap1 bella
// users-c-tap2 charlie
// users-a-tap0 arthur

// pubsubs & mults are closed recursively once we close the input channel
src.close();
If this project contributes to an academic publication, please cite it as:
@misc{thing-csp,
  title = "@thi.ng/csp",
  author = "Karsten Schmidt",
  note = "https://thi.ng/csp",
  year = 2016
}
© 2016 - 2025 Karsten Schmidt // Apache License 2.0