-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpipeline.go
85 lines (63 loc) · 1.51 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package pipeline
import "errors"
// Message is the unit of data passed between pipeline stages. It declares no
// methods, so any value satisfies it.
type Message interface {
}
// Stage is a single processing step that can be added to a Pipeline.
type Stage interface {
// Process handles one incoming Message and returns the resulting Messages
// (possibly none) or an error.
// NOTE(review): the parameter is named "stage" but it carries the message
// being processed, not a Stage.
Process(stage Message) ([]Message, error)
}
// PipelineOpts holds the per-stage options accepted by Pipeline.AddPipe.
type PipelineOpts struct {
// Concurrency is handed to NewWorkerGroup as its first argument by
// ConcurrentPipeline.AddPipe; presumably the number of workers running the
// stage — confirm against NewWorkerGroup.
Concurrency int
}
// Pipeline is a chain of stages that Messages enter through Input and leave
// through Output.
type Pipeline interface {
// AddPipe appends pipe as the next stage, configured by opt.
AddPipe(pipe Stage, opt *PipelineOpts)
// Start starts the pipeline and reports an error if it cannot be started.
Start() error
// Stop shuts the pipeline down.
Stop() error
// Input returns the send-only channel on which messages enter the pipeline.
Input() chan<- Message
// Output returns the receive-only channel on which messages leave the
// pipeline.
Output() <-chan Message
}
// ErrConcurrentPipelineEmpty is returned by ConcurrentPipeline.Start when no
// stage has been added to the pipeline.
var ErrConcurrentPipelineEmpty = errors.New("concurrent pipeline empty")
// ConcurrentPipeline is a Pipeline built from a sequence of StageWorker
// groups, one per added stage.
type ConcurrentPipeline struct {
// workerGroups holds the stage workers in the order they were added. AddPipe
// wires every worker after the first to read from the previous worker's
// output channel.
workerGroups []StageWorker
}
// AddPipe appends pipe as the next stage of the pipeline.
//
// The first stage receives a fresh buffered input channel; every later stage
// reads directly from the previous stage's output channel. Each stage gets its
// own buffered output channel. A nil opt is treated as Concurrency 1.
//
// NOTE(review): opt.Concurrency is forwarded to NewWorkerGroup unchanged, so a
// zero or negative value is not corrected here — confirm NewWorkerGroup
// handles it.
func (c *ConcurrentPipeline) AddPipe(pipe Stage, opt *PipelineOpts) {
	// Buffer size of every channel created for a stage.
	const chanBuffer = 10

	concurrency := 1
	if opt != nil {
		concurrency = opt.Concurrency
	}

	// Only the first stage needs its own input channel; later stages chain
	// onto the previous stage's output, so allocating one for them would be
	// wasted work.
	var input chan Message
	if n := len(c.workerGroups); n > 0 {
		input = c.workerGroups[n-1].Output()
	} else {
		input = make(chan Message, chanBuffer)
	}
	output := make(chan Message, chanBuffer)

	worker := NewWorkerGroup(concurrency, pipe, input, output)
	c.workerGroups = append(c.workerGroups, worker)
}
// Output returns the receive-only view of the last stage's output channel.
//
// It panics with an index-out-of-range error if no stage has been added.
func (c *ConcurrentPipeline) Output() <-chan Message {
	last := len(c.workerGroups) - 1
	return c.workerGroups[last].Output()
}
// Input returns the send-only view of the first stage's input channel.
//
// It panics with an index-out-of-range error if no stage has been added.
func (c *ConcurrentPipeline) Input() chan<- Message {
	var in chan<- Message = c.workerGroups[0].Input()
	return in
}
// Start starts every stage's worker group in the order the stages were added.
//
// It returns ErrConcurrentPipelineEmpty when the pipeline has no stages, and
// nil otherwise.
func (c *ConcurrentPipeline) Start() error {
	if len(c.workerGroups) == 0 {
		return ErrConcurrentPipelineEmpty
	}

	for _, group := range c.workerGroups {
		group.Start()
	}
	return nil
}
// Stop shuts the pipeline down stage by stage, front to back.
//
// For each stage it closes the stage's input channel and then calls WaitStop
// on it (presumably blocking until the stage's workers have exited — confirm
// against StageWorker). Because AddPipe wires each later stage's input to the
// previous stage's output, this closes every inter-stage channel exactly once.
// Finally the last stage's output channel is closed.
//
// It returns ErrConcurrentPipelineEmpty when the pipeline has no stages,
// mirroring Start, instead of panicking on an out-of-range index.
//
// Stop must be called at most once: a second call would close channels that
// are already closed, which panics.
func (c *ConcurrentPipeline) Stop() error {
	n := len(c.workerGroups)
	if n == 0 {
		return ErrConcurrentPipelineEmpty
	}

	for _, group := range c.workerGroups {
		close(group.Input())
		group.WaitStop()
	}

	// No later stage owns the final output, so it is closed here.
	close(c.workerGroups[n-1].Output())
	return nil
}
// NewConcurrentPipeline returns a new ConcurrentPipeline with no stages, as a
// Pipeline.
func NewConcurrentPipeline() Pipeline {
	return new(ConcurrentPipeline)
}