diff --git a/site/concurrency-primitives.md b/site/concurrency-primitives.md index 2193ff468a..9efc080e72 100644 --- a/site/concurrency-primitives.md +++ b/site/concurrency-primitives.md @@ -3,12 +3,31 @@ In the [`fs2.concurrent` package](https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/core/shared/src/main/scala/fs2/concurrent/) you'll find a bunch of useful concurrency primitives built on the concurrency primitives defined in `cats-effect`. For example: - `Topic[F, A]` +- `Channel[F, A]` - `Signal[F, A]` In addition, `Stream` provides functions to interact with cats-effect's `Queue`. ## Simple Examples +### Channel +`Channel` implements a publish-subscribe pattern and is particularly useful where we have multiple `publishers` and a single `subscriber`. In the following example, we have two streams, `pub1` and `pub2` publishing the strings `"Hello"` and `"World"` every `1` and `2` seconds respectively. Additionally, we have `sub`, a subscriber that consumes and prints each element. The three streams are then run in parallel and interrupted after `6` seconds. + +```scala mdoc:silent +import cats.effect._ +import fs2.Stream +import scala.concurrent.duration._ +import cats.effect.unsafe.implicits.global +import fs2.concurrent.Channel + +Channel.unbounded[IO, String].flatMap { channel => + val pub1 = Stream.repeatEval(IO("Hello")).evalMap(channel.send).metered(1.second) + val pub2 = Stream.repeatEval(IO("World")).evalMap(channel.send).metered(2.seconds) + val sub = channel.stream.evalMap(IO.println) + Stream(pub1, pub2, sub).parJoinUnbounded.interruptAfter(6.seconds).compile.drain +}.unsafeRunSync() +``` + ### Topic (based on [Pera Villega](https://perevillega.com/)'s example [here](https://underscore.io/blog/posts/2018/03/20/fs2.html))