Skip to content

Commit

Permalink
Start on fan-in / fan-out material
Browse files Browse the repository at this point in the history
  • Loading branch information
noelwelsh committed Oct 22, 2024
1 parent cef75d6 commit 8d1ab1e
Showing 1 changed file with 52 additions and 36 deletions.
88 changes: 52 additions & 36 deletions docs/src/pages/fan-in-out.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
# Fan-in and Fan-out

Let's start with a simple example.
In this section we'll talk about how to connect one upstream stage to several downstream stages, known as **fan-out**, and several upstream stages to one downstream stage, known as **fan-in**.

Let's start with a simple example of fan-out, where we connect one upstream stage to two downstream stages.

```scala mdoc:silent
import cats.effect.IO
import fs2.Stream
import cats.effect.unsafe.implicits.global
import fs2.Stream

val data = Stream(1, 2, 3, 4)
val a = data.evalMap(a => IO.println(s"a: $a"))
val b = data.evalMap(b => IO.println(s"b: $b"))
```

What do you think happens when run the following?
What do you think happens when the following is run?
Take a guess before trying out the code.

```scala mdoc:compile-only
Expand All @@ -33,42 +35,15 @@ Notice that the `a` branch does not run. Did you expect this behaviour?
@:@

The simple reason that we see output from `b` but not from `a` is that we didn't run `a`.
However, to understand why `a` does not run introduces some of the core concepts in FS2.
Remember that a `Stream` is a description.
This is, it's a data structure that describes what we want to happen.
We build a `Stream` from upstream (which produces data) to downstream (which consumes data.)
The downstream data structures have references to those upstream of them.
When we write

```scala
val data = Stream(1, 2, 3, 4)
val a = data.evalMap(a => IO.println(s"a: $a"))
val b = data.evalMap(b => IO.println(s"b: $b"))
```

we create three data structures:

1. `data`, which is the source of data;
2. `a`, which is downstream of `data` and has a reference to `data`; and
3. `b`, which is also downstream of `data` and has a reference to `data`.

Notice that `b` does not have any reference to `a`.
So when we run `b` we *cannot* run `a`, because we have no reference to it.

We'll call each data structure a **stage**.
A stage `x` is downstream of another stage `y` if it consumes data directly from `y`, or it consumes data from a stage that is downstream of `y`.
Likewise we can say that `y` is upstream of `x` if `x` is downstream of `y`.

When we run a stage, we only run it and all stages that are upstream from it.
In the example above, `a` is not upstream of `b` so it does not run when `b` runs.
If we want to make `a` run as well, we need a way to create a stage that is downstream of both.
In other words, we need a way to ["cross the streams"][ghostbusters] by joining `a` and `b`.
This is an example of **fan-in**, where we collect results from multiple streams.
When we call `b.compile.drain.unsafeRunSync()` this creates demand on `b`, which in turns creates demand on stages upstream of `b`.
As `a` is not upstream of `b` it has no demand and hence does not run.
There is a lesson from this: if you're not thinking about how you do fan-in and fan-out you're probably doing it wrong.

Let's start with fan-in.
FS2 provides several ways to express fan-in:

* We can use `merge` if we don't care about what order we get elements from the upstream streams, and both upstream streams have the same type.
* We can use `zip` if we want to pair up elements from two upstream streams.
* We can use `merge` if we don't care about what order we get elements from the upstream streams, and both upstream streams have the same type.
* We can use `either` if we don't care about order (like `merge`) but the two upstream streams have different types.

Write a stream `sink` that uses one of the methods above to express fan-in of `a` and `b`.
Expand Down Expand Up @@ -102,4 +77,45 @@ b: 4
so we can see that both `a` and `b` receive all the values from `data`.
@:@

[ghostbusters]: https://www.youtube.com/watch?v=TEq24JyFWzo

Now let's look at the example of fan-in below.
It's a modification of our previous example where we have an effectful source,
which generates random data.

```scala mdoc:reset:silent
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.util.Random

val source = Stream.eval(IO(Random.nextDouble())).repeat
val a = source.evalMap(a => IO.println(s"a: $a"))
val b = source.evalMap(b => IO.println(s"b: $b"))

val sink = a.merge(b)
```

What do you think you'll see when the following is run?
Will `a` and `b` both see the same values?
How many times will each run?

```scala mdoc:compile-only
// We use take(4) to avoid running forever
sink.take(4).compile.drain.unsafeRunSync()
```

@:solution

Here's some example output I saw when I ran the code.

```
b: 0.7958715143801504
a: 0.0859159273103528
b: 0.7907351218379188
a: 0.7568956320150807
b: 0.16689974459747392
a: 0.2762585354975654
```

Notice that `a` and `b` saw different data!
@:@

0 comments on commit 8d1ab1e

Please sign in to comment.