同学们好!在这节课里,我们将一起学习Go语言对并发编程的支持。事实上,Go语言之所以被称为现代化的编程语言,很大程度上就是因为它在语言层面已经内置了调度算法和上下文切换机制。Go语言程序的开发者不再需要自己处理有关进程、线程、协程之类的细节,语言本身已经处理得非常好。我们的任务只是专注于每个执行过程中的业务逻辑。
在学习并发编程之前,我们需要先辨析两个极易混淆的概念,并发与并行。所谓并发,是指一个行为主体在同一段时间处理多个事件,按时间片在不同事件间轮流切换。而所谓并行,则是指多个行为主体在同一段时间处理多个事件,每个行为主体只专注于一个事件。我们这里所关注的显然是前者。有关并行的问题其实是一个分布式的概念,我们将在其它课程中为大家介绍。
身为Go程序员的你可以忘记如何创建进程,如何开启线程,以及如何处理有关协程的细枝末节。你的技能栏里只有一个技能——goroutine。在Go语言中任何有关并发的问题,只需一个goroutine就足够了,如此的简单粗暴。当然事情也许不象你想象的那么简单,goroutine与goroutine之间可能还需要数据通信。这就要用到与通道有关的操作。什么是无缓冲通道?什么是有缓冲通道?如何遍历一个通道?这对于正确地使用通道而言都是至关重要的问题。
使用通道有哪些要点需要注意?对同一个通道多读多写时会发生什么?如何判断一个通道是否关闭?如何让一个通道中的数据只能单向流动?以及如何同时读写多路通道?这些有关通道的高级话题也是本节课程需要关注的重点。
所谓goroutine,其实就是一个执行过程,或者说是代码的执行序列。它与操作系统提供的进程、线程,或其它第三方库提供的所谓协程一样,都是为解决并发问题而被广泛使用的工具。但与其它并发工具不同的是,goroutine仅由Go语言运行时提供,它与处理器的硬件架构无关,也不依赖于操作系统或任何框架,完全平台中立。
goroutine被称为轻量级线程,或曰轻程,是比系统级线程更轻量级的执行单元。运行一个goroutine大约只需要4~5K字节的内存资源,这比动辄数百K甚至上兆字节的系统级线程要少得多。一个进程最多只允许几十个系统级线程同时运行,而能同时运行的goroutine则可多达数千个之多。goroutine是Go语言原生支持的并发机制,不需要依赖任何库。在调用任何函数时,只要在前面加上一个关键字go,该函数即在独立的goroutine中执行。多个goroutine和主线程一起公平竞争处理机资源。
下面我们通过一个Goroutine工程,实际体验一下基于goroutine的并发编程:
x1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 go routine('-', 100)
10 routine('+', 500)
11}
12
13func routine(c rune, d time.Duration) {
14 for {
15 fmt.Printf("%c", c)
16 time.Sleep(d * time.Millisecond)
17 }
18}
在这段代码中定义了一个函数routine,接收两个参数,需要打印的字符c和以毫秒为单位的间隔时间d。函数中包含了一个无限循环,每隔d毫秒即打印一个c字符。在main函数中,通过关键字go,创建了一个goroutine,每隔100毫秒打印一个减号。与此同时在main函数所代表的goroutine中,每隔500毫秒打印一个加号。两个goroutine并发地运行。从程序的运行输出我们可清楚地看到,加号和减号是混杂在一起被打印输出的。这就印证了并发运行的效果。
上面代码中的goroutine永远不会终止,除非我们手动终止了包含它们的进程。现实世界的goroutine往往需要在某种特定条件下能够终止运行。
事实上我们有三种从goroutine中优雅退出的方式。从goroutine函数中返回会令goroutine终止。在goroutine函数或被其调用的函数中调用runtime包的Goexit函数会令goroutine终止。在任何地方调用os包的Exit函数会令进程终止,当然隶属于该进程的所有goroutine也就随之终止了。
下面我们创建一个名为Goexit的工程,分别针对上述三种退出goroutine的方法做一个测试。首先我们测试从函数返回的效果:
xxxxxxxxxx
281package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 fmt.Println("进入main函数")
10 go foo()
11 time.Sleep(1 * time.Second)
12 fmt.Println("main函数返回")
13}
14
15func foo() {
16 fmt.Println("进入foo函数")
17 bar()
18 fmt.Println("foo函数提前返回")
19 return
20 fmt.Println("foo函数返回")
21}
22
23func bar() {
24 fmt.Println("进入bar函数")
25 fmt.Println("bar函数提前返回")
26 return
27 fmt.Println("bar函数返回")
28}
在这段代码的main函数中,通过关键字go,创建了一个goroutine,在该goroutine中执行foo函数的代码。foo函数又调用了bar函数。foo函数中的return语句令该函数提前返回,同时终止了其所代表的goroutine。bar函数中也有return语句,但它仅仅是令该函数提前返回,并不会终止其所在的goroutine,因此“foo函数提前返回”会被打印出来。由此可见,只有从goroutine函数中返回才会令goroutine终止,而从被goroutine函数调用的其它函数中返回,则仅仅是返回,goroutine仍会继续运行。下面我们再测试一下runtime包的Goexit函数:
xxxxxxxxxx
131...
2
3func foo() {
4 fmt.Println("进入foo函数")
5 bar()
6 fmt.Println("foo函数返回")
7}
8
9func bar() {
10 fmt.Println("进入bar函数")
11 runtime.Goexit()
12 fmt.Println("bar函数返回")
13}
在这段代码中,我们直接在被goroutine函数foo调用的bar函数中调用了runtime包的Goexit函数。bar函数所在的goroutine戛然而止。无论“bar函数返回”还是“foo函数返回”都不会被打印。最后我们测试一下os包的Exit函数:
xxxxxxxxxx
71...
2
3func bar() {
4 fmt.Println("进入bar函数")
5 os.Exit(0)
6 fmt.Println("bar函数返回")
7}
从程序的输出我们看到,无论“bar函数返回”还是“foo函数返回”甚至“main函数返回”,都没有被打印出来。os包的Exit函数直接终止了进程,当然也就终止了该进程中的所有goroutine。流程上位于该调用之后的语句都不会被执行。
如果我们需要的仅仅是一个独立运行的执行过程,到目前为止我们所掌握的知识已经足够了。但实际的情况可能会更复杂一些。因应不同应用的需要,多个goroutine之间有时可能还需要交换数据。这就要用到通道,而通道中最基本的就是无缓冲通道。
多个goroutine之间可以借助通道交换数据。创建通道与创建切片和映射一样,也是通过make函数实现的。只是其参数为一个由chan关键字和通道中数据的类型组成的通道类型。读写通道需要借助一个特殊的运算符“<-”。将数据写入通道的表达式形如“通道 <- 数据”,而从通道中读取数据则只需接收表达式“<-通道”的值。
下面的Channel工程演示了创建和读写无缓冲通道的基本方法:
xxxxxxxxxx
221package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int)
10
11 go func() {
12 for {
13 fmt.Printf("%d ", <-ch)
14 }
15 }()
16
17 for i := 0; i < 10; i++ {
18 ch <- i
19 }
20
21 time.Sleep(time.Second)
22}
这段代码首先通过make函数创建了一个无缓冲通道,通道中的数据为整型。然后以匿名函数的形式创建了一个goroutine。该goroutine在一个无限循环中不断地打印从通道中读取的数据。与此同时,main函数所在的goroutine在一个循环中向通道写入了0~9十个整数,并在一个为期1秒的睡眠中等待另一个goroutine的完成。
我们之前所创建的通道都是无缓冲通道。写入通道的数据必须马上被读走,否则写入动作无法完成。对无缓冲通道的读写操作是两个同步的过程,写的同时必须读,读的同时必须写。参与读写的两个goroutine在通道上形成了执行步调的一致。但在很多时候,这种执行步调的一致性可能恰恰是并发编程需要刻意规避的。理想的并发模型应该是尽可能地保持异步,每个执行过程在绝大多数时候都是独立运行的,彼此之间不需要互相等待,除非在一些特殊的节点,必须建立同步时才短暂地同步片刻,随即恢复异步运行。为此Go语言提供了有缓冲通道。只要缓冲区没满,负责写通道的goroutine只管写,有没有人在读并不重要。同理,只要缓冲区不空,负责读通道的goroutine只管读,有没有人在写同样无所谓。这是一个典型的生产者——消费者模型。
创建有缓冲通道的方法与创建无缓冲通道并无大异,只是在调用make函数时再增加一个表示缓冲区大小的参数。多数情况下,缓冲区应该是非满非空的,读写通道的操作都不会发生阻塞。两个极端情况,一个是缓冲区满,写通道的操作会发生阻塞,直到缓冲区非满为止,另一个是缓冲区空,读通道的操作会发生阻塞,直到缓冲区非空为止。对于值为空即nil的通道,读写操作都会阻塞。
下面的BufferedChannel工程演示了有缓冲通道的用法:
xxxxxxxxxx
571package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int, 3)
10
11 go func() {
12 ch <- 1
13 fmt.Println("1> ch <- 1")
14 ch <- 2
15 fmt.Println("1> ch <- 2")
16 ch <- 3
17 fmt.Println("1> ch <- 3")
18 ch <- 4
19 fmt.Println("1> ch <- 4")
20 }()
21
22 fmt.Println("0> sleep 3 sec.")
23 time.Sleep(3 * time.Second)
24 fmt.Println("0> sleep done")
25
26 fmt.Println("0>", <-ch, "<- ch")
27
28 go func() {
29 fmt.Println("2>", <-ch, "<- ch")
30 fmt.Println("2>", <-ch, "<- ch")
31 fmt.Println("2>", <-ch, "<- ch")
32 fmt.Println("2>", <-ch, "<- ch")
33 }()
34
35 fmt.Println("0> sleep 3 sec.")
36 time.Sleep(3 * time.Second)
37 fmt.Println("0> sleep done")
38
39 ch <- 5
40 fmt.Println("0> ch <- 5")
41
42 var nc chan int
43
44 go func() {
45 fmt.Println("3> todo")
46 nc <- 6
47 fmt.Println("3> done")
48 }()
49
50 go func() {
51 fmt.Println("4> todo")
52 <-nc
53 fmt.Println("4> done")
54 }()
55
56 time.Sleep(time.Second)
57}
这段代码首先通过make函数创建了一个有缓冲通道,通道中的数据为整型,缓冲区大小为3,即最多可容纳3个整数。而后创建了一个goroutine,有意向通道写入4个整数。结果发现在写入第4个整数时发生阻塞,直到main函数所在goroutine睡满3秒并从通道读取一个数据后,第4个整数才写入成功。接着又创建了一个goroutine,有意从通道读取4个整数。结果发现在读取第4个整数时发生阻塞,直到main函数所在goroutine睡满3秒并向通道写入一个数据后,第4个整数才读取成功。nc是一个被缺省初始化为空值的通道型变量。在后面创建的两个goroutine中,一个向nc写入,一个从nc读取,都发生了阻塞,直到main函数所在goroutine睡满1秒并返回为止。
无论是有缓冲通道还是无缓冲通道,从通道中读取数据都难免发生阻塞。这在某些情况下可能并不是我们所期望的。特别是这种阻塞还有可能会无限期地持续下去。遍历通道为我们提供了一种能够有效避免读阻塞发生的操作策略。
通常情况下,我们会想当然地认为读取和写入是两个对称的操作,一个负责输入数据,而另一个负责输出数据。但在实际的应用开发中,我们会发现真实的情形可能比我们预想的要更加微妙。特别是当读写操作发生阻塞的时候,我们的内心感受是不一样的。比如在写操作的过程中发生了阻塞,我们通常并不介意。因为总共要写多少数据我们是知道的,阻塞不过是片刻的等待,继续写就好了,总有写完的时候。但在读操作的过程中发生阻塞就不一样了,这会引发焦虑。因为我们并不知道只是暂时没有数据可读而发生阻塞,还是会一直这样阻塞下去。解决这类问题的常识性做法是,让负责写入的过程在写完所有的数据之后,显式地执行一个关闭动作,如关闭套接字、关闭通道等。负责读取的过程能够感受到这个关闭动作,不再继续等待不可能到来的数据,避免无休止的阻塞。Go语言的close函数可用于关闭通道。从通道中读取数据的过程可被放在一个基于范围的for循环中。每循环一次即从通道中读取一个数据,直到该通道被负责写入的goroutine通过close函数关闭为止,循环将在此刻退出。
在下面的ForChannel工程里,我们将尝试通过基于范围的for循环,遍历通道中的数据:
xxxxxxxxxx
271package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int, 3)
10
11 go func() {
12 fmt.Println("Goroutine begin")
13
14 for data := range ch {
15 fmt.Printf("%d ", data)
16 }
17
18 fmt.Println("\nGoroutine end")
19 }()
20
21 for i := 0; i < 10; i++ {
22 ch <- i
23 }
24 close(ch)
25
26 time.Sleep(time.Second)
27}
这段代码首先通过make函数创建了一个有缓冲通道,通道中的数据为整型,缓冲区大小为3,即最多可容纳3个整数。然后以匿名函数的形式创建了一个goroutine。该goroutine在一个基于范围的for循环中打印从通道中读取的数据。与此同时,main函数所在的goroutine在一个循环中向通道写入了0~9十个整数,待写完所有数据后,通过close函数关闭该通道,最后在一个为期1秒的睡眠中等待另一个goroutine的完成。
这里我们有必要对多个goroutine间基于通道的数据通信做一个总结。
对于无缓冲通道,向无人读取的通道写入和从无人写入的通道读取,都会发生阻塞。对于有缓冲通道,向缓冲区满的通道写入和从缓冲区空的通道读取,同样会发生阻塞。没有被make初始化的通道型变量其值为空即nil,读写这样的通道会陷入阻塞。读取已被关闭的通道会得到零值,遍历已被关闭的通道会退出循环。写入或关闭已被关闭的通道将导致崩溃。谁写通道谁负责关闭通道,读通道者不负责关闭。通道操作的理想状态是写多少次读多少次,避免死锁或永久阻塞。
截止目前,我们看到的通道读写都是一个goroutine向通道写入数据,另一个goroutine从通道读取数据。事实上,写入或者读取同一个通道的goroutine可能不只一个,即所谓多写多读通道。
Go语言允许多个goroutine同时向一个通道写入或从一个通道读取数据。同时执行写操作的多个goroutine形成一种竞态关系,谁先谁后没有一定之规。同时执行读操作的多个goroutine同样也是一种竞态关系,先后顺序完全不可预知。
下面我们通过一个名为CompeteChannel的工程,观察一下多个goroutine是如何竞争同一个通道的:
xxxxxxxxxx
351package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int, 5)
10
11 go writer(ch, 1, 10)
12 go writer(ch, 11, 20)
13 go writer(ch, 21, 30)
14 go writer(ch, 31, 40)
15
16 go reader(ch, 1)
17 go reader(ch, 2)
18 go reader(ch, 3)
19 go reader(ch, 4)
20
21 time.Sleep(5 * time.Second)
22}
23
24func writer(ch chan int, from int, to int) {
25 for i := from; i <= to; i++ {
26 ch <- i
27 time.Sleep(100 * time.Millisecond)
28 }
29}
30
31func reader(ch chan int, indent int) {
32 for {
33 fmt.Printf("%*d\n", indent*8, <-ch)
34 }
35}
writer和reader是两个goroutine函数,一个负责向给定的通道写入指定范围内的整数序列,另一个负责按照指定的缩进打印取自通道的数据。同时开启四个负责向通道写入的goroutine和四个负责从通道读取的goroutine。从程序运行的输出可以看出,每次打印出的结果都不一样。四个goroutine写入的顺序和四个goroutine读取的顺序完全是随机的。这就是典型竞态的直观表现。
如前所述,遍历已被关闭的通道会退出循环。这对于通过基于范围的for循环遍历通道中的数据而言的确非常有效,但读取已被关闭的通道会得到零值,可能并不能令我们感到满意。万一通道并没有被关闭而只是恰好包含了一个零值呢?我们如何区分这两种情况?我们需要有一种能够精确判断通道是否被关闭的方法。
事实上,我们在接收从通道中读取到的数据的同时还可以接收一个布尔型的标志,该标志为true表示通道处于打开状态,接收到的数据有效,即使它是零值也是通道中的零值,否则说明通道已被关闭,所接收到的数据必为零值且应被忽略。
下面的ClosedChannel工程演示了不使用基于范围的for循环,而仅仅依据对通道是否被关闭的判断,遍历通道中数据的方法:
xxxxxxxxxx
311package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int)
10
11 go func() {
12 fmt.Println("Goroutine begin")
13
14 for {
15 data, ok := <-ch
16 if !ok {
17 break
18 }
19 fmt.Printf("%d ", data)
20 }
21
22 fmt.Println("\nGoroutine end")
23 }()
24
25 for i := 0; i < 10; i++ {
26 ch <- i
27 }
28 close(ch)
29
30 time.Sleep(time.Second)
31}
这段代码首先通过make函数创建了一个无缓冲通道,通道中的数据为整型。然后以匿名函数的形式创建了一个goroutine。该goroutine在一个无限循环中打印从通道中读取的数据。在接收数据的同时还接收了一个布尔型的标志ok。如果ok的值为false,则说明通道已被关闭,通过break语句退出遍历循环。与此同时,main函数所在的goroutine在一个循环中向通道写入了0~9十个整数,待写完所有数据后,通过close函数关闭该通道,最后在一个为期1秒的睡眠中等待另一个goroutine的完成。
通道也可以是单向的,即让一个goroutine只读而另一个goroutine只写该通道。
默认情况下,通道都是双向的。一个goroutine既可以将数据写入某个通道,也可以从同一个通道中读取数据,甚至可以将自己刚刚写入且尚未被其它goroutine读走的数据再读回来。这在特定场景下可能会引发错误,因为一个goroutine无法区分从通道中读出的数据究竟是来自对方还是自己。为了杜绝这种潜在的风险,Go语言提供了所谓单向通道。只需在通道类型中显式指明该通道是只读通道还是只写通道,前者在chan关键字的前面加上“<-”,而后者则在chan关键字的后面加上“<-”。任何试图向只读通道写入数据,或从只写通道读取数据的操作都将引发编译错误。双向通道在任何时候都可以被隐式转换为单向通道,而单向通道却无论如何都无法被转换为双向通道。
下面我们创建一个SimplexChannel工程,体验一下单向通道的基本用法:
xxxxxxxxxx
331package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan int, 5)
10
11 go writer(ch)
12 go reader(ch)
13
14 time.Sleep(time.Second)
15}
16
17func writer(wc chan<- int) {
18 for i := 0; i < 10; i++ {
19 wc <- i
20 }
21 close(wc)
22}
23
24func reader(rc <-chan int) {
25 for {
26 data, ok := <-rc
27 if !ok {
28 break
29 }
30 fmt.Printf("%d ", data)
31 }
32 fmt.Println()
33}
这段代码在完成通道ch的创建以后,开启了两个独立的goroutine,分别执行writer和reader函数。前者以只写通道类型的形参wc接收实参ch,向该通道写入0~9十个整数,而后者则以只读通道类型的形参rc接收实参ch,从该通道读取数据并打印,直到负责写入通道的goroutine关闭该通道为止。我们可以尝试从只写通道wc读取数据,或向只读通道rc写入数据,看看编译器会作何反应?
截止目前,我们所遇到的都是一个goroutine只访问一个通道的情况。那么如果一个goroutine需要同时访问多个通道,又当如何处置?最简单的做法是访问完一个通道再访问另一个通道,直到访问完最后一个通道后再回到第一个通道。这是一个典型的轮询策略。轮询最大的问题是,一旦对某个通道的访问陷入阻塞,其它通道即失去被访问的机会,直到引发阻塞的通道解除了阻塞。这显然不是一种效率优先的解决之道。多路通道所要解决的恰恰就是这类问题。
多路通道允许我们同时监视多个通道是否读写就绪,只读写那些可被读写的通道。从读就绪的通道读取数据和向写就绪的通道写入数据都不会发生阻塞,除非所有通道都处于非就绪状态,这时我们要么进入合理的阻塞等待,要么执行必要的空闲处理。Go语言为多路通道提供了select-case语法结构。select关键字后面跟一对花括号,花括号中包含一到多个case分支和至多一个default分支。每个case关键字后面是一个读取或者写入通道的表达式,以冒号结尾。每个case分支中包含读写通道成功后的操作。default分支不是必需的。如果没有default分支,当所有通道都未就绪时,执行过程会阻塞于select,否则会执行default分支中的代码。通常利用default分支执行一些空闲处理。熟悉UNIX/Linux内核编程的开发者,对基于select、poll或者epoll的所谓I/O多路复用,一定印象深刻。事实上,在逻辑上它们确有相似之处。在select-case结构中,一个case分支即代表一个被监视的通道,多个case分支即表示被同时监视的多个通道。只要其中有一个通道读或者写就绪,即会匹配到相应的分支,在完成实际的读写操作之后,执行分支中的后续处理。如果一个就绪通道都没有,则执行default分支中的空闲处理,或在select上阻塞等待。
下面我们通过一个SelectChannel工程,体验一下基于select-case结构的多路通道编程:
xxxxxxxxxx
351package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 nc, cc := make(chan int), make(chan rune)
10
11 go func() {
12 for {
13 select {
14 case n := <-nc:
15 fmt.Printf("%d ", n)
16 case c := <-cc:
17 fmt.Printf("%c ", c)
18 default:
19 fmt.Printf(". ")
20 time.Sleep(50 * time.Millisecond)
21 }
22 }
23 }()
24
25 ca := []rune{'A', 'B', 'C', 'D', 'E'}
26
27 for i, c := range ca {
28 nc <- i
29 time.Sleep(150 * time.Millisecond)
30 cc <- c
31 time.Sleep(150 * time.Millisecond)
32 }
33
34 time.Sleep(time.Second)
35}
程序一开始,我们就创建了两个无缓冲通道,一个传输整数的通道nc和一个传输字符的通道cc。在之后创建的goroutine中,包含了一个无限循环,每次执行循环体都会通过select-case结构同时监视nc和cc两个通道是否读就绪。哪个通道可读,就读哪个通道,并将读到的整数或字符打印出来。如果两个通道都不可读,则执行default分支中的空闲处理,打印圆点并睡眠50毫秒。main函数所在的goroutine在一个循环中以150毫秒的间隔,交替向nc和cc两个通道分别写入整数和字符序列,最后在为期1秒的睡眠中等待另一个goroutine的完成。
谢谢大家,我们下节课再见!