Go语言进阶——mutex的实现机制


mutex的实现机制

互斥锁是并发程序中对共享资源进行访问控制的主要手段,对此Go语言提供了非常简单易用的Mutex,Mutex为一结构体类型,对外暴露两个方法Lock()和Unlock()分别用于加锁和解锁。

Mutex使用起来非常方便,但其内部实现却复杂得多,这包括Mutex的几种状态。另外,我们也想探究一下Mutex重复解锁引起panic的原因。

按照惯例,本节内容从源码入手,提取出实现原理,又不会过分纠结于实现细节。

Mutex数据结构

源码包src/sync/mutex.go:Mutex定义了互斥锁的数据结构:

type Mutex struct {
    state int32
    sema  uint32
}
  • Mutex.state表示互斥锁的状态,比如是否被锁定等。
  • Mutex.sema表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。

我们看到Mutex.state是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态。

下图展示Mutex的内存布局:

null

  • Locked: 表示该Mutex是否已被锁定,0:没有锁定 1:已被锁定。
  • Woken: 表示是否有协程已被唤醒,0:没有协程唤醒 1:已有协程唤醒,正在加锁过程中。
  • Starving:表示该Mutex是否处于饥饿状态,0:没有饥饿 1:饥饿状态,说明有协程阻塞了超过1ms。
  • Waiter: 表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。

协程之间抢锁实际上是抢给Locked赋值的权利,能给Locked域置1,就说明抢锁成功。抢不到的话就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。

Woken和Starving主要用于控制协程间的抢锁过程,后面再进行了解。

加解锁过程

简单加锁

假定当前只有一个协程在加锁,没有其他协程干扰,那么过程如下图所示:

null

加锁过程会去判断Locked标志位是否为0,如果是0则把Locked位置1,代表加锁成功。从上图可见,加锁成功后,只是Locked位置1,其他状态位没发生变化。

加锁被阻塞

假定加锁时,锁已被其他协程占用了,此时加锁过程如下图所示:

null

从上图可看到,当协程B对一个已被占用的锁再次加锁时,Waiter计数器增加了1,此时协程B将被阻塞,直到Locked值变为0后才会被唤醒。

简单解锁

假定解锁时,没有其他协程阻塞,此时解锁过程如下图所示:

null

由于没有其他协程阻塞等待加锁,所以此时解锁时只需要把Locked位置为0即可,不需要释放信号量。

解锁并唤醒协程

假定解锁时,有1个或多个协程阻塞,此时解锁过程如下图所示:

null

协程A解锁过程分为两个步骤,一是把Locked位置0,二是查看到Waiter>0,所以释放一个信号量,唤醒一个阻塞的协程,被唤醒的协程B把Locked位置1,于是协程B获得锁。

自旋过程

加锁时,如果当前Locked位为1,说明该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测Locked位是否变为0,这个过程即为自旋过程。

自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。此时即便有协程被唤醒也无法获取锁,只能再次阻塞。

自旋的好处是,当加锁失败时不必立即转入阻塞,有一定机会获取到锁,这样可以避免协程的切换。

什么是自旋

自旋对应于CPU的”PAUSE”指令,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期。

自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态。

自旋条件

加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。

自旋必须满足以下所有条件:

  • 自旋次数要足够小,通常为4,即自旋最多4次
  • CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
  • 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋
  • 协程调度机制中的可运行队列必须为空,否则会延迟协程调度

可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。

自旋的优势

自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。

自旋的问题

如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。

为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。

Mutex模式

前面分析加锁和解锁过程中只关注了Waiter和Locked位的变化,现在我们看一下Starving位的作用。

每个Mutex都有两个模式,称为Normal和Starving。下面分别说明这两个模式。

normal模式

默认情况下,Mutex的模式为normal。

该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。

starvation模式

自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。

处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。

Woken状态

Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中,此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了。

源码实现(源码包src/sync/mutex.go

加锁实现(重点

  1. 首先如果当前锁处于初始化状态就直接用 CAS 方法尝试获取锁,这是**_ Fast Path_**

  2. 如果失败就进入**_ Slow Path_**

    1. 会首先判断当前能不能进入自旋状态,如果可以就进入自旋,最多自旋 4 次

    2. 自旋完成之后,就会去计算当前的锁的状态

    3. 然后尝试通过 CAS 获取锁

    4. 如果没有获取到就调用 runtime_SemacquireMutex 方法休眠当前 goroutine 并且尝试获取信号量

    5. goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式) 1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出 1. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

      CAS 方法在这里指的是 atomic.CompareAndSwapInt32(addr, old, new) bool 方法,这个方法会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功 饥饿模式是 Go 1.9 版本之后引入的优化,用于解决公平性的问题[10]

回味一下上面看到的流程图,我们来看看互斥锁是如何加锁的

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
  • 当我们调用 Lock 方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回
  • 否则的话就进入 slow path
func (m *Mutex) lockSlow() {
	var waitStartTime int64 // 等待时间
	starving := false // 是否处于饥饿状态
	awoke := false // 是否处于唤醒状态
	iter := 0 // 自旋迭代次数
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}

lockSlow 方法中我们可以看到,有一个大的 for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。
进入自旋状态的判断比较苛刻,具体需要满足什么条件呢? runtime_canSpin 源码见下方

  • 当前互斥锁的状态是非饥饿状态,并且已经被锁定了
  • 自旋次数不超过 4 次
  • cpu 个数大于一,必须要是多核 cpu
  • 当前正在执行当中,并且队列空闲的 p 的个数大于等于一
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

如果可以进入自旋状态之后就会调用 runtime_doSpin 方法进入自旋, doSpin 方法会调用 procyield(30) 执行三十次 PAUSE 指令

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

为什么使用 PAUSE 指令呢? PAUSE 指令会告诉 CPU 我当前处于处于自旋状态,这时候 CPU 会针对性的做一些优化,并且在执行这个指令的时候 CPU 会降低自己的功耗,减少能源消耗

if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
	atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
	awoke = true
}

在自旋的过程中会尝试设置 mutexWoken 来通知解锁,从而避免唤醒其他已经休眠的 goroutine 在自旋模式下,当前的 goroutine 就能更快的获取到锁

new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
	new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
if awoke {
	// The goroutine has been woken from sleep,
	// so we need to reset the flag in either case.
	if new&mutexWoken == 0 {
		throw("sync: inconsistent mutex state")
	}
	new &^= mutexWoken
}

自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
            // Exit starvation mode.
            // Critical to do it here and consider wait time.
            // Starvation mode is so inefficient, that two goroutines
            // can go lock-step infinitely once they switch mutex
            // to starvation mode.
            delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
    }
    awoke = true
    iter = 0
}

状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环 如果获取失败,则会调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法保证锁不会同时被两个 goroutine 获取。runtime_SemacquireMutex 方法的主要作用是:

  • 不断调用尝试获取锁
  • 休眠当前 goroutine
  • 等待信号量,唤醒 goroutine

goroutine 被唤醒之后就会去判断当前是否处于饥饿模式,如果当前等待超过 1ms 就会进入饥饿模式

  • 饥饿模式下:会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
  • 正常模式下:会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

解锁

和加锁比解锁就很简单了,直接看注释就好

// 解锁一个没有锁定的互斥量会报运行时错误
// 解锁没有绑定关系,可以一个 goroutine 锁定,另外一个 goroutine 解锁
func (m *Mutex) Unlock() {
	// Fast path: 直接尝试设置 state 的值,进行解锁
	new := atomic.AddInt32(&m.state, -mutexLocked)
    // 如果减去了 mutexLocked 的值之后不为零就会进入慢速通道,这说明有可能失败了,或者是还有其他的 goroutine 等着
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
    // 解锁一个没有锁定的互斥量会报运行时错误
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
    // 判断是否处于饥饿模式
	if new&mutexStarving == 0 {
        // 正常模式
		old := new
		for {
			// 如果当前没有等待者.或者 goroutine 已经被唤醒或者是处于锁定状态了,就直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 唤醒等待者并且移交锁的控制权
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 饥饿模式,走 handoff 流程,直接将锁交给下一个等待的 goroutine,注意这个时候不会从饥饿模式中退出
		runtime_Semrelease(&m.sema, true, 1)
	}
}

为什么重复解锁要panic

可能你会想,为什么Go不能实现得更健壮些,多次执行Unlock()也不要panic?

仔细想想Unlock的逻辑就可以理解,这实际上很难做到。Unlock过程分为将Locked置为0,然后判断Waiter值,如果值>0,则释放信号量。

如果多次Unlock(),那么可能每次都释放一个信号量,这样会唤醒多个协程,多个协程唤醒后会继续在Lock()的逻辑里抢锁,势必会增加Lock()实现的复杂度,也会引起不必要的协程切换。

编程Tips

使用defer避免死锁

加锁后立即使用defer对其解锁,可以有效的避免死锁。

加锁和解锁应该成对出现

加锁和解锁最好出现在同一个层次的代码块中,比如同一个函数。

重复解锁会引起panic,应避免这种操作的可能性。

参考文章:

Go专家编程

mohuishou的博客


文章作者: hypo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hypo !
评论
 上一篇
Go语言进阶——rwmutex的实现机制 Go语言进阶——rwmutex的实现机制
前面我们聊了互斥锁Mutex,所谓读写锁RWMutex,完整的表述应该是读写互斥锁,可以说是Mutex的一个改进版,在某些场景下可以发挥更加灵活的控制能力
2022-10-28
下一篇 
Go语言进阶——map的实现机制浅析 Go语言进阶——map的实现机制浅析
Golang的map使用哈希表作为底层实现,一个哈希表里可以有多个哈希表节点,也即bucket,而每个bucket就保存了map中的一个或一组键值对。
2022-10-26
  目录