forked from crazybber/go-pattern-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fan_out.go
154 lines (132 loc) · 3.42 KB
/
fan_out.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package fanout
import "sync"
// Split2 多工作者重复分发,每个worker分发一波数据
// Split2 a channel into n channels that receive messages in a round-robin fashion.
func Split2(ch <-chan int, n int) []chan int {
cs := []chan int{}
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
var wg sync.WaitGroup
//Distributes one value to channels
distributeToChannels := func(ch <-chan int, cs []chan int) {
// // Close every channel when the execution ends.
//get a target from ch
val, ok := <-ch
if !ok {
return // channel closed
}
//send value to all channels
for _, c := range cs {
c <- val
}
wg.Done()
}
for i := 0; i < n; i++ {
wg.Add(1)
// a worker to distribute message
go distributeToChannels(ch, cs)
}
go func() {
wg.Wait()
for _, c := range cs {
close(c)
}
}()
return cs
}
// Split 重复分发数据为多份
// Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []chan int {
//get a queue of chan
//cs := make([]chan int, n) //创建了个chan 数组但是空的
cs := []chan int{}
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
//Distributes the work in a round robin fashion among the stated number of channels
//until the main channel has been closed. In that case, close all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends.
defer func() {
for _, c := range cs {
close(c)
}
}()
//this version will block
for {
//get a target from ch
select {
case val, ok := <-ch:
if !ok {
return // channel closed
}
//send value to all channels
for _, c := range cs {
c <- val
}
}
}
}
// a worker to distribute message
go distributeToChannels(ch, cs)
return cs
}
//Split3 随机分发到不同的目的地
//Split3 a channel into n channels that receive messages in a round-robin fashion.
func Split3(ch <-chan int, n int) []chan int {
cs := make([]chan int, 0)
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
// Distributes the work in a round robin fashion among the stated number
// of channels until the main channel has been closed. In that case, close
// all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends.
defer func() {
for _, c := range cs {
close(c)
}
}()
for {
for _, c := range cs {
select {
case val, ok := <-ch:
if !ok {
return
}
c <- val
}
}
}
}
go distributeToChannels(ch, cs)
return cs
}
//The first stage, gen, is a function that converts a list of integers to a channel that emits the integers in the list.
// The gen function starts a goroutine that sends the integers on the channel and closes the channel when all
// the values have been sent:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
//The second stage, sq, receives integers from a channel and returns a channel that emits the square of
// each received integer. After the inbound channel is closed and this stage has sent all the values downstream,
// it closes the outbound channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}