Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Collector[T] type. #3

Merged
merged 3 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go-presubmit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
go-version: ['stable', 'oldstable']
go-version: ['stable']
os: ['ubuntu-latest']
steps:
- uses: actions/checkout@v4
Expand Down
93 changes: 66 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,38 +229,77 @@ start(task4) // blocks until one of the previous tasks is finished

## Solo Tasks

In some cases it is useful to have a solo background task to handle a separate
concern, such as collecting the results from a batch of concurrent workers.

For example, suppose we have a group of tasks processing a complex search, and
we want to aggregate the results. We don't want to add this task to the same
group as the workers, because it needs to know when the rest of the workers
have all completed. On the other hand, creating another whole group just for
one task is overkill, since we only need one additional goroutine.

To handle such cases, the `Go` constructor is helpful: It manages a single
background goroutine with a separate wait:
In some cases it is useful to start a solo background task to handle an
isolated concern. For example, suppose we want to read a file into a buffer
while we take care of some other work. Rather than creating a whole group for
a single goroutine, we can create a solo task using the `Go` constructor.

```go
results := make(chan int)
s := taskgroup.Go(func() (total int) {
for v := range results {
total += v
}
return
var data []byte
s := taskgroup.Go(func() error {
f, err := os.Open(filePath)
if err != nil {
return err
}
defer f.Close()
data, err = io.ReadAll(f)
return err
})
```

When we're ready, we call `Wait` to receive the result of the task:

g := taskgroup.New(nil)
for i := 0; i < numTasks; i++ {
batch := newBatch(i)
g.Go(func() error {
return search(batch)
})
```go
if err := s.Wait(); err != nil {
log.Fatalf("Loading config failed: %v", err)
}
doThingsWith(data)
```

## Collecting Results

One common use for a background task is accumulating the results from a batch
of concurrent workers. This can be handled by a solo task, as described above,
and it is a common enough case that the library provides a `Collector` type to
handle it specifically.

To use it, pass a function to `NewCollector` to receive the values:

// Wait for the workers to finish, then signal the collector to stop.
g.Wait(); close(results)
```go
var sum int
c := taskgroup.NewCollector(func(v int) { sum += v })
```

Internally, a `Collector` wraps a solo task and a channel to receive results.

The `Task` and `NoError` methods of the collector `c` can then be used to wrap
a function that reports a value. If the function reports an error, that error
is returned from the task as usual. Otherwise, its non-error value is given to
the callback. As in the above example, calls to the function are serialized so
that it is safe to access state without additional locking:

```go
// Report an error, no value for the collector.
g.Go(c.Task(func() (int, error) {
return -1, errors.New("bad")
}))

// Report the value 25 to the collector.
g.Go(c.Task(func() (int, error) {
return 25, nil
}))

// Report a random integer to the collector.
g.Go(c.NoError(func() int { return rand.Intn(1000) })
```

Once all the tasks are done, call `Wait` to stop the collector and wait for it
to finish:

```go
g.Wait() // wait for tasks to finish
c.Wait() // wait for the collector to finish

// Now get the final result.
fmt.Println(s.Wait())
// Now you can access the values accumulated by c.
fmt.Println(sum)
```
53 changes: 53 additions & 0 deletions collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package taskgroup

// A Collector collects values reported by task functions and delivers them to
// an accumulator function.
type Collector[T any] struct {
ch chan<- T
s *Single[error]
}

// NewCollector creates a new collector that delivers task values to the
// specified accumulator function. The collector serializes calls to value, so
// that it is safe for the function to access shared state without a lock. The
// caller must call Wait when the collector is no longer needed, even if it has
// not been used.
func NewCollector[T any](value func(T)) *Collector[T] {
ch := make(chan T)
s := Go(NoError(func() {
for v := range ch {
value(v)
}
}))
return &Collector[T]{ch: ch, s: s}
}

// Wait stops the collector and blocks until it has finished processing.
// It is safe to call Wait multiple times from a single goroutine.
func (c *Collector[T]) Wait() {
if c.ch != nil {
close(c.ch)
c.ch = nil
c.s.Wait()
}
}

// Task returns a Task wrapping a call to f. If f reports an error, that error
// is propagated as the return value of the task; otherwise, the non-error
// value reported by f is passed to the value callback.
func (c *Collector[T]) Task(f func() (T, error)) Task {
return func() error {
v, err := f()
if err != nil {
return err
}
c.ch <- v
return nil
}
}

// NoError returns a Task wrapping a call to f. The resulting task reports a
// nil error for all calls.
func (c *Collector[T]) NoError(f func() T) Task {
return NoError(func() { c.ch <- f() })
}
72 changes: 57 additions & 15 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"math/rand"
"time"
Expand Down Expand Up @@ -105,15 +106,57 @@ func shuffled(n int) []int {
return vs
}

type slowReader struct {
n int
d time.Duration
}

func (s *slowReader) Read(data []byte) (int, error) {
if s.n == 0 {
return 0, io.EOF
}
time.Sleep(s.d)
nr := min(len(data), s.n)
s.n -= nr
for i := 0; i < nr; i++ {
data[i] = 'x'
}
return nr, nil
}

func ExampleSingle() {
results := make(chan int)

// Start a task to collect the results of a "search" process.
s := taskgroup.Go(func() (total int) {
for v := range results {
total += v
}
return
// A fake reader to simulate a slow file read.
// 2500 bytes and each read takes 50ms.
sr := &slowReader{2500, 50 * time.Millisecond}

var data []byte

// Start a task to read te "file" in the background.
fmt.Println("start")
s := taskgroup.Go(func() error {
var err error
data, err = io.ReadAll(sr)
return err
})

fmt.Println("work, work")
if err := s.Wait(); err != nil {
log.Fatalf("Read failed: %v", err)
}
fmt.Println("done")
fmt.Println(len(data), "bytes")

// Output:
// start
// work, work
// done
// 2500 bytes
}

func ExampleCollector() {
var total int
c := taskgroup.NewCollector[int](func(v int) {
total += v
})

const numTasks = 25
Expand All @@ -123,23 +166,22 @@ func ExampleSingle() {
g := taskgroup.New(nil)
for i := 0; i < numTasks; i++ {
target := i + 1
g.Go(func() error {
g.Go(c.Task(func() (int, error) {
for _, v := range input {
if v == target {
results <- v
break
return v, nil
}
}
return nil
})
return 0, errors.New("not found")
}))
}

// Wait for the searchers to finish, then signal the collector to stop.
g.Wait()
close(results)
c.Wait()

// Now get the final result.
fmt.Println(s.Wait())
fmt.Println(total)
// Output:
// 325
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/creachadair/taskgroup

go 1.20
go 1.21

require github.com/fortytw2/leaktest v1.3.0
31 changes: 31 additions & 0 deletions taskgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,37 @@ func TestSingleResult(t *testing.T) {
}
}

func TestCollector(t *testing.T) {
var sum int
c := taskgroup.NewCollector(func(v int) { sum += v })

vs := shuffled(15)
g := taskgroup.New(nil)

for i, v := range vs {
v := v
if v > 10 {
// This value should not be accumulated.
g.Go(c.Task(func() (int, error) {
return -100, errors.New("don't add this")
}))
} else if i%2 == 0 {
// A function with an error.
g.Go(c.Task(func() (int, error) { return v, nil }))
} else {
// A function without an error.
g.Go(c.NoError(func() int { return v }))
}
}

g.Wait() // wait for tasks to finish
c.Wait() // wait for collector

if want := (10 * 11) / 2; sum != want {
t.Errorf("Final result: got %d, want %d", sum, want)
}
}

type peakValue struct {
μ sync.Mutex
cur, max int
Expand Down