Go語言的net庫通過以下機制實現非阻塞網絡I/O,同時保持代碼的簡潔性:
-
異步系統調用封裝
底層對socket(SO_NONBLOCK)自動啟用非阻塞模式,但開發者的調用接口仍表現為同步形式。當執行讀寫操作時,若內核緩沖區未就緒,立即返回EWOULDBLOCK錯誤。 -
事件驅動架構
內部通過runtime.netpoll組件集成OS特定多路復用器:- Linux使用
epoll - macOS使用
kqueue - Windows使用
IOCP
該組件持續監聽網絡事件,形成高效的I/O就緒隊列。
- Linux使用
-
協程調度整合
當Goroutine發起網絡請求時:
- 運行時將Goroutine掛起,將其與對應socket綁定
- 當前線程(M)立即解綁,轉去執行其他待處理的Goroutine
- 內核通知I/O就緒后,調度器優先喚醒等待此事件的Goroutine
- 零回調的同步編程模型
盡管底層采用非阻塞實現,開發者仍可編寫直觀的同步代碼:
以Read為例子,函數代碼如下:
// 位置: net/net.go
type conn struct {
fd *netFD
}
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// 位置: net/fd_posix.go
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
// 位置: internal/poll/fd_unix.go
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// ...省略部分代碼
}
func (fd *FD) Read(p []byte) (int, error) {
// ...省略部分代碼
// 從Sysfd緩沖區讀取數據寫入goroutine緩沖區p
// 忽略中斷信號 EINTER
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...省略部分代碼
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
/ returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// poller持有的goroutine
// rg 是read goroutine
// wg 是write goroutine
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 把goroutine置為等待狀態
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 當前goroutine被喚醒
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}