Go: channel 通道
go version # go version go1.25.6
1 · Channel 通道#
A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.
通道提供了一种机制:用于在并发执行的函数直接发送/接受特定类型的值。
ChannelType = ( “chan” | “chan” ”<-” | ”<-” “chan” ) ElementType .
可选择的 <- 操作符标识chan的方向:发送、接受;如果未给定,则chan是双向的(可发送、可接受)
channel翻译成通道: 那么
通道会有什么类型: 缓冲,非缓冲;
通道会有什么操作: 创建,发送,接收,关闭;
channel分为 带缓冲、不带缓冲两种;可看作异步(带缓冲)、同步(不带缓冲)模式; 对channel的发送,接收等操作,会在编译器,转换成底层的发送,接收等函数;
chan 连接 goroutine,runtime 负责调度 chan 是 goroutine 之间的同步/通信对象;谁在等发、谁在等收都挂在 hchan 的 sendq/recvq 上
1.1 · 创建
// 1. 无缓冲
ch1 := make(chan int)
// 2. 有缓冲
ch2 := make(chan int, 2) // 第2个参数表示chan的容量;为0或不填表示无缓冲
// NOTE:未初始化的chan为nil,不可以发送/接受值
var chan1 chan int
print(chan1 == nil) // true
1.2 · 相关操作
// 1. 发送
// SendStmt = Channel "<-" Expression .
ch <- 1
// 2. 接受
v2 := <-ch
x, ok := <-ch
// 3. 关闭
ch := make(chan int)
close(ch)
// 4. 获取长度、容量
len(ch)
cap(ch)
// 5. 遍历
for v1 := range ch {
}
// 6. select 操作:用于在多个 channel 的发送/接收操作中选择最先就绪的那个
select {
case <-chan1:
case chan2 <- 2:
}
NOTE:
| Operation | nil | empty | full | not full&empty | closed |
|---|---|---|---|---|---|
| receive | 阻塞 | 阻塞 | 读元素 | 读元素 | 返回未读元素,无元素则零值 |
| send | 阻塞 | 写元素 | 阻塞 | 写元素 | panic |
| close | panic | - | - | - | panic |
1.2.1 · select 操作#
执行逻辑:
- 进入 select 时:从左到右计算所有 case 的 channel 和发送值,只算一次
- 选择 case:
- 有至少一个 case 可执行 → 随机选一个执行
- 没有可执行 case 且有 default → 执行 default
- 没有可执行 case 且无 default → 一直阻塞
- 执行:执行被选中的 case 代码;如果是接收且带赋值,则赋值后再执行
NOTE: default 的妙用,实现非阻塞操作
1.3 · Inside Chan#
1.3.1 · 创建
ch := make(chan int)
查看汇编,可知创建chan的函数是runtime.makechan;
// 用户代码创建的chan是一个指针,指向runtime包的hchan值,所以chan是一种引用类型
// t是chan内的元素类型,size是缓冲区大小
func makechan(t *chantype, size int64) *hchan {}
查看chan的定义、makechan源码
// 1. chan 定义:哪个goroutine在等发/收都挂在 hchan 的 sendq/recvq 上
type hchan struct {
qcount uint // chan队列里面的元素数量(用于缓存型chan)
dataqsiz uint // chan底层循环数组的长度(0为非缓存)
buf unsafe.Pointer // 指向底层循环数组的指针,只有缓冲型的 channel 才有
elemsize uint16 // chan中的元素大小
closed uint32 // chan是否关闭
elemtype *_type // chan中的元素类型
// !用于缓存型chan
sendx uint // 已经发送的元素在底层循环数组的索引
recvx uint // 已经接收的元素在底层循环数组的索引
// 重点!关联的goroutine
recvq waitq // 等待接收 的goroutine队列(尝试从channel读取数据而堵塞)
sendq waitq // 等待发送 的goroutine队列(尝试发送数据至channel而堵塞)
lock mutex // 锁保护hchan的所有字段
}
// 2. makechan 源码:分配hchan大小、底层的循环队列(可能无)的大小
func makechan(t *chantype, size int) *hchan {
// 1. 获取元素类型
elem := t.Elem
// 2. 计算内存大小;判断溢出
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 3. 开始分配
var c *hchan
switch {
// 3.1 只分配hchan的大小
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// 3.2 无指针,hchan 和 buf 一起分配
case !elem.Pointers():
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 3.4 有指针,hchan 和 buf 分开分配
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true) // 2nd 参数不为nil,会被GC扫描(无类型信息就不会扫描)
}
// 4. 设置:元素大小、元素类型、循环队列大小
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
1.3.2 · 关闭
ch := make(chan int)
close(ch)
查看汇编,可知关闭chan的函数是runtime.closechan;
查看closechan源码
func closechan(c *hchan) {
// 1. 设置表示关闭的字段
c.closed = 1
var glist gList
// 2. 释放所有阻塞在读当前chan的goroutine
for {
sg := c.recvq.dequeue()
// 2.1 有接受值,给零值
if sg.elem.get() != nil {
typedmemclr(c.elemtype, sg.elem.get())
sg.elem.set(nil)
}
// 2.2 唤醒后,通过 gp.param 取回自己的 sudog
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 3. 释放所有阻塞在写当前chan的goroutine(会panic)
for {
sg := c.sendq.dequeue()
// 3.1 本次发送作废
sg.elem.set(nil)
// 3.2 唤醒后,通过 gp.param 取回自己的 sudog
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 4. 唤醒所有goroutine
for !glist.empty() {
gp := glist.pop()
goready(gp, 3)
}
}
NOTE:
- 内建函数 close(ch<-) 关闭一个双向的或者只发送的channel
- close函数应该由发送方调用,而不是接收方(如果是接收方关闭,则发送方会panic;反则接收方只会收到零值)
- 从一个关闭的channel获取值,不会被阻塞,获取的值是channel元素的零值,可以使用
x, ok := <-ch的ok判断channel是否关闭。 - 在channel的最后一个值被接收后,关闭channel ( defer close(ch) )
应用: 由于从一个已经关闭的channel中获取值不会被阻塞,若某些协程中阻塞在接收一个channel的值的操作,可以使用close函数来关闭该channel,这些阻塞的协程将不再阻塞。
1.3.3 · 发送/接受
非缓冲:runtime 在’发送 G’和’接收 G’之间直接传值 缓冲:平时从缓冲区拿/放,必要时仍是 G 对 G
总结发送/接受操作:
- 非阻塞操作时,也就是select时,chan为nil或者对应写/读为满/空,都是直接返回
- 阻塞操作时候,chan为nil或者为满/空,满发送阻塞,空接收阻塞!
- 阻塞操作,且chan为closed时,发送panic,读获取零值
- 阻塞,chan不为满/空且未close,且写操作有对应的读阻塞;读操作有对应的写阻塞,直接传递值
- 阻塞,chan不为满/空且未close,写操作把值放入缓存区,读操作从缓存区拿值
- 阻塞的读/写操作,会被对应的另一个操作唤醒,对应第4步,直接获取值,但如果是被close唤醒,则发送panic,读取获得零值
1.3.3.1 · 发送
ch := make(chan int)
ch <- 1
查看汇编,可知往chan发送数据的函数是runtime.chansend;
查看chansend源码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 1. 如果channel是nil:非阻塞(select中)则直接返回,阻塞则挂起当前goroutine
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 2. 依旧select中,往满chan中写
if !block && c.closed == 0 && full(c) {
return false
}
// 3. 往关闭的chan写,panic
if c.closed != 0 {
panic(plainError("send on closed channel"))
}
// 4. 有阻塞在读当前chan的协程,直接传递值给它,不经过缓存区
// eq 即要发送的值
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 4. 缓存区有空间:将要发送的值入队列(并无阻塞读方需要唤醒)
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { // 重置索引
c.sendx = 0
}
c.qcount++
return true
}
// 5. 后续一定是阻塞操作了(非select),需要把当前 g 挂起来了
if !block {
return false
}
gp := getg()
// 5.1 构建伪goroutine
mysg := acquireSudog()
mysg.elem.set(ep) // 5.2 设置值
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c.set(c)
gp.waiting = mysg
gp.param = nil
// 5.2 入 chan 的阻塞写队列
c.sendq.enqueue(mysg)
// 5.3 阻塞当前g,挂起
reason := waitReasonChanSend
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
// 阻塞在这里........
// 5.4 被唤醒:两者可能 1) 读方拿掉了值 2) chan被close了
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success // 发送未成功,表示chan被关了
gp.param = nil
mysg.c.set(nil) // 值被消费掉了
releaseSudog(mysg) // 释放伪g
if closed {
panic(plainError("send on closed channel"))
}
return true
}
总结发送操作:
- 非阻塞部分的逻辑(select相关):为nil/满了(未关闭),则直接返回;
- 有读操作阻塞,直接把值给它
- 缓存区有空间,放入缓存区
- 无空间,表示阻塞了,构建伪g(值存这里),挂载到chan上
- 等唤醒:读操作或关闭chan唤醒(伪g资源释放、关闭chan唤醒需要panic)
NOTE: 发送到nil的chan,挂起(select则不挂起);发送到closed的chan,panic(select的也panic)
1.3.3.2 · 接收
ch := make(chan int)
val := <- ch
// 或者
val, ok := <- ch
分是否带ok两种,查看汇编,可知往chan发送数据的函数是runtime.chanrecv1或runtime.chanrecv2,最终还是调用runtime.chanrecv
查看chanrecv源码
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 1. 从不阻塞的chan读(select)则直接返回;否则挂起
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 2. 快速路径(不阻塞,且为空):
if !block && empty(c) {
// 2.1 未关闭,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
// 2.2 已经关闭且为空,返回零值
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
// 3. 已经关闭
if c.closed != 0 {
// 3.1 环形队列里面也没值了,返回零值
if c.qcount == 0 {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
// 3.2 未关闭,且当前chan有阻塞写,直接获取值并返回
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 4. 未关闭,且当前chan无阻塞写
if c.qcount > 0 {
// 获取缓存区的元素
qp := chanbuf(c, c.recvx)
// 赋值到接受值
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp) // 清空缓冲区的值
// 索引数据的调整
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
return true, true
}
if !block {
// 5. 后续都是阻塞操作了
return false, false
}
gp := getg()
// 5.1 构建伪g
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem.set(ep) // 设置值
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c.set(c)
gp.param = nil
// 5.2 伪g入chan
c.recvq.enqueue(mysg)
// 5.3 阻塞当前g
reason := waitReasonChanReceive
if c.bubble != nil {
reason = waitReasonSynctestChanReceive
}
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
// 阻塞在这里.......
// 6. 被唤醒:对chan有写操作,或被close
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c.set(nil)
releaseSudog(mysg) // 释放伪g
return true, success
}
总结接收操作:
- 不阻塞也就是select中,为nil或为空,直接放回;和发送类似,为nil或者满了,直接返回
- close了且为空,则返回零值,这里和发送不一样,往close里面发会panic
- 不为空,有阻塞写,则接受值后返回
- 缓存区有值,直接拿
- 无值,阻塞,直到有写操作或者被close
1.4 · Patterns#
1.4.1 · 1. Or-Done 模式#
场景:多个 goroutine 各自干活(可能做不同事),谁先完成就关 done(或 cancel context),其他 goroutine 通过 select 发现后提前退出。
目的:竞速 / 取“第一个结果”,并取消还没完成的任务,节省资源。
典型写法: done := make(chan struct{}),先完成的 goroutine 里 close(done),其他 goroutine 里 select { case <-done: return; case … }。 这是“多路竞速,先到者胜,其余收工”的编排。
1.4.2 · 2. 扇入模式 (Fan-In)#
多个输入 channel,一个输出 channel。将多个源 channel 的数据合并到一个目的 channel。
方法:使用select监听多个chan,或者使用waitgroup开启多个协程往chan内写
1.4.3 · 3. 扇出模式 (Fan-Out)#
一个输入 channel,多个输出 channel。将一个源 channel 的数据分发到多个目的 channel。
方法:开启多个协程从chan读
1.4.4 · 4. Stream 模式#
把 channel 看作数据流,提供 map、filter、take 等操作。
// take:只取前 n 个元素
func take(ctx context.Context, in <-chan int, n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < n; i++ {
select {
case val, ok := <-in:
if !ok {
return
}
out <- val
case <-ctx.Done():
return
}
}
}()
return out
}
// map:映射转换
func streamMap(ctx context.Context, in <-chan int, f func(int) int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
select {
case out <- f(val):
case <-ctx.Done():
return
}
}
}()
return out
}
// filter:过滤
func filter(ctx context.Context, in <-chan int, f func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for val := range in {
if f(val) {
select {
case out <- val:
case <-ctx.Done():
return
}
}
}
}()
return out
}
1.5 · 思考
Do not communicate by sharing memory; instead, share memory by communicating. 翻译:不要通过分享内存进行通信,而是通过通信进行分享内存。
理解: communicate是多个程序之间进行通信; sharing memory是分享一块内存的信息; 前半句中,通过分享内存,从而进行通信;目的是通信,方法是分享内存;举个例子,多个人之间,共用一个内存块,利用这一个内存块,进行通信; 后半句中,通过通信,从而分享内存;目的是分享内存,方法是通信;举个例子,我想知道这个内存块的信息(分享),使用通信(channel)的方式复制这个块的信息,从而达到分享内存信息的目的;
注意:go是值传递
场景:
- 共享资源的并发访问使用传统并发原语;
- 复杂的任务编排和消息传递使用 Channel;
- 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
- 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
- 需要和 Select 语句结合,使用 Channel;
- 需要和超时配合时,使用 Channel 和 Context。