得益于Go语言的精简语法和对并发控制的良好支持,我们可以迅速总结出高效而优雅的常用并发代码范式。

for-select模式

我们可以循环监听多个channel的消息实现多路复用,我们也可以使用for...range遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func normal_task(idx int) {
fmt.Printf("normal_task(%d)\n", idx)
}

// 死循环一直监听
func select_pattern(ctx context.Context) {
cnt := 0
for {
select {
case <-ctx.Done():
fmt.Printf("done\n")
return
default:
cnt++
cnt %= 100
normal_task(cnt)
time.Sleep(1 * time.Second)
}
}
}

// 范围for遍历
func select_range_pattern(ctx context.Context, arr []int) {
for _, v := range arr {
select {
case <-ctx.Done():
fmt.Printf("done\n")
return
default:
normal_task(v)
time.Sleep(1 * time.Second)
}
}
fmt.Printf("select_range_pattern finished\n")
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
select_pattern(ctx)
}()
time.Sleep(10 * time.Second)
cancel()
wg.Wait()

fmt.Println("-------stage2------------")
wg = sync.WaitGroup{}
wg.Add(1)
ctx, cancel = context.WithCancel(context.Background())
go func() {
defer wg.Done()
select_range_pattern(ctx, []int{1, 2, 3, 4, 5})
}()
time.Sleep(10 * time.Second)
cancel()
wg.Wait()
}

select-timeout模式

我们可以利用time.Afterselect的监听设置超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func time_pattern(ctx context.Context, ch chan int) {
select {
case <-ctx.Done():
fmt.Printf("canceled\n")
return
case a := <-ch:
fmt.Printf("recv msg: %d\n", a)
return
case <-time.After(2 * time.Second):
fmt.Printf("timeout 2s\n")
return
}
}

func main() {
wg = sync.WaitGroup{}
wg.Add(1)
ctx, cancel = context.WithCancel(context.Background())
ch := make(chan int)
go func() {
fmt.Println("发起了一个请求:")
time.Sleep(5 * time.Second)
ch <- 666
}()

go func() {
defer wg.Done()
time_pattern(ctx, ch)
}()
wg.Wait()
}

pipeline模式

流水线模式或者说管线模式,让多个协程通过管道串联起来分步骤依次处理任务

下面是一种实现方式(把协程池简化为单个协程了

总体流程就是主协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
func perchase(ctx context.Context, inCh chan int, outCh chan int) {
defer fmt.Printf("perchase, done\n")
defer close(outCh)
for {
select {
case <-ctx.Done():
return
case num, ok := <-inCh:
if !ok {
return
}
fmt.Printf("inCh: 采购了%d\n", num)
select {
case <-ctx.Done():
return
case outCh <- num:
}
}
}

}

func machine(ctx context.Context, inCh chan int, outCh chan int) {
defer fmt.Printf("machine, done\n")
defer close(outCh)

for {
select {
case <-ctx.Done():
return
case num := <-inCh:
fmt.Printf("machine:收到加工任务%d\n", num)
for i := 0; i < num; i++ {
fmt.Printf("machine:正在加工一个产品,编号%d\n", i)
select {
case <-ctx.Done():
return
case outCh <- 1:
}
time.Sleep(1 * time.Second)
}
fmt.Printf("inCh: 生产了%d\n", num)
}
}
}

func sell(ctx context.Context, inCh chan int) {
defer fmt.Printf("sell, done\n")
for {
select {
case <-ctx.Done():
return
case num := <-inCh:
fmt.Printf("sell:获取到销售任务%d\n", num)
for i := 0; i < num; i++ {
fmt.Printf("sell:卖出去了一个产品,编号%d\n", i)
time.Sleep(1 * time.Second)
}
fmt.Printf("本批次任务完成,共计产品%d\n", num)
}

}
}

func test_pipeline() {
fmt.Println("----------- test pipeline --------------\n")
//初始化变量
inPerchaseCh := make(chan int)
outPerchaseCh := make(chan int)
outMachineCh := make(chan int)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(3)

//启动管线
go func() {
defer wg.Done()
perchase(ctx, inPerchaseCh, outPerchaseCh)
}()

go func() {
defer wg.Done()
machine(ctx, outPerchaseCh, outMachineCh)
}()

go func() {
defer wg.Done()
sell(ctx, outMachineCh)
}()

//开始输入
inPerchaseCh <- 10
time.Sleep(5 * time.Second)
//inPerchaseCh <- 20

time.Sleep(20 * time.Second)
//销毁管线
close(inPerchaseCh)
cancel()
cancel()
cancel()
wg.Wait()

}