Go: schedule
Outlinks (0)
No outlinks found
Backlinks (1)
Backlinks (1)
1 · Go: schedule#
Go 语言原生支持并发,开发者可以非常轻松地创建成千上万个 goroutine。但问题是,操作系统能够直接调度的是线程,而不是 goroutine。那么,这么多 goroutine 究竟是如何高效运行起来的?
引出问题:goroutine 如何被调度的?
想法:为每个 goroutine 分配一个线程,让两者一一对应。这样实现起来似乎最简单,因为操作系统本来就负责线程调度,Go 运行时只需要把 goroutine 交给线程执行即可。 遇到问题:goroutine 的设计目标是轻量,程序里常常会创建成千上万个 goroutine;而线程是操作系统资源,创建、销毁和切换成本都更高,栈空间占用也更大。如果为每个 goroutine 都分配一个线程,线程数量会迅速膨胀,带来巨大的内存和调度开销,这显然无法支撑 Go 所追求的高并发。
想法:goroutine 不能简单地与线程一一对应,通过运行时调度器复用到更少的线程上执行 问题:如何复用到更少的线程上?大量的可运行的goroutine放哪里?
想法:引入资源P,每个线程想要运行goroutine,必须持有一个P;其次就是每个P有一个本地队列,创建的goroutine放本地队列 问题:线程阻塞了怎么办?
想法:底层可以转为非阻塞操作的,只阻塞g,线程运行本地队列的其他g;不能转的,挂起线程,P找一个空闲的线程或创建一个线程,继续执行P上本地队列的g 问题:P本地队列没有g了怎么办? 阻塞的g的操作或者挂起线程的操作好了怎么办?无非阻塞操作时就不能调度了吗?
想法:窃取其他P的 想法:sysmon 一个用于监控线程,监控阻塞恢复的g或线程 想法:还是sysmon,监控哪个g长期运行了,设置g的标志位,让其主动让出(协作使),如果没有主动让出,发送信号,让线程信号处理器,让其让出(抢占式)
- os: 线程调度
- runtime: 协程调度
2 · 涉及的结构体、函数
2.1 · GMP 模型#
- g: goroutine, user task; 通过 sp, pc, bp 等寄存器切换
- p: processor,代表调度上下文,数量等于CPU核心数
- m: machine,OS线程,由syscall.clone创建
2.1.1 · 核心思想
- 复用m、限制同时运行的m为cpu核心数
- m私有可运行的g队列,并且可以从其他m窃取
- 没有g的m,短暂自旋
- 当线程阻塞时,可以将g与其绑定的上下文p一起转移到其他m,保持在调度中
- go程序启动后,每个cpu逻辑核心分配一个p,每个p分配一个m,m由os scheduler调度
- 同步: g与m绑定,p空出来,新建m,执行p上的本地g队列
- 异步: g由netpoller处理,m和p处理p的本地g队列;完成后g回到p的本地g队列
限制:先进先出队列;并不保证绝对公平与延迟
2.2 · 相关结构体
2.2.1 · g - Goroutine#
type stack struct {
lo uintptr
hi uintptr
}
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr
}
type g struct {
stack stack
stackguard0 uintptr
stackguard1 uintptr
_panic *_panic
_defer *_defer
m *m
sched gobuf
syscallsp uintptr
syscallpc uintptr
stktopsp uintptr
param unsafe.Pointer
atomicstatus uint32
stackLock uint32
goid int64
schedlink guintptr
waitsince int64
waitreason waitReason
preempt bool
preemptStop bool
preemptShrink bool
asyncSafePoint bool
paniconfault bool
gcscandone bool
throwsplit bool
activeStackChans bool
parkingOnChan uint8
raceignore int8
sysblocktraced bool
tracking bool
trackingSeq uint8
runnableStamp int64
runnableTime int64
sysexitticks int64
traceseq uint64
tracelastp puintptr
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
gopc uintptr
ancestors *[]ancestorInfo
startpc uintptr
racectx uintptr
waiting *sudog
cgoCtxt []uintptr
labels unsafe.Pointer
timer *timer
selectDone uint32
gcAssistBytes int64
}
2.2.2 · sudog - 等待列表中的 G#
代表在等待列表里的 g,比如向 channel 发送/接收内容时。需要 sudog 的理由是 g 和同步对象之间的关系是多对多的:一个 g 可能在多个等待队列中,多个 g 也可以等待在同一个同步对象上。
sudog 从特殊的pool中分配,使用 acquireSudog 和 releaseSudog 来分配和释放。
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
parent *sudog
waitlink *sudog
waittail *sudog
c *hchan
}
2.2.3 · m - OS 线程#
type m struct {
g0 *g
morebuf gobuf
divmod uint32
_ uint32
procid uint64
gsignal *g
goSigStack gsignalStack
sigmask sigset
tls [6]uintptr
mstartfn func()
curg *g
cgo_in_range bool
cgoCallSema uint32
cgoTraceback func(x int32) string
cgoContext uintptr
lockedg guintptr
lockedExt uint32
lockedInt uint32
lockedRandom uint64
sleep wakeup
resumeTime int64
readyWithinTrace bool
incgo bool
isextra bool
isExtraInC bool
printlock int8
ptrnames bool
preemptoff string
locks int32
dying int32
profilehz int32
spinning bool
blocked bool
newSigstack bool
printlock_count int32
park note
alllink *m
schedlink muintptr
lockedm muintptr
nextp puintptr
needm bool
sigmask sigset
startSig uint32
sig uint32
waitunlockf func(*g, unsafe.Pointer) bool
waitlock *g
waitunlockf_arg unsafe.Pointer
paniconfault bool
gcstopwait int32
oscarch *osArchstruct
mOS
mTraceback
}
2.2.4 · p - Processor#
type p struct {
id int32
status uint32
link puintptr
schedtick uint32
syscalltick uint32
sysmontick uint32
m muintptr
mcache *mcache
racectx uintptr
deferpool [5][]*_defer
deferunpool [5][]*_defer
gcAssistTime int64
gcFractionalTime int64
gcMarkWorkerMode gcMarkWorkerMode
gcMarkWorkerStartTime int64
preempt bool
pads [128 - unsafe.Sizeof(p{})%128]byte
}
2.2.5 · schedt - 全局调度器#
type schedt struct {
goidgen uint64
lastpoll uint64
poll uint32
pollUntil uint64
startTime int64
now int64
setTimersTick uint32
nmspinning uint32
nmidlelocked uint32
mnext int64
mnext uint64
maxmcount int32
nmsys int32
nmfreed int64
ngsys uint32
pidle puintptr
npidle uint32
nmspinning uint32
runq gQueue
runqsize int32
gFreeStack *g
gFreeG *g
gCreate *g
allglen uintptr
allg []*g
allgM *g
allgs []*g
gcwaiting uint32
stopwait int32
stopnote note
sysmonwait uint32
sysmonnote note
safePointFn func()
safePointWait int32
safePointNote note
profilehz int32
totalRuntimeLock uint64
totalRuntimeUnlock uint64
procresizetime int64
totalBaseTime uint64
idleTime uint64
totalTime uint64
}
2.3 · 调度时机
scheduler调度可能发生在:
go func()- GC
- syscall
- 内存同步访问(g阻塞)
2.4 · m 和 p 解绑定#
进行系统调用时,线程会被阻塞,需要将上下文(p)传递给其他线程继续调度。
3 · Schedule Loop#
schedule() -> execute() -> gogo() -> goroutine 任务 -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()
3.1 · schedule() 函数详解#
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false)
}
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
pp := _g_.m.p.ptr()
pp.preempt = false
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
checkTimers(pp, 0)
var gp *g
var inheritTime bool
tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable()
}
if _g_.m.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
lock(&sched.lock)
if schedEnabled(gp) {
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
if tryWakeP {
wakep()
}
if gp.lockedm != 0 {
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
3.2 · execute() - 执行 Goroutine#
将gp调度运行在当前m上,设置相关状态并最终调用gogo()开始执行。
3.3 · gogo() - 恢复 Goroutine 执行#
从gobuf中恢复协程执行状态,并跳转到上一次指令处继续执行。
3.4 · Preempt Based Signal#
抢占调度过程:
- sysmon 线程检测到执行时间过长的 goroutine、GC stw 时,向相应的 M 发送 SIGURG 信号
- M 注册的信号处理函数 sighandler 执行
- 通过 pushCall 插入 asyncPreempt 函数调用
- 执行 asyncPreempt,通过 mcall 切到 g0 栈执行 gopreempt_m
- 将当前 goroutine 插入到全局可运行队列
- 被抢占的 goroutine 再次调度时,继续原来的执行流
4 · 特殊函数
4.1 · systemstack#
// 1. 已经在 g0 上或 gsignal 上,不切换栈,直接执行函数后返回
// 2. 在普通 g(curg)上:切到 g0 → 执行 fn → 切回来
// - 为什么要切栈?普通 g 栈很小(默认2KB),可能触发扩容/拷贝,不适合做运行时内部操作
func systemstack(fn func())