Skip to main content

Go: channel 通道

📅 2026-02-05 ✏️ 2026-03-15 Inside Go CS GO
go version # go version go1.25.6

1 · Channel 通道#

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.

通道提供了一种机制:用于在并发执行的函数直接发送/接受特定类型的值。

ChannelType = ( “chan” | “chan” ”<-” | ”<-” “chan” ) ElementType .

可选择的 <- 操作符标识chan的方向:发送、接受;如果未给定,则chan是双向的(可发送、可接受)

channel翻译成通道: 那么 通道会有什么类型: 缓冲,非缓冲; 通道会有什么操作: 创建,发送,接收,关闭;

channel分为 带缓冲、不带缓冲两种;可看作异步(带缓冲)、同步(不带缓冲)模式; 对channel的发送,接收等操作,会在编译器,转换成底层的发送,接收等函数;

chan 连接 goroutine,runtime 负责调度 chan 是 goroutine 之间的同步/通信对象;谁在等发、谁在等收都挂在 hchan 的 sendq/recvq 上

1.1 · 创建

// 1. 无缓冲
ch1 := make(chan int)
// 2. 有缓冲
ch2 := make(chan int, 2) // 第2个参数表示chan的容量;为0或不填表示无缓冲

// NOTE:未初始化的chan为nil,不可以发送/接受值
var chan1 chan int
print(chan1 == nil) // true

1.2 · 相关操作

// 1. 发送
// SendStmt = Channel "<-" Expression .
ch <- 1

// 2. 接受
v2    := <-ch
x, ok := <-ch

// 3. 关闭
ch := make(chan int)
close(ch)

// 4. 获取长度、容量
len(ch)
cap(ch)

// 5. 遍历
for v1 := range ch {
}

// 6. select 操作:用于在多个 channel 的发送/接收操作中选择最先就绪的那个
select {
	case <-chan1:
	case chan2 <- 2:
}

NOTE:

Operationnilemptyfullnot full&emptyclosed
receive阻塞阻塞读元素读元素返回未读元素,无元素则零值
send阻塞写元素阻塞写元素panic
closepanic---panic

1.2.1 · select 操作#

https://go.dev/ref/spec#Select_statements

执行逻辑:

  1. 进入 select 时:从左到右计算所有 case 的 channel 和发送值,只算一次
  2. 选择 case:
    • 有至少一个 case 可执行 → 随机选一个执行
    • 没有可执行 case 且有 default → 执行 default
    • 没有可执行 case 且无 default → 一直阻塞
  3. 执行:执行被选中的 case 代码;如果是接收且带赋值,则赋值后再执行

NOTE: default 的妙用,实现非阻塞操作

1.3 · Inside Chan#

1.3.1 · 创建

ch := make(chan int)

查看汇编,可知创建chan的函数是runtime.makechan

// 用户代码创建的chan是一个指针,指向runtime包的hchan值,所以chan是一种引用类型
// t是chan内的元素类型,size是缓冲区大小
func makechan(t *chantype, size int64) *hchan {}

查看chan的定义、makechan源码

// 1. chan 定义:哪个goroutine在等发/收都挂在 hchan 的 sendq/recvq 上
type hchan struct {
	qcount   uint           // chan队列里面的元素数量(用于缓存型chan)
	dataqsiz uint           // chan底层循环数组的长度(0为非缓存)
	buf      unsafe.Pointer // 指向底层循环数组的指针,只有缓冲型的 channel 才有
	elemsize uint16         // chan中的元素大小
	closed   uint32         // chan是否关闭
	elemtype *_type         // chan中的元素类型
	                        // !用于缓存型chan
	sendx    uint           // 已经发送的元素在底层循环数组的索引
	recvx    uint           // 已经接收的元素在底层循环数组的索引

	                        // 重点!关联的goroutine
	recvq    waitq          // 等待接收 的goroutine队列(尝试从channel读取数据而堵塞)
	sendq    waitq          // 等待发送 的goroutine队列(尝试发送数据至channel而堵塞)
	
	lock mutex              // 锁保护hchan的所有字段
}

// 2. makechan 源码:分配hchan大小、底层的循环队列(可能无)的大小
func makechan(t *chantype, size int) *hchan {
	// 1. 获取元素类型
	elem := t.Elem

	// 2. 计算内存大小;判断溢出
	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// 3. 开始分配
	var c *hchan
	switch {
	// 3.1 只分配hchan的大小
	case mem == 0:
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	// 3.2 无指针,hchan 和 buf 一起分配
	case !elem.Pointers():
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	// 3.4 有指针,hchan 和 buf 分开分配
	default:
		c = new(hchan) 
		c.buf = mallocgc(mem, elem, true) // 2nd 参数不为nil,会被GC扫描(无类型信息就不会扫描)
	}

	// 4. 设置:元素大小、元素类型、循环队列大小
	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	return c
}

1.3.2 · 关闭

ch := make(chan int)
close(ch)

查看汇编,可知关闭chan的函数是runtime.closechan;

查看closechan源码

func closechan(c *hchan) {
	// 1. 设置表示关闭的字段
	c.closed = 1

	var glist gList

	// 2. 释放所有阻塞在读当前chan的goroutine
	for {
		sg := c.recvq.dequeue()
		// 2.1 有接受值,给零值
		if sg.elem.get() != nil {
			typedmemclr(c.elemtype, sg.elem.get())
			sg.elem.set(nil)
		}
		// 2.2 唤醒后,通过 gp.param 取回自己的 sudog
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		glist.push(gp)
	}

	// 3. 释放所有阻塞在写当前chan的goroutine(会panic)
	for {
		sg := c.sendq.dequeue()
		// 3.1 本次发送作废
		sg.elem.set(nil)
		// 3.2 唤醒后,通过 gp.param 取回自己的 sudog
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		glist.push(gp)
	}

	// 4. 唤醒所有goroutine
	for !glist.empty() {
		gp := glist.pop()
		goready(gp, 3)
	}
}

NOTE:

  1. 内建函数 close(ch<-) 关闭一个双向的或者只发送的channel
  2. close函数应该由发送方调用,而不是接收方(如果是接收方关闭,则发送方会panic;反则接收方只会收到零值)
  3. 从一个关闭的channel获取值,不会被阻塞,获取的值是channel元素的零值,可以使用x, ok := <-ch的ok判断channel是否关闭。
  4. 在channel的最后一个值被接收后,关闭channel ( defer close(ch) )

应用: 由于从一个已经关闭的channel中获取值不会被阻塞,若某些协程中阻塞在接收一个channel的值的操作,可以使用close函数来关闭该channel,这些阻塞的协程将不再阻塞。

1.3.3 · 发送/接受

非缓冲:runtime 在’发送 G’和’接收 G’之间直接传值 缓冲:平时从缓冲区拿/放,必要时仍是 G 对 G

总结发送/接受操作:

  1. 非阻塞操作时,也就是select时,chan为nil或者对应写/读为满/空,都是直接返回
  2. 阻塞操作时候,chan为nil或者为满/空,满发送阻塞,空接收阻塞!
  3. 阻塞操作,且chan为closed时,发送panic,读获取零值
  4. 阻塞,chan不为满/空且未close,且写操作有对应的读阻塞;读操作有对应的写阻塞,直接传递值
  5. 阻塞,chan不为满/空且未close,写操作把值放入缓存区,读操作从缓存区拿值
  6. 阻塞的读/写操作,会被对应的另一个操作唤醒,对应第4步,直接获取值,但如果是被close唤醒,则发送panic,读取获得零值

1.3.3.1 · 发送

ch := make(chan int)
ch <- 1

查看汇编,可知往chan发送数据的函数是runtime.chansend;

查看chansend源码

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 1. 如果channel是nil:非阻塞(select中)则直接返回,阻塞则挂起当前goroutine
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	// 2. 依旧select中,往满chan中写
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// 3. 往关闭的chan写,panic
	if c.closed != 0 {
		panic(plainError("send on closed channel"))
	}

	// 4. 有阻塞在读当前chan的协程,直接传递值给它,不经过缓存区
	// eq 即要发送的值
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 4. 缓存区有空间:将要发送的值入队列(并无阻塞读方需要唤醒)
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz { // 重置索引
			c.sendx = 0
		}
		c.qcount++
		return true
	}

	// 5. 后续一定是阻塞操作了(非select),需要把当前 g 挂起来了
	if !block {
		return false
	}

	gp := getg()
	// 5.1 构建伪goroutine
	mysg := acquireSudog()
	mysg.elem.set(ep) // 5.2 设置值
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c.set(c)
	gp.waiting = mysg
	gp.param = nil
	// 5.2 入 chan 的阻塞写队列
	c.sendq.enqueue(mysg) 
	// 5.3 阻塞当前g,挂起
	reason := waitReasonChanSend
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)

	// 阻塞在这里........

	// 5.4 被唤醒:两者可能 1) 读方拿掉了值 2) chan被close了
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success // 发送未成功,表示chan被关了
	gp.param = nil
	mysg.c.set(nil) // 值被消费掉了
	releaseSudog(mysg) // 释放伪g
	if closed {
		panic(plainError("send on closed channel"))
	}
	return true
}

总结发送操作:

  1. 非阻塞部分的逻辑(select相关):为nil/满了(未关闭),则直接返回;
  2. 有读操作阻塞,直接把值给它
  3. 缓存区有空间,放入缓存区
  4. 无空间,表示阻塞了,构建伪g(值存这里),挂载到chan上
  5. 等唤醒:读操作或关闭chan唤醒(伪g资源释放、关闭chan唤醒需要panic)

NOTE: 发送到nil的chan,挂起(select则不挂起);发送到closed的chan,panic(select的也panic)

1.3.3.2 · 接收

ch := make(chan int)
val := <- ch
// 或者
val, ok := <- ch

分是否带ok两种,查看汇编,可知往chan发送数据的函数是runtime.chanrecv1runtime.chanrecv2,最终还是调用runtime.chanrecv

查看chanrecv源码

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// 1. 从不阻塞的chan读(select)则直接返回;否则挂起
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}


	// 2. 快速路径(不阻塞,且为空):
	if !block && empty(c) {
		// 2.1 未关闭,直接返回
		if atomic.Load(&c.closed) == 0 {
			return
		}
		// 2.2 已经关闭且为空,返回零值
		if empty(c) {
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// 3. 已经关闭
	if c.closed != 0 {
		// 3.1 环形队列里面也没值了,返回零值
		if c.qcount == 0 {
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	} else {
		// 3.2 未关闭,且当前chan有阻塞写,直接获取值并返回
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	// 4. 未关闭,且当前chan无阻塞写
	if c.qcount > 0 {
		// 获取缓存区的元素
		qp := chanbuf(c, c.recvx)
		// 赋值到接受值
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp) // 清空缓冲区的值
		// 索引数据的调整
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
	}

	if !block {
		// 5. 后续都是阻塞操作了
		return false, false
	}

	gp := getg()
	// 5.1 构建伪g
	mysg := acquireSudog()
	mysg.releasetime = 0
	mysg.elem.set(ep) // 设置值
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c.set(c)
	gp.param = nil
	// 5.2 伪g入chan
	c.recvq.enqueue(mysg)

	// 5.3 阻塞当前g
	reason := waitReasonChanReceive
	if c.bubble != nil {
		reason = waitReasonSynctestChanReceive
	}
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)

	// 阻塞在这里.......

	// 6. 被唤醒:对chan有写操作,或被close
	gp.waiting = nil
	gp.activeStackChans = false
	success := mysg.success
	gp.param = nil
	mysg.c.set(nil)
	releaseSudog(mysg) // 释放伪g
	return true, success
}

总结接收操作:

  1. 不阻塞也就是select中,为nil或为空,直接放回;和发送类似,为nil或者满了,直接返回
  2. close了且为空,则返回零值,这里和发送不一样,往close里面发会panic
  3. 不为空,有阻塞写,则接受值后返回
  4. 缓存区有值,直接拿
  5. 无值,阻塞,直到有写操作或者被close

1.4 · Patterns#

1.4.1 · 1. Or-Done 模式#

场景:多个 goroutine 各自干活(可能做不同事),谁先完成就关 done(或 cancel context),其他 goroutine 通过 select 发现后提前退出。

目的:竞速 / 取“第一个结果”,并取消还没完成的任务,节省资源。

典型写法: done := make(chan struct{}),先完成的 goroutine 里 close(done),其他 goroutine 里 select { case <-done: return; case … }。 这是“多路竞速,先到者胜,其余收工”的编排。

1.4.2 · 2. 扇入模式 (Fan-In)#

多个输入 channel,一个输出 channel。将多个源 channel 的数据合并到一个目的 channel。

方法:使用select监听多个chan,或者使用waitgroup开启多个协程往chan内写

1.4.3 · 3. 扇出模式 (Fan-Out)#

一个输入 channel,多个输出 channel。将一个源 channel 的数据分发到多个目的 channel。

方法:开启多个协程从chan读

1.4.4 · 4. Stream 模式#

把 channel 看作数据流,提供 map、filter、take 等操作。

// take:只取前 n 个元素
func take(ctx context.Context, in <-chan int, n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            select {
            case val, ok := <-in:
                if !ok {
                    return
                }
                out <- val
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// map:映射转换
func streamMap(ctx context.Context, in <-chan int, f func(int) int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range in {
            select {
            case out <- f(val):
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// filter:过滤
func filter(ctx context.Context, in <-chan int, f func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range in {
            if f(val) {
                select {
                case out <- val:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

1.5 · 思考

Do not communicate by sharing memory; instead, share memory by communicating. 翻译:不要通过分享内存进行通信,而是通过通信进行分享内存。

理解: communicate是多个程序之间进行通信; sharing memory是分享一块内存的信息; 前半句中,通过分享内存,从而进行通信;目的是通信,方法是分享内存;举个例子,多个人之间,共用一个内存块,利用这一个内存块,进行通信; 后半句中,通过通信,从而分享内存;目的是分享内存,方法是通信;举个例子,我想知道这个内存块的信息(分享),使用通信(channel)的方式复制这个块的信息,从而达到分享内存信息的目的;

注意:go是值传递

场景:

  1. 共享资源的并发访问使用传统并发原语;
  2. 复杂的任务编排消息传递使用 Channel;
  3. 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
  4. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 语句结合,使用 Channel;
  6. 需要和超时配合时,使用 Channel 和 Context。
  1. Channel Axioms
  2. Go Concurrency Patterns: Pipelines and cancellation
  3. https://www.youtube.com/watch?v=KBZlN0izeiY