简介
channel是Go语言的一大特性,基于channel有很多值得探讨的问题,如
- channel为什么是并发安全的?
- 同步通道和异步通道有啥区别?
- 通道为何会阻塞协程?
- 使用通道导致阻塞的协程是如何解除阻塞的? 要了解本质,需要进源码查看,毕竟源码之下了无秘密。
原理
创建
channel理论上有三种,带缓冲\不带缓冲\nil,写法如下:
1 | // buffered |
追踪make函数,会发现在builtin/builtin.go中仅有一个声明func make(t Type, size …IntegerType) Type。真正的实现可以参考go内置函数make,简单来说在cmd/compile/internal/gc/typecheck.go中有函数typecheck1
1 | // The result of typecheck1 MUST be assigned back to n, e.g. |
最终真正实现位置为runtime/chan.go
1 | func makechan(t *chantype, size int) *hchan { |
从这个函数可以看出,channel的数据结构为hchan
结构
接下来我们看一下channel的数据结构,基于数据结构,可以推测出具体实现。
runtime/chan.go
1 | type hchan struct { |
通过该hchan的数据结构和makechan函数,数据结构里有几个值得说明的数据:
- dataqsiz表示channel的长度,如果未非缓冲队列,则值为0。通过dataqsiz实现环形队列。
- buf存放真正的数据
- sendx和recvx指在环形队列中数据入channel和出channel的位置
- sendq存放向channel发送数据的goroutine队列
- recvq存放等待获取channel数据的goroutine队列
- lock为全局锁
Anwser
通过追查到的代码,我们可以回答最开始提出的几个问题了。
channel为什么是并发安全的?
因为做操作之前,都会先获取全局锁,只有获取成功的才能进行操作,保证了并发安全。
同步通道和异步通道有啥区别?
使用的底层数据结构、操作代码都是一样的,只不过dataqsiz的值不一样,一个为0,一个为正数。
通道为何会阻塞协程?
当通道已经满了,但协程继续往通道里写入,或者通道里没有数据,但是协程从通道里获取数据时,协程会被阻塞。
实现的原理与Golang并发调度的GMP模型强相关。
写入满通道的流程
- 当前goroutine(G1)创建自身的一个引用(sudog),放置到hchan的sendq队列
- 当前goroutine(G1)会调用gopark函数,将当前协程置为waiting状态;
- 将M和G1绑定关系断开;
- scheduler会调度另外一个就绪态的goroutine与M建立绑定关系,然后M 会运行另外一个G。
读取空通道的流程
- 当前goroutine(G2)会创建自身的一个引用(sudog)
- 将代表G2的sudog存入recvq等待队列
- G2会调用gopark函数进入等待状态,让出OS thread,然后G2进入阻塞态
使用通道导致阻塞的协程是如何解除阻塞的?
对于已经满的通道,当有协程G2做读操作时,会解除G1的阻塞,流程为
- G2调用
t:=<-ch
获取一个元素A; - 从hchan的buf里面取出一个元素;
- 从sendq等待队列里面pop一个sudog;
- 将G1要写入的数据复制到buf中A的位置,然后更新buf的sendx和recvx索引值;
- G2调用goready(G1)将G1置为Runable状态,表示G1可以恢复运行;
对于读取空的通道,当有协程G1做写操作时,会解除G2的阻塞,流程为
- 将待写入的消息发送给接收的goroutine G2;
- G1调用goready(G2) 将G2设置成就绪状态,等待调度;
实现
我们来看一下chan的具体实现
读取数据
1 | // chanrecv receives on channel c and writes the received data to ep. |
接收channel的数据的流程如下:
CASE1:前置channel为nil的场景:
- 如果block为非阻塞,直接return;
- 如果block为阻塞,就调用gopark()阻塞当前goroutine,并抛出异常。
前置场景,block为非阻塞,且channel为非缓冲队列且sender等待队列为空 或则 channel为有缓冲队列但是队列里面元素数量为0,且channel未关闭,这个时候直接return;
调用
lock(&c.lock)
锁住channel的全局锁;CASE2:channel已经被关闭且channel缓冲中没有数据了,这时直接返回success和空值;
CASE3:sender队列非空,调用
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)函数处理:
- channel是非缓冲channel,直接调用recvDirect函数直接从sender recv元素到ep对象,这样就只用复制一次;
- 对于sender队列非空情况下, 有缓冲的channel的缓冲队列一定是满的:
- 1.先取channel缓冲队列的对头元素复制给receiver(也就是ep);
- 2.将sender队列的对头元素里面的数据复制到channel缓冲队列刚刚弹出的元素的位置,这样缓冲队列就不用移动数据了。
- 释放channel的全局锁;
- 调用goready函数标记当前goroutine处于ready,可以运行的状态;
CASE4:sender队列为空,缓冲队列非空,直接取队列元素,移动头索引;
CASE5:sender队列为空、缓冲队列也没有元素且不阻塞协程,直接return (false,false);
CASE6:sender队列为空且channel的缓存队列为空,将goroutine加入recv队列,并阻塞。
写入数据
1 | /* |
向channel写入数据主要流程如下:
- CASE1:当channel为空或者未初始化,如果block表示阻塞那么向其中发送数据将会永久阻塞;如果block表示非阻塞就会直接return;
- CASE2:前置场景,block为非阻塞,且channel没有关闭(已关闭的channel不能写入数据)且(channel为非缓冲队列且receiver等待队列为空)或则( channel为有缓冲队列但是队列已满),这个时候直接return;
- 调用
lock(&c.lock)
锁住channel的全局锁; - CASE3:不能向已经关闭的channel send数据,会导致panic。
- CASE4:如果channel上的recv队列非空,则跳过channel的缓存队列,直接向消息发送给接收的goroutine:
- 调用sendDirect方法,将待写入的消息发送给接收的goroutine;
- 释放channel的全局锁;
- 调用goready函数,将接收消息的goroutine设置成就绪状态,等待调度。
- CASE5:缓存队列未满,则将消息复制到缓存队列上,然后释放全局锁;
- CASE6:缓存队列已满且接收消息队列recv为空,则将当前的goroutine加入到send队列;
- 获取当前goroutine的sudog,然后入channel的send队列;
- 将当前goroutine休眠
关闭channel
1 | func closechan(c *hchan) { |
关闭的主要流程如下所示:
- 获取全局锁;
- 设置channel数据结构chan的关闭标志位;
- 获取当前channel上面的读goroutine并链接成链表;
- 获取当前channel上面的写goroutine然后拼接到前面的读链表后面;
- 释放全局锁;
- 唤醒所有的读写goroutine。
总结
了解一下具体实现还是很好的,虽然在使用上不会带来变化,不过理解了内涵后,能够更加灵活的使用通道,可以更加容易的追查到问题,也能学习到高手的设计思想。