Go: coro
1 · Go: coro#
src/runtime/coro.go https://research.swtch.com/coro
1.1 · 为什么需要 coro#
Go 已经有 goroutine,但 goroutine 提供的是 并发 + 并行。有些场景只需要 并发而不需要并行(concurrency without parallelism):
- Rob Pike 的 lexer/parser 案例:text/template 最初用 goroutine+channel 实现 lexer 和 parser 之间的协作,但 goroutine 带来的并行导致了 data race,最终不得不改为手动状态机。如果有 coroutine,就不会有 race,且比 goroutine 更高效。
- range over func / iter.Pull:
iter.Pull需要将 push-style 迭代器转换为 pull-style,本质上就是两个控制流交替执行,不需要并行。
┌──────────────────────────────────────────────────────┐
│ go statement vs coro.New │
│ │
│ go f() → 新的并发 + 新的并行 (可同时执行) │
│ coro.New → 新的并发, 无新并行 (交替执行, 一次只跑一个)│
└──────────────────────────────────────────────────────┘
1.2 · 概念模型:特殊的 channel#
coro 可以理解为一个 始终有一个 goroutine 阻塞在上面的特殊 channel:
- goroutine A 阻塞在 coro 上
- goroutine B 调用
coroswitch(c)→ B 变成阻塞在 coro 上的那个,A 开始运行 - 如此交替,直到
coroexit(c)结束协作
核心不变量:总有一个 goroutine 阻塞,一个在运行,并行度始终不增加。
1.3 · 纯 Go 语义定义(channel 实现)#
runtime 的实现必须和纯 Go 定义在语义上等价。纯 Go 版本用一对 channel 模拟:
func New[In, Out any](f func(In, func(Out) In) Out) (resume func(In) (Out, bool)) {
cin := make(chan In)
cout := make(chan Out)
running := true
resume = func(in In) (out Out, ok bool) {
if !running { return }
cin <- in // 唤醒 coroutine
out = <-cout // 等待 coroutine yield
return out, running
}
yield := func(out Out) In {
cout <- out // 交出值
return <-cin // 等待 resume
}
go func() {
out := f(<-cin, yield)
running = false
cout <- out
}()
return resume
}
resume(in)= 向 cin 发送 → 从 cout 接收(一次 coroutine switch)yield(out)= 向 cout 发送 → 从 cin 接收(一次 coroutine switch)- 共享
running不是 race:两个 goroutine 通过 channel 同步,交替执行
1.4 · runtime 实现(src/runtime/coro.go)#
纯 Go 版本每次 switch ≈190ns。runtime 版本绕过 channel 和调度器,直接做 goroutine 切换,只需 3 个 CAS ≈20ns/switch(快 ~10x)。
1.4.1 · 核心数据结构
type coro struct {
gp guintptr // 当前阻塞在此 coro 上的 goroutine
f func(*coro) // coroutine 要执行的函数
// 线程锁定状态验证
mp *m
lockedExt uint32 // 创建时 mp 的外部 LockOSThread 计数
lockedInt uint32 // 创建时 mp 的内部 lockOSThread 计数
}
1.4.2 · 生命周期
newcoro(f) coroswitch(c) coroswitch(c) coroexit(c)
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 创建新g │ │ 当前g→等待│ │ 当前g→等待│ │ 当前g销毁│
│ g阻塞在c│────────▶│ coro的g运行│────────▶│ 调用者运行 │────────▶│ coro的g运行│
│ 返回coro│ │ │ │ │ │ (结束) │
└────────┘ └──────────┘ └──────────┘ └──────────┘
1.4.3 · newcoro(f) — 创建#
func newcoro(f func(*coro)) *coro {
c := new(coro)
c.f = f
// 在 systemstack 上通过 newproc1 创建新 goroutine
// 新 goroutine 入口是 corostart,初始状态是等待(waitReasonCoroutine)
// 如果当前线程有 LockOSThread,记录锁定状态到 coro 中
gp.coroarg = c
c.gp.set(gp) // coro 记住这个阻塞的 goroutine
return c
}
关键点:
- 新 goroutine 通过
newproc1创建,但 不会被放入运行队列(true参数),只阻塞在 coro 上等待 corostart是入口:执行c.f(c),完成后defer coroexit(c)自动清理
1.4.4 · coroswitch_m(gp) — 切换(核心热路径)#
这是最关键的函数,设计目标是 极致轻量。快速路径只有 3 个 CAS:
CAS 1: gp.atomicstatus _Grunning → _Gwaiting (当前 g 暂停)
CAS 2: c.gp.cas(next, self) (原子交换 coro 中的 g)
CAS 3: gnext.atomicstatus _Gwaiting → _Grunning (目标 g 恢复运行)
详细流程:
func coroswitch_m(gp *g) {
// 1. 验证线程锁定状态一致性
// 如果 coro 创建时有 LockOSThread,切换时必须在同一线程上
// 2. 获取 tracer(如果有 trace 在记录)
// 3. 暂停当前 goroutine
// 快速路径:直接 CAS _Grunning → _Gwaiting
// 慢速路径:走 casgstatus(和 GC 协调)
// 4. 原子交换 coro 中阻塞的 goroutine
// 把自己放进去,把之前阻塞的那个取出来
for {
next := c.gp // racy load(故意的,省 atomic load)
if c.gp.cas(next, self) { // CAS 交换
gnext = next.ptr()
break
}
}
// 5. 切换到 gnext
// 设置 mp.curg = gnext, gnext.m = mp
// CAS gnext: _Gwaiting → _Grunning
// 调用 gogo(&gnext.sched) —— 不返回,直接跳到 gnext 的执行上下文
}
为什么 racy load 是安全的:c.gp 的读取是非原子的,但如果读到错误值,后面的 CAS 会失败,下一轮循环会读到正确值。这样省掉了一次 atomic load(在非 x86 平台上有开销)。
1.4.5 · coroexit(c) — 退出#
func coroexit(c *coro) {
gp.coroexit = true
mcall(coroswitch_m) // 复用 coroswitch_m 的逻辑
}
coroswitch_m 中检测到 exit == true 时,会调用 gdestroy(gp) 销毁当前 goroutine 而不是暂停它,然后切换到 coro 中等待的那个 goroutine。
1.5 · 线程锁定(LockOSThread)验证#
如果创建 coro 时当前 goroutine 锁定了 OS 线程(LockOSThread),那么:
- coro 记录下
(mp, lockedExt, lockedInt) - 每次
coroswitch时验证:必须在同一线程上,且锁定计数必须一致 - 切换时会 detach/reattach
lockedm,把线程锁定状态”转移”给目标 goroutine
这保证了 cgo callback 等依赖线程绑定的场景不会出错。
1.6 · synctest bubble 支持#
如果 goroutine 在 synctest group 中:
- 不走快速 CAS 路径,改走
casgstatus(跟踪 group 的活跃状态) - 切换前
bubble.incActive(),切换后bubble.decActive()
1.7 · 实际应用:iter.Pull#
iter.Pull 是 coro 的主要用户,将 push-style 迭代器转为 pull-style:
func Pull[V any](seq iter.Seq[V]) (next func() (V, bool), stop func()) {
// 内部使用 newcoro 创建 coroutine
// next() 调用 coroswitch 切到迭代器,迭代器 yield 时 coroswitch 回来
// stop() 调用 coroexit 结束迭代器
}
1.8 · 性能对比
| 实现方式 | 每次 switch | Pull 每个值 |
|---|---|---|
| channel 纯 Go | ~190ns | ~380ns |
| channel 编译器优化 | ~118ns | ~236ns |
| runtime coro (3 CAS) | ~20ns | ~40ns |
runtime 实现比 channel 快约 10 倍。