Skip to main content

Go: coro

📅 2026-03-19 ✏️ 2026-03-19 Inside Go CS GO
No related notes

1 · Go: coro#

src/runtime/coro.go https://research.swtch.com/coro

1.1 · 为什么需要 coro#

Go 已经有 goroutine,但 goroutine 提供的是 并发 + 并行。有些场景只需要 并发而不需要并行(concurrency without parallelism):

  • Rob Pike 的 lexer/parser 案例:text/template 最初用 goroutine+channel 实现 lexer 和 parser 之间的协作,但 goroutine 带来的并行导致了 data race,最终不得不改为手动状态机。如果有 coroutine,就不会有 race,且比 goroutine 更高效。
  • range over func / iter.Pulliter.Pull 需要将 push-style 迭代器转换为 pull-style,本质上就是两个控制流交替执行,不需要并行。
  ┌──────────────────────────────────────────────────────┐
  │         go statement vs coro.New                     │
  │                                                      │
  │  go f()    → 新的并发 + 新的并行 (可同时执行)         │
  │  coro.New  → 新的并发, 无新并行 (交替执行, 一次只跑一个)│
  └──────────────────────────────────────────────────────┘

1.2 · 概念模型:特殊的 channel#

coro 可以理解为一个 始终有一个 goroutine 阻塞在上面的特殊 channel

  1. goroutine A 阻塞在 coro 上
  2. goroutine B 调用 coroswitch(c) → B 变成阻塞在 coro 上的那个,A 开始运行
  3. 如此交替,直到 coroexit(c) 结束协作

核心不变量:总有一个 goroutine 阻塞,一个在运行,并行度始终不增加

1.3 · 纯 Go 语义定义(channel 实现)#

runtime 的实现必须和纯 Go 定义在语义上等价。纯 Go 版本用一对 channel 模拟:

func New[In, Out any](f func(In, func(Out) In) Out) (resume func(In) (Out, bool)) {
    cin := make(chan In)
    cout := make(chan Out)
    running := true
    resume = func(in In) (out Out, ok bool) {
        if !running { return }
        cin <- in       // 唤醒 coroutine
        out = <-cout    // 等待 coroutine yield
        return out, running
    }
    yield := func(out Out) In {
        cout <- out     // 交出值
        return <-cin    // 等待 resume
    }
    go func() {
        out := f(<-cin, yield)
        running = false
        cout <- out
    }()
    return resume
}
  • resume(in) = 向 cin 发送 → 从 cout 接收(一次 coroutine switch)
  • yield(out) = 向 cout 发送 → 从 cin 接收(一次 coroutine switch)
  • 共享 running 不是 race:两个 goroutine 通过 channel 同步,交替执行

1.4 · runtime 实现(src/runtime/coro.go)#

纯 Go 版本每次 switch ≈190ns。runtime 版本绕过 channel 和调度器,直接做 goroutine 切换,只需 3 个 CAS ≈20ns/switch(快 ~10x)。

1.4.1 · 核心数据结构

type coro struct {
    gp guintptr       // 当前阻塞在此 coro 上的 goroutine
    f  func(*coro)    // coroutine 要执行的函数

    // 线程锁定状态验证
    mp        *m
    lockedExt uint32  // 创建时 mp 的外部 LockOSThread 计数
    lockedInt uint32  // 创建时 mp 的内部 lockOSThread 计数
}

1.4.2 · 生命周期

newcoro(f)           coroswitch(c)         coroswitch(c)         coroexit(c)
    │                    │                     │                     │
    ▼                    ▼                     ▼                     ▼
┌────────┐         ┌──────────┐          ┌──────────┐          ┌──────────┐
│ 创建新g │         │ 当前g→等待│          │ 当前g→等待│          │ 当前g销毁│
│ g阻塞在c│────────▶│ coro的g运行│────────▶│ 调用者运行 │────────▶│ coro的g运行│
│ 返回coro│         │          │          │          │          │ (结束)    │
└────────┘         └──────────┘          └──────────┘          └──────────┘

1.4.3 · newcoro(f) — 创建#

func newcoro(f func(*coro)) *coro {
    c := new(coro)
    c.f = f
    // 在 systemstack 上通过 newproc1 创建新 goroutine
    // 新 goroutine 入口是 corostart,初始状态是等待(waitReasonCoroutine)
    // 如果当前线程有 LockOSThread,记录锁定状态到 coro 中
    gp.coroarg = c
    c.gp.set(gp)   // coro 记住这个阻塞的 goroutine
    return c
}

关键点:

  • 新 goroutine 通过 newproc1 创建,但 不会被放入运行队列true 参数),只阻塞在 coro 上等待
  • corostart 是入口:执行 c.f(c),完成后 defer coroexit(c) 自动清理

1.4.4 · coroswitch_m(gp) — 切换(核心热路径)#

这是最关键的函数,设计目标是 极致轻量。快速路径只有 3 个 CAS:

CAS 1: gp.atomicstatus  _Grunning → _Gwaiting    (当前 g 暂停)
CAS 2: c.gp.cas(next, self)                        (原子交换 coro 中的 g)
CAS 3: gnext.atomicstatus _Gwaiting → _Grunning   (目标 g 恢复运行)

详细流程:

func coroswitch_m(gp *g) {
    // 1. 验证线程锁定状态一致性
    //    如果 coro 创建时有 LockOSThread,切换时必须在同一线程上

    // 2. 获取 tracer(如果有 trace 在记录)

    // 3. 暂停当前 goroutine
    //    快速路径:直接 CAS _Grunning → _Gwaiting
    //    慢速路径:走 casgstatus(和 GC 协调)

    // 4. 原子交换 coro 中阻塞的 goroutine
    //    把自己放进去,把之前阻塞的那个取出来
    for {
        next := c.gp                    // racy load(故意的,省 atomic load)
        if c.gp.cas(next, self) {       // CAS 交换
            gnext = next.ptr()
            break
        }
    }

    // 5. 切换到 gnext
    //    设置 mp.curg = gnext, gnext.m = mp
    //    CAS gnext: _Gwaiting → _Grunning
    //    调用 gogo(&gnext.sched) —— 不返回,直接跳到 gnext 的执行上下文
}

为什么 racy load 是安全的c.gp 的读取是非原子的,但如果读到错误值,后面的 CAS 会失败,下一轮循环会读到正确值。这样省掉了一次 atomic load(在非 x86 平台上有开销)。

1.4.5 · coroexit(c) — 退出#

func coroexit(c *coro) {
    gp.coroexit = true
    mcall(coroswitch_m)  // 复用 coroswitch_m 的逻辑
}

coroswitch_m 中检测到 exit == true 时,会调用 gdestroy(gp) 销毁当前 goroutine 而不是暂停它,然后切换到 coro 中等待的那个 goroutine。

1.5 · 线程锁定(LockOSThread)验证#

如果创建 coro 时当前 goroutine 锁定了 OS 线程(LockOSThread),那么:

  • coro 记录下 (mp, lockedExt, lockedInt)
  • 每次 coroswitch 时验证:必须在同一线程上,且锁定计数必须一致
  • 切换时会 detach/reattach lockedm,把线程锁定状态”转移”给目标 goroutine

这保证了 cgo callback 等依赖线程绑定的场景不会出错。

1.6 · synctest bubble 支持#

如果 goroutine 在 synctest group 中:

  • 不走快速 CAS 路径,改走 casgstatus(跟踪 group 的活跃状态)
  • 切换前 bubble.incActive(),切换后 bubble.decActive()

1.7 · 实际应用:iter.Pull#

iter.Pull 是 coro 的主要用户,将 push-style 迭代器转为 pull-style:

func Pull[V any](seq iter.Seq[V]) (next func() (V, bool), stop func()) {
    // 内部使用 newcoro 创建 coroutine
    // next() 调用 coroswitch 切到迭代器,迭代器 yield 时 coroswitch 回来
    // stop() 调用 coroexit 结束迭代器
}

1.8 · 性能对比

实现方式每次 switchPull 每个值
channel 纯 Go~190ns~380ns
channel 编译器优化~118ns~236ns
runtime coro (3 CAS)~20ns~40ns

runtime 实现比 channel 快约 10 倍

https://unskilled.blog/posts/coroutines-in-go