同学们好!截至目前,我们已经完成了Go语言基础部分的学习。为了巩固并加深对所学知识的理解和掌握,我们将在这节课一起完成一个适度规模的阶段性项目——《聊天室》。
首先我们要对这个项目的需求做一番分析,列出需要实现的各个功能点,然后针对这些功能需求给出一个框架性的设计方案。依据设计方案中的具体细节,分别编码实现网络聊天室的客户机和服务器,两个应用程序。
在拿到任何一个项目时,我们首先要做的就是分析和设计。通过需求分析,可以明确我们的产品需要提供哪些功能。最终用户和产品部门所提供的需求通常都是业务性需求,即用业务语言描述用户想要的究竟是一个什么东西。但作为开发部门通常还要在业务需求的基础上做进一步的加工和整理,从开发角度用技术语言复述出用户的需求,得到所谓的技术性需求,为进一步的系统设计做必要的准备。依据所得到的技术性需求,设计人员可以给出具体的实现方案,其中包括系统的总体架构、所选择的技术路线、各模块间的通信协议、每一个操作步骤的执行流程、算法与数据结构设计,以及与数据持久化有关的库表设计、文件格式设计等具体技术细节。在正式编码之前考虑得越充分、越详细、越具体,对后续开发过程中可能遇到的各种问题和风险有一个尽可能全面的预估,后面的实现环节就会越顺利,越少走弯路,少犯错误,整个项目如期交付的机率也就越大。
聊天室作为一款网络应用,首先应该允许用户登入和登出。登入即表示在线,登出即表示离线,这是一切网络应用最起码的功能之一。成功登入系统的用户可以在聊天室中发送消息,所发送的消息不但自己可以看到,聊天室中的其它在线用户也应该可以看到。每个在线用户可能还想查看当前都有哪些用户在线。每个在线用户也可能想修改自己的用户名。对于长时间不发送消息的用户,聊天室会将其自动踢出,即强制其成为离线状态。
根据前面所做的需求分析,我们给出网络聊天室的总体架构。网络聊天室应用分为客户机和服务器两部分。客户机可以是多个,服务器只有一个。聊天室服务器由主线程、接收线程、公共通道、广播线程、用户通道和发送线程等六个功能模块组成。其中接收线程、用户通道和发送线程,针对每个用户都有一个独立的实例,而主线程、公共通道和广播线程则只有一个,为所有用户共享。当然作为一个Go语言应用,这里所说的“线程”并非真正意义上的系统级线程,而是专指goroutine。
主线程负责监听客户机的连接请求,并在接受连接后创建针对该客户机的接收线程,后者负责创建与该客户机相连的发送线程。与每个客户机相连的接收线程,接收来自客户机的消息。如果该消息是一个特殊的请求命令,则执行相应的业务处理,并将处理结果直接返回给发送请求的客户机,否则将该消息写入公共通道。若在给定时间内没有收到来自该客户机的任何消息,则关闭与该客户机的连接,即将其踢出聊天室。广播线程从公共通道读取消息,写入与每个客户机对应的用户通道。与每个客户机相连的发送线程,从特定的用户通道读取消息并发送给特定的客户机。
在这个项目中我们将用到基于套接字的TCP通信。我们会用映射管理用户信息,其中会涉及到对映射元素的遍历与删除。所有的线程我们都会通过goroutine实现,并会借助通道在不同goroutine间传递数据。我们会通过select-case结构规避永久阻塞,并利用定时器,实现超时踢人功能。
在接下来的代码编写阶段,我们会首先构建一个支持多客户机同时连接的TCP并发服务器。然后定义一个表示用户的结构体并将该结构体的实例放入映射,以实现对用户的管理。最后我们会创建一系列的线程和通道,在特定的线程中读写特定的通道,并完成对消息和命令的处理。
下面我们先来实现网络聊天室的客户机程序。
创建一个ChatClient工程:
x1package main
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "net"
8 "os"
9 "strings"
10)
11
12////////////////////////////////////////////////////////////////////////
13
14func main() {
15 conn, err := net.Dial("tcp", "127.0.0.1:8888")
16 if err != nil {
17 fmt.Println("net.Dial错误:", err)
18 return
19 }
20
21 go func() {
22 for {
23 message := make([]byte, 1024)
24 num, err := conn.Read(message)
25 if err != nil {
26 if err != io.EOF {
27 fmt.Println("Conn.Read错误:", err)
28 }
29 os.Exit(0)
30 }
31
32 fmt.Println(">", string(message[:num]))
33 }
34 }()
35
36 for {
37 message := ""
38 reader := bufio.NewReader(os.Stdin)
39 message, _ = reader.ReadString('\n')
40 message = strings.TrimSpace(message)
41
42 _, err := conn.Write([]byte(message))
43 if err != nil {
44 fmt.Println("Conn.Write错误:", err)
45 return
46 }
47 }
48}
客户机一启动即尝试与监听本机8888端口的服务器建立TCP连接。连接成功,即开启一个独立的goroutine,在无限循环中接收来自服务器的消息并打印,直到发生错误或服务器主动关闭了连接,退出当前进程。与此同时main函数所在的goroutine也在一个无限循环中不断从标准输入读取字符串,并通过程序一开始即建立的TCP连接,将其发送给服务器,直到发生错误,退出当前进程。
接下来我们再实现网络聊天室的服务器程序。
创建一个ChatServer工程:
xxxxxxxxxx
241package main
2
3import (
4 "fmt"
5 "net"
6 "strings"
7 "sync"
8 "time"
9)
10
11type User struct {
12 name string
13 addr string
14 conn net.Conn
15 channel chan string
16}
17
18var users = make(map[string]*User)
19
20var lock sync.RWMutex
21
22var common = make(chan string, 10)
23
24////////////////////////////////////////////////////////////////////////
首先在全局域定义几个数据类型和变量。User是一个代表用户的结构体类型,其中包含四个成员,字符串类型的name表示用户名、字符串类型的addr表示用户所在客户机的IP地址和端口号、连接对象conn用于和用户所在客户机通信、字符串通道类型的channel表示专属于该用户的用户通道。users是一个映射类型的用户列表,其键为每个用户所在客户机的IP地址和端口号,值为表示该用户的User对象的地址。lock是一个互斥锁,用于在多线程场景下访问用户列表提供并发保护。字符串通道common表示公共通道。
xxxxxxxxxx
271func main() {
2 fmt.Println("主线程> 启动监听")
3
4 listener, err := net.Listen("tcp", ":8888")
5 if err != nil {
6 fmt.Println("主线程> net.Listen错误:", err)
7 return
8 }
9
10 go broadcast()
11
12 for {
13 fmt.Println("主线程> 等待连接")
14
15 conn, err := listener.Accept()
16 if err != nil {
17 fmt.Println("主线程> Listener.Accept错误:", err)
18 return
19 }
20
21 fmt.Println("主线程> 接受连接")
22
23 go receiver(conn)
24 }
25}
26
27////////////////////////////////////////////////////////////////////////
在main函数中,首先调用net包的Listen函数启动侦听,通过其参数指定传输层协议“tcp”和侦听地址及端口“:8888”,不写地址表示侦听任意地址。该函数成功会返回一个侦听器对象。通过go关键字开启广播线程,执行broadcast函数。在一个无限循环中调用侦听器对象的Accept方法,等待来自客户机的连接请求,并在接受连接后返回可用于后续通信的连接对象。再次通过go关键字开启接收线程,执行receiver函数,将侦听器对象Accept方法返回的连接对象作为参数传递给receiver函数。而后,主线程在循环中继续调用侦听器对象的Accept方法等待新的客户机连接。
xxxxxxxxxx
121func broadcast() {
2 for {
3 message := <-common
4 fmt.Println("广播线程>", message)
5
6 lock.Lock()
7 for _, user := range users {
8 user.channel <- message
9 }
10 lock.Unlock()
11 }
12}
在代表广播线程执行过程的broadcast函数里,只有一个无限循环,不断地从公共通道读取消息,并在遍历用户列表的过程中,向每个用户的用户通道写入该消息。注意,针对用户列表的遍历循环必须放在锁区内部,以防止多线程间的并发访问冲突。
xxxxxxxxxx
561func receiver(conn net.Conn) {
2 fmt.Println("接收线程> 开始接收")
3
4 addr := conn.RemoteAddr().String()
5
6 user := User{
7 name: addr,
8 addr: addr,
9 conn: conn,
10 channel: make(chan string, 10),
11 }
12
13 lock.Lock()
14 users[addr] = &user
15 lock.Unlock()
16
17 common <- fmt.Sprintf("%s@%s: 进入聊天室", user.name, user.addr)
18
19 go sender(user)
20
21 for {
22 fmt.Println("接收线程> 接收消息")
23
24 read := make(chan bool)
25 go timeout(user, read)
26
27 buf := make([]byte, 1024)
28 num, err := conn.Read(buf)
29 if err != nil {
30 fmt.Println("接收线程> Conn.Read错误:", err)
31 break
32 }
33
34 read <- true
35
36 fmt.Printf("接收线程> 成功接收: %d字节\n", num)
37 message := string(buf[:num])
38 fmt.Println("接收线程> 消息内容:", message)
39
40 if doMessage(user, message) == false {
41 break
42 }
43 }
44
45 lock.Lock()
46 delete(users, addr)
47 lock.Unlock()
48 close(user.channel)
49 conn.Close()
50
51 common <- fmt.Sprintf("%s@%s: 离开聊天室", user.name, user.addr)
52
53 fmt.Println("接收线程> 结束接收")
54}
55
56////////////////////////////////////////////////////////////////////////
在代表接收线程执行过程的receiver函数里,首先通过连接对象的RemoteAddr方法获得客户机的IP地址和端口号,然后创建并初始化一个User类型的与该客户机对应的用户对象,以客户机的IP地址和端口号作为其默认用户名,同时为该用户创建用户通道。在锁区内部,以该用户所在客户机的IP地址和端口号为键,用户对象的地址为值,加入用户列表。通过go关键字开启该用户的发送线程,执行sender函数,传入用户对象。紧接着流程进入一个无限循环,不断地通过连接对象的Read方法接收来自客户机的消息,并通过doMessage函数处理该消息。另外,在接收每条消息之前,还通过go关键字开启了一个超时线程,执行timeout函数,并传入一个布尔型通道read。如果超过一定时间仍没有接收到任何消息,timeout函数将通过关闭连接的方法令连接对象的Read方法返回错误,跳出无限循环,结束接收线程。如果在给定时间内成功接收到消息,则会在Read方法返回成功后向read通道写入true,timeout函数直接返回,接收线程继续执行。doMessage函数用于处理消息,该函数返回false表示不需要再继续接收消息,通过break语句跳出无限循环。接收线程结束意味着该用户已经或即将离开聊天室,这时在锁区内将与该用户对应的键值对从用户列表中删除,同时关闭该用户的用户通道和TCP连接。
xxxxxxxxxx
221func sender(user User) {
2 fmt.Println("发送线程> 开始发送")
3
4 for {
5 message, ok := <-user.channel
6 if !ok {
7 fmt.Println("发送线程> 通道关闭")
8 break
9 }
10 fmt.Println("发送线程> 消息内容:", message)
11
12 fmt.Println("发送线程> 发送消息")
13
14 num, err := user.conn.Write([]byte(message))
15 if err != nil {
16 fmt.Println("发送线程> Conn.Write错误:", err)
17 }
18 fmt.Printf("发送线程> 成功发送: %d字节\n", num)
19 }
20
21 fmt.Println("发送线程> 结束发送")
22}
在代表发送线程执行过程的sender函数里,只有一个无限循环,不断地从参数用户的用户通道读取消息,并通过参数用户连接对象的Write方法,将消息发送给该用户所在的客户机。请注意,我们从参数用户的用户通道读取消息的同时,还得到一个布尔型标志ok,表示该用户的用户通道当前是处于打开状态还是关闭状态。在receiver函数返回前,即接收线程即将结束时,我们关闭了参数用户的用户通道,这时得到的ok标志将为false,通过break语句跳出无限循环,发送线程结束。
xxxxxxxxxx
101func timeout(user User, read chan bool) {
2 select {
3 case <-read:
4 return
5
6 case <-time.After(time.Minute):
7 common <- fmt.Sprintf("%s@%s: 赶走潜水员", user.name, user.addr)
8 user.conn.Close()
9 }
10}
在代表超时线程执行过程的timeout函数里,包含了一个针对多路通道的select-case结构。如果在一分钟内没有接收到任何来自客户机的消息,流程将阻塞于select。如果在此期间receiver函数中对连接对象Read方法的调用返回了实际接收到的消息字节数,则会向read通道写入true,这时流程将执行第一个case分支,通过return语句返回并结束超时线程。如果在一分钟到达时依然没有接收到任何来自客户机的消息,time包After方法所返回的通道将被写入数据,这时流程将执行第二个case分支,在宣告“赶走潜水员”后,通过关闭参数用户的TCP连接将其踢出聊天室。receiver函数中对该用户连接对象Read方法的调用将返回错误,跳出无限循环,从用户列表中删除该用户,关闭该用户的用户通道,结束接收线程。sender函数中的无限循环因该用户的用户通道被关闭而退出,结束发送线程。至此,包括用户对象、用户通道、接收线程、发送线程等在内的一切与被踢用户有关的资源已被全部销毁殆尽。
xxxxxxxxxx
91func doMessage(user User, message string) bool {
2 if message[0] == '!' {
3 return doCommand(user, message)
4 }
5
6 return doChat(user, message)
7}
8
9////////////////////////////////////////////////////////////////////////
doMessage函数用于处理消息。从客户机接收到的消息分为两种,以叹号开头的是命令消息,由doCommand函数处理,其它为普通消息,由doChat函数处理。这两个函数的返回值也是doMessage函数的返回值。
xxxxxxxxxx
291func doCommand(requester User, command string) bool {
2 argv := strings.Fields(command)
3 argc := len(argv)
4
5 switch argv[0] {
6 case "!rename":
7 if argc == 2 {
8 return doRename(requester, argv[1])
9 }
10
11 case "!who":
12 if argc == 1 {
13 return doWho(requester)
14 }
15
16 case "!quit":
17 if argc == 1 {
18 return doQuit(requester)
19 }
20 }
21
22 message := "无效命令: "
23 for _, arg := range argv {
24 message += fmt.Sprintf("%s ", arg)
25 }
26 requester.channel <- message
27
28 return true
29}
doCommand函数用于处理命令消息。通过strings包的Fields函数,以空白字符为分隔符,将从参数传入的命令消息字符串拆分为若干子字符串。在下面的switch-case结构中,根据第一个子字符串所代表的命令调用相应的命令处理函数。“!rename”表示修改用户名,命令消息中应该还有第二个子字符串,即新用户名,将其作为参数传给doRename命令处理函数,并返回该函数的返回值。类似地,“!who”和“!quit”分别表示查询在线用户和登出聊天室,对应的命令处理函数分别为doWho和doQuit。对于无法处理的命令,则向命令请求者用户的用户通道写入错误提示信息。
xxxxxxxxxx
61func doChat(user User, message string) bool {
2 common <- fmt.Sprintf("%s@%s: %s", user.name, user.addr, message)
3 return true
4}
5
6////////////////////////////////////////////////////////////////////////
doChat函数用于处理普通消息。将消息发送者用户的用户名、IP地址和端口号,以及消息内容格式化为一个字符串,写入公共通道。
xxxxxxxxxx
121func doRename(requester User, newName string) bool {
2 fmt.Printf("接收线程> 处理%s@%s的更名命令\n",
3 requester.name, requester.addr)
4
5 lock.Lock()
6 oldName := users[requester.addr].name
7 users[requester.addr].name = newName
8 lock.Unlock()
9 requester.channel <- fmt.Sprintf("[%s]->[%s]", oldName, newName)
10
11 return true
12}
doRename函数用于处理更名命令。在锁区内获取命令请求者用户的原用户名,同时更新为新用户名。将命令处理结果写入命令请求者用户的用户通道。
xxxxxxxxxx
141func doWho(requester User) bool {
2 fmt.Printf("接收线程> 处理%s@%s的查询命令\n",
3 requester.name, requester.addr)
4
5 message := ""
6 lock.Lock()
7 for _, user := range users {
8 message += fmt.Sprintf("[%s]", user.name)
9 }
10 lock.Unlock()
11 requester.channel <- message
12
13 return true
14}
doWho函数用于处理查询命令。在锁区内遍历用户列表,将每一个在线用户的用户名用一对方括号括起后拼接成一个完整的字符串。将命令处理结果写入命令请求者用户的用户通道。
xxxxxxxxxx
61func doQuit(requester User) bool {
2 fmt.Printf("接收线程> 处理%s@%s的退出命令\n",
3 requester.name, requester.addr)
4
5 return false
6}
doQuit函数用于处理退出命令。该函数非常简单,直接返回false即可。该函数的返回值就是doCommand函数的返回值,同时也是doMessage函数的返回值。doMessage函数返回false将导致命令请求者用户的接收线程,从receiver函数的无限循环中跳出,并在删除用户对象、关闭用户通道和TCP连接后结束运行。而该用户的发送线程,也会因用户通道被关闭而从sender函数的无限循环中跳出并结束运行。至此,包括用户对象、用户通道、接收线程、发送线程等在内的一切与命令请求者用户有关的资源已被全部销毁殆尽。
至此,我们已经完成了《聊天室》项目的全部编码实现。大家可以根据我们在分析设计阶段列出的项目需求,进行完整的功能性验证。相信大家已经注意到了,Go语言的映射并不是线程安全的。当同时存在多个goroutine意图访问同一个映射对象时,其中只要有一个goroutine执行修改性操作,所有goroutine都必须将对映射对象的访问置于一个由互斥锁构成的锁区之内,以避免潜在的并发访问冲突。
谢谢大家,我们下节课再见!