go调度

[TOC]

调度器由来

CPU成本

操作系统经过不断改善,在多进程/多线程的操作系统中,如果进程/线程的任务阻塞,CPU会切换到其他其他进程/线程中去执行。

并且调度算法可以保证在运行的进程都可以被分配到CPU的运行时间片,从宏观来看,似乎多个进程实在同时被运行。但是进程的创建、切换、销毁都会占用很长的时间(意味着占用的CPU资源多),所以CPU资源虽然利用起来了,但是如果进程过多,CPU有很大部分都被用来进行调度了。

在Linux系统中,CPU对进程线程的态度是一样的。

内存成本

在32位的操作系统中创建进程大概消耗4GB虚拟内存,创建线程大概需要4MB。

在高并发场景下,每个任务如果都创建一个线程来处理,那么内存成本也是相当昂贵。

用户态线程

对于操作系统的线程使用起来有昂贵的CPU成本内存成本,因此产生了用户态线程

一个用户态线程必须要绑定一个内核态线程,但是CPU并不知道用户态线程的存在,它只知道它运行的是内核态线程(Linux的PCB进程控制块)

用户态线程也叫协程,内核态线程也叫线程。

goroutine(go协程),go的用户态线程,创建一个goroutine只需约2KB,远比线程、进程开销小,这样就能在有限的内存空间内支持大量的goroutine。

详见[典藏版] Golang 调度器 GMP 原理与调度全分析 “Golang 调度器的由来”部分

协程与线程的映射关系

N:1关系,N个协程绑定在1个线程

优点:协程在用户态线程即完成切换,不会陷入到内核态,切换上下文开销小。

缺点:某个程序用不了多核加速能力;一旦某个协程阻塞,本进程线程所绑定的其他协程都无法执行了。

M:N关系,M个协程绑定N个线程

避免了“一旦某个协程阻塞,本进程线程所绑定的其他协程都无法执行”的情况产生。

但是在go1.13版本以及之前协程由用户态调度是协作式的,一个协程让出CPU后其所绑定的的线程才会执行下一个协程。这样看来如果在M>N时,有N个协程正在执行并且都阻塞了,那么N个线程就无法处理下一个协程了?

Go调度器模型

GM模型

激烈的锁竞争

该模型通过一个全局队列来维护go协程,多个M操作G都需要访问全局队列,因此需要加锁来进行保护,这样无论是创建、销毁还是调度G的M都需要获取锁,就形成了激烈的锁竞争。

传递造成延迟和额外的开销

当G1中存在创建协程动作时,假设创建了G2,那么G2会被存放在全局队列而不是本地,并且有可能被其他M所持有从而造成延迟和额外的开销。

浪费内存缓存

每个 M持有 mcachestack alloc,然而只有在M运行Go代码时才需要使用的内存(每个mcache可以高达2mb),当M在处于syscall时并不需要。当前在M运行后对M的内存进行了预热,因为现在G调度到同一个 M 的概率并不高就降低了缓存命中率,从而造成内存上的浪费。

GMP模型

GMP模型和GM模型相比,新增了P(process),在P中维护着一个本地队列可以用来存储G且上限为256个。如果本地的G所创建G‘则优先加入到runnext,如果runnext之前绑定了 G ,则将之前的G放到本地队列(Go:并发以及调度器亲和),如果本地队列容量不够了,则会把本地队列中一半(队列前半部分)的 G 和当前创建的 G' 移动到全局队列。

所有的P在程序启动的时候创建并保存在一个数组中,最多有GOMAXPROCS个,GOMAXPROCS可以通过设置环境变量$GOMAXPROCS或由runtimeGOMAXPROCS()函数决定。

对于M来说,要想运行任务就需要获得P,如果本地队列没有G则需要进行窃取,窃取策略见work stealing 机制 部分。由于如M被阻塞导致许多就绪任务分配不了给空闲的M,那么就会去创建新的M

调度策略与机制

调度策略

  • 复用线程:避免频繁地创建、销毁线程。

  • 利用并行:GOMAXPROCS设置P的数量,最多有GOMAXPROCECS个协程分布在多个CPU上同时运行。

  • 抢占:一个goroutine每次调度最多占用CPU10ms,避免其他goroutine饥饿。

  • 全局队列:当M执行work stealing从其他P偷不到G时,它可以去全局队列中获取。

work stealing 机制

当本地队列的G都被执行完了,就会尝试从其他的P中窃取一半数量的G,如果多次窃取都失败,那么就去全局队列中获取min(total/GOMAXPROCS+1, 128)个G。

[Scalable Go Scheduler Design Doc](https://docs.google.com/document/d/1TTj4T2JO42uD5ID9e89oa0sLKhJYD0Y_kqxDv3I3XMw/) Scheduling中写道:
When a new G is created or an existing G becomes runnable, it is pushed onto a list of runnable goroutines of current P. When P finishes executing G, it first tries to pop a G from own list of runnable goroutines; if the list is empty, P chooses a random victim (another P) and tries to steal a half of runnable goroutines from it.

从实现的大致逻辑看:

在本地队列为空时会有1/61的机会先去全局队列中获取G

// https://github.com/golang/go/blob/master/src/runtime/proc.go#L3355
if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
}
runtime schedule

hand off 机制

当本线程因为G进行如系统调用等造成阻塞时,线程会释放其所绑定的P,让P可以转移到其他空闲的线程执行。

G1发生阻塞
唤醒一个空闲的的M或者创建一个M
将P绑定到新的M上

等待阻塞操作(比如阻塞的系统调用 syscall)完成后:

  1. 尝试获取G1原来所绑定的P,恢复G1的执行。

  2. 尝试从睡眠列表中获取其他空闲 P,恢复执行G1的执行。

  3. G1则会被放入全局队列等待下次被调度,M1就会睡眠队列或者被销毁。

调度流程

生产者

当使用 go 关键字,如执行 go fun(){ do something } 时,会向 runtime 提交一个计算任务,do something 就是任务内容。

go 维护了本地队列全局队列两种队列,本地队列是一个容量为256的数组,全局队列则是一个链表。

依据 局部性原理,最新提交的 goroutine 会更优先执行,他会放入 runnext 中,意味着在下次被调度到时马上执行,而原来 runnext 中的 goroutine 则会尝试放入本地队列,如果本地队列满了则将本地队列前面一半的goroutine拿出同原来在 runnext 中的goroutine一起放入全局队列。

// https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L4250
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}
  1. 使用 go 关键字创建一个协程,会向 runtime 提交一个计算任务,通过 runqput 入队

    //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L5953
    // runqput tries to put g on the local runnable queue.
    // If next is false, runqput adds g to the tail of the runnable queue.
    // If next is true, runqput puts g in the _p_.runnext slot.
    // If the run queue is full, runnext puts g on the global queue.
    // Executed only by the owner P.
    func runqput(_p_ *p, gp *g, next bool) {
    	if randomizeScheduler && next && fastrand()%2 == 0 {
    		next = false
    	}
    
    	if next {
    	retryNext:
    		oldnext := _p_.runnext
    		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
    			goto retryNext
    		}
    		if oldnext == 0 {
    			return
    		}
    		// Kick the old runnext out to the regular run queue.
    		gp = oldnext.ptr()
    	}
    
    retry:
    	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    	t := _p_.runqtail
    	if t-h < uint32(len(_p_.runq)) {
    		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
    		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
    		return
    	}
    	if runqputslow(_p_, gp, h, t) {
    		return
    	}
    	// the queue is not full, now the put above must succeed
    	goto retry
    }
  2. 新创建的 G 绑定到 runnext

    retryNext:
    		oldnext := _p_.runnext
    		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
    			goto retryNext
    		}
  3. 如果之前 runnext 有绑定 G,则将现试着放入本地队列中

    	if t-h < uint32(len(_p_.runq)) {
    		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
    		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
    		return
    	}
  4. 如果本地队列满了,则将本地队列前半部分(128个)的 G 和 之前绑定在 runnext 上的 G 一起放到全局队列

    //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L5988
    // Put g and a batch of work from local runnable queue on global queue.
    // Executed only by the owner P.
    func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    	var batch [len(_p_.runq)/2 + 1]*g
    
    	// First, grab a batch from local queue.
    	n := t - h
    	n = n / 2
    	if n != uint32(len(_p_.runq)/2) {
    		throw("runqputslow: queue is not full")
    	}
    	for i := uint32(0); i < n; i++ {
    		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    	}
    	if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
    		return false
    	}
    	batch[n] = gp
    
    	if randomizeScheduler {
    		for i := uint32(1); i <= n; i++ {
    			j := fastrandn(i + 1)
    			batch[i], batch[j] = batch[j], batch[i]
    		}
    	}
    
    	// Link the goroutines.
    	for i := uint32(0); i < n; i++ {
    		batch[i].schedlink.set(batch[i+1])
    	}
    	var q gQueue
    	q.head.set(batch[0])
    	q.tail.set(batch[n])
    
    	// Now put the batch on global queue.
    	lock(&sched.lock)
    	globrunqputbatch(&q, int32(n+1))
    	unlock(&sched.lock)
    	return true
    } 

消费者

是一个死循环, schedule -> execute -> gogo -> goexit ... -> schedule

schedule 是核心,负责获取要执行的goroutine。

图中红色部分为 GC 相关逻辑

  1. globalrunqget/61会检查是否是每第 61 次(意味着 schedtick % 61 == 0)获取 groutine,如果是则本次从全局队列中获取一个 g

    //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L3355
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
      lock(&sched.lock)
      gp = globrunqget(_g_.m.p.ptr(), 1) // 指定 max=1
      unlock(&sched.lock)
    }
  2. 不是第 61 的倍数次获取,或者是每第 61 次获取但是全局队列为空,则执行 runqget 从本地获取,依次从 runnext、本地队列中获取(队首的 g )

    //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L6065
    // Get g from local runnable queue.
    // If inheritTime is true, gp should inherit the remaining time in the
    // current time slice. Otherwise, it should start a new time slice.
    // Executed only by the owner P.
    func runqget(_p_ *p) (gp *g, inheritTime bool) {
    	// If there's a runnext, it's the next G to run.
    	for {
    		next := _p_.runnext
    		if next == 0 {
    			break
    		}
    		if _p_.runnext.cas(next, 0) {
    			return next.ptr(), true
    		}
    	}
    
    	for {
    		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
    		t := _p_.runqtail
    		if t == h {
    			return nil, false
    		}
    		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
    		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
    			return gp, false
    		}
    	}
    }
  3. 如果没有获取到就进入到 findrunnable,分为 top (图中绿色)和 stop(图中蓝色) 两部分

    代码:https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2705

    • top阶段:

      1. 再试一次 runqget 从本地获取

        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2734
        // local runq
        if gp, inheritTime := runqget(_p_); gp != nil {
          return gp, inheritTime
        }
      2. 仍然没有获取到就继续执行 globalrunqget 从全局队列获取

        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2741
        // global runq
        if sched.runqsize != 0 {
          lock(&sched.lock)
          gp := globrunqget(_p_, 0)
          unlock(&sched.lock)
          if gp != nil {
            return gp, false
          }
        }
      3. 如果没有从全局队列中获取到 goroutine,就通过 netpoll 检查之前由于网络读写而阻塞的 goroutine 是否 runnable, 先尝试先放入当前从 TLS(Thread-local Storage)或者专用寄存器中获取到的 g 绑定的 p(pp := getg().m.p.ptr()) ,如果没有获取到就将 runnable 的 goroutine 全部推送到全局队列中;如果获取到了从 runnable queue 中取出当前闲置 p (npidle)数个(如果足够的话)放入全局队列,并激活 p (npidle)数个 m,然后将 runnable queue 中剩余的 g 依次(每次取出队首的 g)放入 pp(当前从专用寄存器中获取到 p),直到 pp 的本地队列满了,如果runnable queue 中还有剩余的 g 则全部放入全局队列。

        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2755
        // Poll network.
        // This netpoll is only an optimization before we resort to stealing.
        // We can safely skip it if there are no waiters or a thread is blocked
        // in netpoll already. If there is any kind of logical race with that
        // blocked thread (e.g. it has already returned from netpoll, but does
        // not set lastpoll yet), this thread will do blocking netpoll below
        // anyway.
        if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
          if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
              traceGoUnpark(gp, 0)
            }
            return gp, false
          }
        }
      4. 如果网络读写而阻塞的 goroutine 没有 runnable 的,那么就其他 P 中偷取其队列前面一半的 goroutine 执行(runqsteal -> runqgrab 流程也称为 work stealing)

        从细节看:会重试 4 次(const stealTries = 4),并且最后一次如果从其他 p 的本地队列中窃取不到 g,就尝试窃取其他 p 的 runnext g。

        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L3022
        stealTimersOrRunNextG := i == stealTries-1
        
        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L6140
        if n == 0 {
           if stealRunNextG {
              // Try to steal from _p_.runnext.
        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2779
        gp, inheritTime, tnow, w, newWork := stealWork(now)
        now = tnow
        if gp != nil {
          // Successfully stole.
          return gp, inheritTime
        }
        
        
        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L3015
        // stealWork attempts to steal a runnable goroutine or timer from any P.
        //
        // If newWork is true, new work may have been readied.
        //
        // If now is not 0 it is the current time. stealWork returns the passed time or
        // the current time if now was passed as 0.
        func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
        	pp := getg().m.p.ptr()
        
        	ranTimer := false
        
        	const stealTries = 4
        	for i := 0; i < stealTries; i++ {
        		stealTimersOrRunNextG := i == stealTries-1
        
        		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
        			if sched.gcwaiting != 0 {
        				// GC work may be available.
        				return nil, false, now, pollUntil, true
        			}
        			p2 := allp[enum.position()]
        			if pp == p2 {
        				continue
        			}
        
        			// Steal timers from p2. This call to checkTimers is the only place
        			// where we might hold a lock on a different P's timers. We do this
        			// once on the last pass before checking runnext because stealing
        			// from the other P's runnext should be the last resort, so if there
        			// are timers to steal do that first.
        			//
        			// We only check timers on one of the stealing iterations because
        			// the time stored in now doesn't change in this loop and checking
        			// the timers for each P more than once with the same value of now
        			// is probably a waste of time.
        			//
        			// timerpMask tells us whether the P may have timers at all. If it
        			// can't, no need to check at all.
        			if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
        				tnow, w, ran := checkTimers(p2, now)
        				now = tnow
        				if w != 0 && (pollUntil == 0 || w < pollUntil) {
        					pollUntil = w
        				}
        				if ran {
        					// Running the timers may have
        					// made an arbitrary number of G's
        					// ready and added them to this P's
        					// local run queue. That invalidates
        					// the assumption of runqsteal
        					// that it always has room to add
        					// stolen G's. So check now if there
        					// is a local G to run.
        					if gp, inheritTime := runqget(pp); gp != nil {
        						return gp, inheritTime, now, pollUntil, ranTimer
        					}
        					ranTimer = true
        				}
        			}
        
        			// Don't bother to attempt to steal if p2 is idle.
        			if !idlepMask.read(enum.position()) {
        				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
        					return gp, false, now, pollUntil, ranTimer
        				}
        			}
        		}
        	}
        
        	// No goroutines found to steal. Regardless, running a timer may have
        	// made some goroutine ready that we missed. Indicate the next timer to
        	// wait for.
        	return nil, false, now, pollUntil, ranTimer
        }
    • stop阶段(休眠阶段),执行 top 阶段后未获得可执行的 g 时:

      1. 在休眠前仍尝试 globalrunqget 从全局队列中获取 goroutine (没有获取到将当前 p 绑定的 m 解绑,并把当前的 p 闲置队列中)

      2. 没有获取到就 check again all runq,自旋检查所有的队列有无 goroutine

        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L2891
        // Check all runqueues once again.
        _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
        if _p_ != nil {
          acquirep(_p_)
          _g_.m.spinning = true
          atomic.Xadd(&sched.nmspinning, 1)
          goto top
        }
        
        //https://github.com/golang/go/blob/go1.17.13/src/runtime/proc.go#L3089
        // Check all Ps for a runnable G to steal.
        //
        // On entry we have no P. If a G is available to steal and a P is available,
        // the P is returned which the caller should acquire and attempt to steal the
        // work to.
        func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
        	for id, p2 := range allpSnapshot {
        		if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
        			lock(&sched.lock)
        			pp := pidleget()
        			unlock(&sched.lock)
        			if pp != nil {
        				return pp
        			}
        
        			// Can't get a P, don't bother checking remaining Ps.
        			break
        		}
        	}
        
        	return nil
        }
      3. 没有获取到,执行 netpoll 流程,检查网络读写阻塞的 goroutine 有无可执行的

      4. 仍然没有获取到,就执行 stopm 将这个线程休眠

为什么会在休眠阶段要自旋

runtime.getg() 的实现

getg() 函数目的是从 TLS(Thread-local Storage)或者专用寄存器中获取 g

//https://github.com/golang/go/blob/go1.17.13/src/runtime/stubs.go#L22
// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g

它的具体实现又在哪?Search for OpGetG in $GOROOT/src/cmd/compile/internal

https://groups.google.com/g/golang-nuts/c/KgPOzaMylHo/m/zX0GosvnAQAJ?pli=1

以 amd64 为例:

搜索到对应代码为:

//https://github.com/golang/go/blob/go1.17.10/src/cmd/compile/internal/amd64/ssa.go#L999	
case ssa.OpAMD64LoweredGetG:
		if buildcfg.Experiment.RegabiG && s.ABI == obj.ABIInternal {
			v.Fatalf("LoweredGetG should not appear in ABIInternal")
		}
		r := v.Reg()
		getgFromTLS(s, r)
//https://github.com/golang/go/blob/go1.17.10/src/cmd/compile/internal/amd64/ssa.go#L171
func getgFromTLS(s *ssagen.State, r int16) {
	// See the comments in cmd/internal/obj/x86/obj6.go
	// near CanUse1InsnTLS for a detailed explanation of these instructions.
	if x86.CanUse1InsnTLS(base.Ctxt) {
		// MOVQ (TLS), r
		p := s.Prog(x86.AMOVQ)
		p.From.Type = obj.TYPE_MEM
		p.From.Reg = x86.REG_TLS
		p.To.Type = obj.TYPE_REG
		p.To.Reg = r
	} else {
		// MOVQ TLS, r
		// MOVQ (r)(TLS*1), r
		p := s.Prog(x86.AMOVQ)
		p.From.Type = obj.TYPE_REG
		p.From.Reg = x86.REG_TLS
		p.To.Type = obj.TYPE_REG
		p.To.Reg = r
		q := s.Prog(x86.AMOVQ)
		q.From.Type = obj.TYPE_MEM
		q.From.Reg = r
		q.From.Index = x86.REG_TLS
		q.From.Scale = 1
		q.To.Type = obj.TYPE_REG
		q.To.Reg = r
	}
}

另外:getg 返回指向当前g的指针。编译器将对此函数的调用重写为直接获取g的指令(来自TLS或来自专用寄存器)。 要获取当前用户堆栈的g,可以使用getg().m.curggetg()返回当前g,但是当在系统或信号堆栈上执行时,这将分别返回当前m的 g0 或 gsignal 。 要确定g是在用户堆栈还是系统堆栈上运行,可以使用getg() == getg().m.curg,相等表示在用户态堆栈,不相等表示在系统堆栈。

处理阻塞

可被 runtime 拦截到的阻塞

怎么找到可以被 runtime 接管的阻塞代码,看 runtime.gopark 的调用方。

操作没有可用 buffer 的 channel

往没有 buffer 的 channel 写:

var ch = make(chan struct{})
ch <- struct{}{}

会通过生成 sudog 放入 channel sendq 中,挂起

从没有 buffer 的 channel 读:

var ch = make(chan int)
<- ch

会通过生成 sudog 放入 channel recvq 中,挂起

网络读写

从网络中读,但没有可读取的数据

var c net.Conn
var buf = make([]byte, 1024)

// 没有可读取的数据,阻塞
var n, err = c.Read(buf)

往网络中写,但没有 buffer

var c net.Conn
var buf = []byte("hello")

// 没有可用 buffer,写操作被阻塞
var n, err = c.Write(buf)

Sleep

// 主动休眠,阻塞
time.Sleep(time.Hour)

同步等待

select 没有满足执行条件的分支,阻塞

var (
  ch1 = make(chan int)
  ch2 = make(chan int)
)

select {
  case <- ch1:
  	println("ch1 ready")
  case <- ch2:
  	println("ch2 ready")
}

没有获得锁,将阻塞

var lock sync.RWMutex

lock.Lock()

runtime 无法拦截的阻塞

执行 c 代码发生的的阻塞

package main

/*
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void output(char *str) {
	usleep(1000000);
	printf("%s\n", str);
}
/*
import "C"
import "unsafe"

系统调用阻塞

TODO

通过 sysmon 处理 runtime 无法拦截的阻塞

sysmon 即 system monitor,是一个高优先级任务,在专用线程中执行,不需要绑定 P 就可以执行。

会在 sysmon 的 retake 阶段处理:

  1. 如果是系统调用(syscall)卡了很久(10ms),那就把 P 剥离(handoffp)

  2. 1.14 新增信号抢占,如果是用户 g 运行了很久(10ms),那么就发信号抢占

sysmon 会每隔一段时间执行,最开始是 20us,指数增长(20us、40us、80us ... 10ms),直到超过 10ms,重置为 20us

语法实现分析

使用 go 关键字创建协程到底发生了什么?

package main

func main() {
	go func() {}()
}

//go tool objdump go | grep "go.go:4"
//go.go:4               0x1054d14               31c0                    XORL AX, AX
//go.go:4               0x1054d16               488d1d430b0100          LEAQ go.func.*+315(SB), BX
//go.go:4               0x1054d1d               0f1f00                  NOPL 0(AX)
//go.go:4               0x1054d20               e8db0bfeff              CALL runtime.newproc(SB)
//go.go:4               0x1054d40               c3                      RET    

调度分析

特殊的M:M0

M0是启动程序后编号为0的主线程,负责执行初始化和启动第一个G,之后M0就和其他的M一样了。

特殊的G:G0

G0是每次启动一个M都会第一个创建的GG0负责其所运行着的线程上的 Goroutine 进行调度和管理,当出现go func() {}需要创建goroutine的时候会将函数的创建交给G0

协作式调度

在1.13及以前是基于栈增长检测实现的协作式调度模式。没有函数调用的循环是如何无法被调度的:

func main() {
   var total int
   var wg sync.WaitGroup

   for i := 0; i < 20; i++ {
      wg.Add(1)
      go func() {
         for j := 0; j < 1e6; j++ {
            total ++
         }
         wg.Done()
      }()
   }

   wg.Wait()
}

由于没有函数调用,也就没有机会执行栈增长检测的代码,所有的 Goroutine 永远都不会被 block,所以调度器没有抢占这些 goroutine。我们来看一下 tracing 中的状态:*(所有的 goroutines 没有被抢占)

force_preemption_tracing

抢占式调度

go1.14及以后在原来的执行栈增长检测基础上添加注册异步回调函数的方式实现真正意义上的抢占式调度。

抢占信号的发送是由 preemptM 进行的,在以下场景会发生抢占:

  1. Go 后台监控 runtime.sysmon 检测超时(每个goroutine单次调度最多持有10ms CPU分片)发送抢占信号;

  2. Go GC 栈扫描发送抢占信号;

  3. Go GC STW 的时候调用 preemptall 抢占所有 P,让其暂停;

从源码剖析Go语言基于信号抢占式调度

深度探索Go语言:抢占式调度

深度探索Go语言:抢占式调度(2)

对于如下一段代码,在在go1.14之前会阻塞(依赖栈增长的检测),在go1.14及之后运行正常。

原因是空的for{}一直在等待,没有机会执行栈增长检测代码也就意味着没有机会让出,但在GC的时候STW一直在等待它让出,从而陷入了僵局。而对于fmt.Println所在goroutine由于收到GC的通知而让出,就不会在继续打印,需要等待GC完毕之后,goroutine调度才能继续执行。

package main

import "fmt"

// 在go1.14之前会阻塞
// 在go1.14及之后运行正常
func main() {
	go func(n int) {
		for {
			n++
			fmt.Println(n)
		}
	}(0)

	for {
	}
}

发送信号

处理信号

image-20210731141309753

疑问

切换的时候G保存的上下文有哪些?

主要在调度器保存或者恢复上下文的时候用到【参考:详解Go语言调度循环源码实现6.5 调度器】:

type gobuf struct {
    // 栈指针
    sp   uintptr
    // 程序计数器
    pc   uintptr
    // 持有 runtime.gobuf 的 Goroutine
    g    guintptr 
    // 系统调用的返回值
    ret  sys.Uintreg
    ...
}
  1. 栈指针,为了恢复栈内数据

  2. 系统调用的返回值,判断是否正在进行系统函数调用

  3. 持有 runtime.gobuf 的 Goroutine,也就是G

  4. 程序计数器,存放着下一条指令所在单元的地址,方便恢复运行时继续执行

应该尽最大努力将P调度到上一次所绑定的M上?

M会记录上次所调度的P的信息,在活跃状态下会优先判断上次处理的P是否可以被调度。

新创建的 goroutine 调度优先级一定比老的 goroutine 优先级高?

在同一个 P 中是的。优先级是 runnext、local queue、global queue。新创建的 goroutine 会放入到 runnext。

如果 groutine 执行的程序中有阻塞,那该 goroutine 所绑定的线程也会阻塞吗?为什么

在 runtime 中有处理阻塞,将 groutine 挂起,让出线程,待 ready 后再开始。这个线程又会进入调度循环,继续消费其他队列。详见:处理阻塞

还有些是 runtime 中无法拦截的,比如 cgo 执行 c 代码、发生系统阻塞等情况,则通过 sysmon(system monitor) 处理

在处理阻塞时,为啥有的等待是 sudog,有的等待是 g ?

一个 g 可以对应多个 sudog。比如一个 g 中的 select 可以对应多个 case,当发生阻塞时,每个 case 会对应一个 sudog

reference

[典藏版] Golang 调度器 GMP 原理与调度全分析

视频-Golang深入理解GMP

Go:协程,操作系统线程和 CPU 管理

Go:并发以及调度器亲和

Go 调度器的任务窃取(Work-Stealing)

Go:Goroutine 与抢占机制 基于go1.13

调度系统设计精要

详解Go语言调度循环源码实现

Go:g0,特殊的 Goroutine

从源码剖析Go语言基于信号抢占式调度

Scalable Go Scheduler Design Doc

6.5 调度器

深度探索Go语言:抢占式调度

深度探索Go语言:抢占式调度(2)

深度探索Go语言(六):抢占式调度

深度探索Go语言(七):抢占式调度

Go语言的抢占式调度.pdf

runtime: tight loops should be preemptible

深入golang runtime的调度

最后更新于

这有帮助吗?