前言
如果想兼顾开发效率,又能保证高并发,协程就是最好的选择。它可以在保持异步化运行机制的同时,用同步方式写代码(goroutine-per-connection),这在实现高并发的同时,缩短了开发周期,是高性能服务未来的发展方向。
对go netpoller 的信心:Getty 只考虑使用 Go 语言原生的网络接口,如果遇到网络性能瓶颈也只会在自身层面寻找优化突破点。
- CPU 和 IO 设备是不同的设备,能并行运行。合理调度程序,充分利用硬件,就能跑出很好的性能;
- Go 的 IO 最最核心的是 io 库,除了定义 interface (Reader/Writer),还实现了通用的函数,比如 Copy 之类的;
- 内存字节数组可以作为 Reader ,Writer ,实现在 bytes 库中,字符串可以作为 Reader,实现在 strings 库中,strings.NewReader;网络句柄可以作为 Reader ,Writer ,实现在 net 库中,net.Conn;文件句柄可以作为 Reader ,Writer ,实现在 os 库中,os.File ;
整体理念
Go语言TCP Socket编程从tcp socket诞生后,网络编程架构模型也几经演化,大致是:“每进程一个连接” –> “每线程一个连接” –> “Non-Block + I/O多路复用(linux epoll/windows iocp/freebsd darwin kqueue/solaris Event Port)”。伴随着模型的演化,服务程序愈加强大,可以支持更多的连接,获得更好的处理性能。不过I/O多路复用也给使用者带来了不小的复杂度,以至于后续出现了许多高性能的I/O多路复用框架, 比如libevent、libev、libuv等,以帮助开发者简化开发复杂性,降低心智负担。不过Go的设计者似乎认为I/O多路复用的这种通过回调机制割裂控制流的方式依旧复杂,且有悖于“一般逻辑”设计,为此Go语言将该“复杂性”隐藏在Runtime中了:Go开发者无需关注socket是否是 non-block的,也无需亲自注册文件描述符的回调,只需在每个连接对应的goroutine中以“block I/O”的方式对待socket处理即可。
The Go netpollerIn Go, all I/O is blocking. The Go ecosystem is built around the idea that you write against a blocking interface and then handle concurrency through goroutines and channels rather than callbacks and futures.An example is the HTTP server in the “net/http” package. Whenever it accepts a connection, it will create a new goroutine to handle all the requests that will happen on that connection. This construct means that the request handler can be written in a very straightforward manner. First do this, then do that. Unfortunately, using the blocking I/O provided by the operating system isn’t suitable for constructing our own blocking I/O interface.
netty 在屏蔽java nio底层细节方面做得不错, 但因为java/jvm的限制,“回调机制割裂控制流”的问题依然无法避免。
原理
- 多路复用 有赖于 linux 的epoll 机制,具体的说 是 epoll_create/epoll_ctl/epoll_wait 三个函数
- epoll 机制包含 两个fd: epfd 和 待读写数据的fd(比如socket)。先创建efpd,然后向epfd 注册fd事件, 之后触发epoll_wait 轮询注册在epfd 的fd 事件发生了没有。
- netpoller 负责将 操作系统 提供的nio 转换为 goroutine 支持的blocking io。为屏蔽linux、windows 等底层nio 接口的差异,netpoller 定义一个 虚拟接口来封装底层接口。
func netpollinit() func netpollopen(fd uintptr, pd *pollDesc) int32 func netpoll(delta int64) gList func netpollBreak() func netpollIsPollDescriptor(fd uintptr) bool
本文 主要讲 netpoller 基于 linux 的epoll 接口 的实现 Go netpoller 网络模型之源码全面解析
Goroutine 让出线程并等待读写事件:当我们在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写,当前 Goroutine 就会执行 runtime.poll_runtime_pollWait
检查 runtime.pollDesc
的状态并调用 runtime.netpollblock
等待文件描述符的可读或者可写。runtime.netpollblock
会使用运行时提供的 runtime.gopark
让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒。
I/O 多路复用需要使用特定的系统调用/select,java 语言需要显式调用select ,而golang 则通过netpoller 组件将select调用 重新隐藏了。
多路复用等待读写事件的发生并返回:netpoller并不是由runtime中的某一个线程独立运行的,runtime中的调度和系统调用会通过 runtime.netpoll 与网络轮询器交换消息,获取待执行的 Goroutine 列表,恢复Goroutine 为运行状态,并将待执行的 Goroutine 加入运行队列等待处理。
io 前后的GPM
G1 正在 M 上执行,还有 3 个 Goroutine 在 LRQ 上等待执行。网络轮询器空闲着,什么都没干。
G1 想要进行网络系统调用,因此它被移动到网络轮询器并且处理异步网络系统调用。然后,M 可以从LRQ 执行另外的 Goroutine。此时,G2 就被上下文切换到 M 上了。
异步网络系统调用由网络轮询器完成,G1 被移回到 P 的 LRQ 中。一旦 G1 可以在 M 上进行上下文切换,它负责的 Go 相关代码就可以再次执行。
执行网络系统调用不需要额外的 M。网络轮询器使用系统线程,它时刻处理一个有效的事件循环/eventloop。
实现
服务端逻辑
- 在 golang net 的 listen 中,会完成如下几件事:
- 创建 socket 并设置非阻塞,
- bind 绑定并监听本地的一个端口
- 调用 listen 开始监听
- epoll_create 创建一个 epoll 对象
- epoll_etl 将 listen 的 socket 添加到 epoll 中等待连接到来
- Accept主要做了三件事
- 调用 accept 系统调用接收一个连接
- 如果没有连接到达,把当前协程阻塞掉
- 新连接到来的话,将其添加到 epoll 中管理,然后返回
- Write 的大体过程和 Read 是类似的。先是调用 Write 系统调用发送数据,如果内核发送缓存区不足的时候,就把自己先阻塞起来,然后等可写时间发生的时候再继续发送。
当要等待的事件就绪的时候,被阻塞掉的协程又是如何被重新调度的呢?
- Go 语言的运行时会在调度或者系统监控中调用 sysmon,它会调用 netpoll,来不断地调用 epoll_wait 来查看 epoll 对象所管理的文件描述符中哪一个有事件就绪需要被处理了。如果有,就唤醒对应的协程来进行执行。
- 除此之外还有几个地方会唤醒协程,如
- startTheWorldWithSema
- findrunnable 在 schedule 中调用 有top 和 stop 之分。其中 stop 中会导致阻塞。
- pollWork
核心数据结构
connect/accept/read/write 都会 转换为 pollDesc 操作。
调用 internal/poll.pollDesc.init
初始化文件描述符时不止会初始化网络轮询器,会通过 runtime.poll_runtime_pollOpen
函数重置轮询信息 runtime.pollDesc
并调用 runtime.netpollopen
初始化轮询事件。runtime.netpollopen
会调用 epollctl 向全局的轮询文件描述符 epfd 中加入新的轮询事件监听文件描述符的可读和可写状态
轮询 以获取 可执行的Goroutine
这里类似 netty 的eventloop
// src/runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
// 根据传入的 delay 计算 epoll 系统调用需要等待的时间;
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
// 调用 epollwait 等待可读或者可写事件的发生;
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if waitms > 0 {
return gList{}
}
goto retry
}
// 在循环中依次处理 epollevent 事件;
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
...
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
netpollready(&toRun, pd, mode)
}
}
return toRun
计算了需要等待的时间之后,runtime.netpoll 会执行 epollwait 等待文件描述符转换成可读或者可写。当 epollwait 函数返回的值大于 0 时,就意味着被监控的文件描述符出现了待处理的事件。处理的事件总共包含两种,一种是调用 runtime.netpollBreak
函数触发的事件,该函数的作用是中断网络轮询器;另一种是其他文件描述符的正常读写事件,对于这些事件,我们会交给 runtime.netpollready
处理
代码实现
//$GOROOT/src/net/tcpsock.go
type TCPConn struct {
conn
}
//$GOROOT/src/net/net.go
type conn struct {
fd *netFD
}
// $GOROOT/src/net/fd_unix.go
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// $GOROOT/src/internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
... ...
}
net.conn只是*netFD 的外层包裹结构,最终 Write 和 Read 都会落在其中的fd字段上,netFD 在不同平台上有着不同的实现。
// $GOROOT/src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func (fd *FD) Write(p []byte) (int, error) {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
return 0, err
}
var nn int
for {
max := len(p)
if fd.IsStream && max-nn > maxRW {
max = nn + maxRW
}
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}