得益于Go语言的精简语法和对并发控制的良好支持,我们可以迅速总结出高效而优雅的常用并发代码范式。
for-select模式:我们可以在for循环中用select监听多个channel的消息,实现多路复用;也可以使用for...range遍历一组数据,并在每次迭代中用select检查是否收到取消信号。
// normal_task stands in for one unit of regular work; it only prints idx.
func normal_task(idx int) {
	fmt.Printf("normal_task(%d)\n", idx)
}

// sleepOrDone blocks for d or until ctx is cancelled, whichever comes first.
// It reports true when the full duration elapsed and false on cancellation.
func sleepOrDone(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// select_pattern demonstrates the for-select loop: on every iteration it
// first checks ctx for cancellation and otherwise runs one task, with a
// one-second pause between tasks. The task index cycles through 1..99, 0.
// It prints "done" and returns once ctx is cancelled.
func select_pattern(ctx context.Context) {
	cnt := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("done\n")
			return
		default:
			cnt = (cnt + 1) % 100
			normal_task(cnt)
			// A plain time.Sleep would delay cancellation by up to a second.
			// If the wait is cut short, the next iteration takes the Done
			// branch above and prints "done".
			sleepOrDone(ctx, time.Second)
		}
	}
}

// select_range_pattern demonstrates for-range combined with select: it runs
// one task per element of arr, pausing one second after each, and stops early
// (printing "done") if ctx is cancelled before the next element is started.
// When every element has been processed it prints a completion message.
func select_range_pattern(ctx context.Context, arr []int) {
	for _, v := range arr {
		select {
		case <-ctx.Done():
			fmt.Printf("done\n")
			return
		default:
			normal_task(v)
			// Cancellation-aware pause; see select_pattern.
			sleepOrDone(ctx, time.Second)
		}
	}
	fmt.Printf("select_range_pattern finished\n")
}

// main runs both demos in sequence, giving each ten seconds before cancelling
// its context and waiting for its goroutine to exit.
func main() {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		defer wg.Done()
		select_pattern(ctx)
	}()
	time.Sleep(10 * time.Second)
	cancel()
	wg.Wait()

	fmt.Println("-------stage2------------")

	// A WaitGroup may be reused once the previous Wait has returned.
	ctx, cancel = context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		defer wg.Done()
		select_range_pattern(ctx, []int{1, 2, 3, 4, 5})
	}()
	time.Sleep(10 * time.Second)
	cancel()
	wg.Wait()
}
select-timeout模式 我们可以利用time.After给select的监听设置超时时间
// time_pattern demonstrates select with a timeout. It waits for exactly one
// of three events and then returns:
//   - ctx is cancelled: prints "canceled";
//   - a value arrives on ch: prints the value;
//   - two seconds pass with neither: prints a timeout message.
func time_pattern(ctx context.Context, ch chan int) {
	// An explicit timer is stopped on return, so it does not linger when one
	// of the other branches wins.
	timeout := time.NewTimer(2 * time.Second)
	defer timeout.Stop()

	select {
	case <-ctx.Done():
		fmt.Printf("canceled\n")
	case a := <-ch:
		fmt.Printf("recv msg: %d\n", a)
	case <-timeout.C:
		fmt.Printf("timeout 2s\n")
	}
}

// main starts a simulated request that needs five seconds to answer and a
// consumer that gives up after two, so the timeout branch is the one taken.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan int)
	var wg sync.WaitGroup

	// Producer: the slow request. Both its delay and its send also watch ctx,
	// so it cannot stay blocked on the unbuffered channel after the consumer
	// has stopped listening.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("发起了一个请求:")
		delay := time.NewTimer(5 * time.Second)
		defer delay.Stop()
		select {
		case <-delay.C:
		case <-ctx.Done():
			return
		}
		select {
		case ch <- 666:
		case <-ctx.Done():
		}
	}()

	// Consumer: waits for the reply with a timeout, then cancels ctx to
	// release the producer.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer cancel()
		time_pattern(ctx, ch)
	}()

	wg.Wait()
}
pipeline模式 流水线模式或者说管线模式,让多个协程通过管道串联起来分步骤依次处理任务
下面是一种实现方式(把协程池简化为单个协程了)
总体流程就是主协程把任务数量写入采购(perchase)阶段的输入管道,采购、加工(machine)、销售(sell)三个协程通过管道依次传递并处理任务;最后主协程关闭输入管道并取消context,等待所有协程退出。
// sleepCtx blocks for d or until ctx is cancelled, whichever comes first.
// It reports true when the full duration elapsed and false on cancellation.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// perchase is the first pipeline stage. It forwards every order quantity read
// from inCh to outCh unchanged. It returns when ctx is cancelled or inCh is
// closed, and closes outCh on the way out so the next stage can shut down.
func perchase(ctx context.Context, inCh chan int, outCh chan int) {
	defer fmt.Printf("perchase, done\n")
	defer close(outCh)
	for {
		select {
		case <-ctx.Done():
			return
		case num, ok := <-inCh:
			if !ok {
				return
			}
			fmt.Printf("inCh: 采购了%d\n", num)
			// The send may block on a slow consumer, so keep watching ctx.
			select {
			case <-ctx.Done():
				return
			case outCh <- num:
			}
		}
	}
}

// machine is the second pipeline stage. For each quantity num read from inCh
// it emits num single products (the value 1) on outCh, one per second. It
// returns when ctx is cancelled or inCh is closed, and closes outCh on return.
func machine(ctx context.Context, inCh chan int, outCh chan int) {
	defer fmt.Printf("machine, done\n")
	defer close(outCh)
	for {
		select {
		case <-ctx.Done():
			return
		case num, ok := <-inCh:
			if !ok {
				// Upstream closed its channel. Without this check the loop
				// would spin on zero values from the closed channel.
				return
			}
			fmt.Printf("machine:收到加工任务%d\n", num)
			for i := 0; i < num; i++ {
				fmt.Printf("machine:正在加工一个产品,编号%d\n", i)
				select {
				case <-ctx.Done():
					return
				case outCh <- 1:
				}
				if !sleepCtx(ctx, 1*time.Second) {
					return
				}
			}
			fmt.Printf("inCh: 生产了%d\n", num)
		}
	}
}

// sell is the last pipeline stage. For each batch size num read from inCh it
// "sells" num products, one per second. It returns when ctx is cancelled or
// inCh is closed.
func sell(ctx context.Context, inCh chan int) {
	defer fmt.Printf("sell, done\n")
	for {
		select {
		case <-ctx.Done():
			return
		case num, ok := <-inCh:
			if !ok {
				// Upstream is finished; stop instead of reading zero values.
				return
			}
			fmt.Printf("sell:获取到销售任务%d\n", num)
			for i := 0; i < num; i++ {
				fmt.Printf("sell:卖出去了一个产品,编号%d\n", i)
				if !sleepCtx(ctx, 1*time.Second) {
					return
				}
			}
			fmt.Printf("本批次任务完成,共计产品%d\n", num)
		}
	}
}

// test_pipeline wires the three stages together with unbuffered channels,
// submits one order of 10 products, lets the pipeline run for 25 seconds, and
// then shuts it down and waits for every stage to exit.
func test_pipeline() {
	// Header followed by one blank line.
	fmt.Println("----------- test pipeline --------------")
	fmt.Println()

	inPerchaseCh := make(chan int)
	outPerchaseCh := make(chan int)
	outMachineCh := make(chan int)

	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		defer wg.Done()
		perchase(ctx, inPerchaseCh, outPerchaseCh)
	}()
	go func() {
		defer wg.Done()
		machine(ctx, outPerchaseCh, outMachineCh)
	}()
	go func() {
		defer wg.Done()
		sell(ctx, outMachineCh)
	}()

	inPerchaseCh <- 10
	time.Sleep(25 * time.Second)

	// Closing the input lets the stages drain and close one after another;
	// cancelling ctx additionally unblocks any stage that is mid-operation.
	close(inPerchaseCh)
	cancel()
	wg.Wait()
}