加入收藏
举报
02-15 09:22
#0
文件名称:
10.Golang 锁 sync包 源码分析 (一、Mutex加锁解锁).md
所在目录:
编程语言 / Golang / 源码分析
文件大小:
16.95 KB
下载地址:
longpi1/Reading-notes
   
免责声明:本网站仅提供指向 GitHub 上的文件的链接,所有文件的版权归原作者所有,本网站不对文件内容的合法性、准确性或安全性承担任何责任。
文本预览:
# Golang 锁 sync包 源码分析 (一、Mutex加锁解锁)
> 注意当前go版本代码为1.23
>
> sync包 源码分析包主要介绍 Go 语言中常见的同步原语 [`sync.Mutex`](https://draveness.me/golang/tree/sync.Mutex)、[`sync.RWMutex`](https://draveness.me/golang/tree/sync.RWMutex)、[`sync.WaitGroup`](https://draveness.me/golang/tree/sync.WaitGroup)、[`sync.Once`](https://draveness.me/golang/tree/sync.Once) 和 [`sync.Cond`](https://draveness.me/golang/tree/sync.Cond) 以及扩展原语 [`golang/sync/errgroup.Group`](https://draveness.me/golang/tree/golang/sync/errgroup.Group)、[`golang/sync/semaphore.Weighted`](https://draveness.me/golang/tree/golang/sync/semaphore.Weighted) 和 [`golang/sync/singleflight.Group`](https://draveness.me/golang/tree/golang/sync/singleflight.Group) 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。
## 定义
```go
type Mutex struct {
state int32 // mutex 的状态
sema uint32 // 用于阻塞/唤醒 goroutine 的信号量
}
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex 已锁定
mutexWoken // 有 waiter 被唤醒
mutexStarving // mutex 处于饥饿模式
mutexWaiterShift = iota // waiter 数量的偏移量
starvationThresholdNs = 1e6 // 切换到饥饿模式的阈值,单位纳秒
)
```
在默认情况下,互斥锁的所有状态位都是 0,`int32` 中的不同位分别表示了不同的状态:
- `mutexLocked` — 表示互斥锁的锁定状态;
- `mutexWoken` — 表示从正常模式被从唤醒;
- `mutexStarving` — 当前的互斥锁进入饥饿状态;
### 正常模式和饥饿模式
[`sync.Mutex`](https://draveness.me/golang/tree/sync.Mutex) 有两种模式 — 正常模式和饥饿模式。
在正常模式下,waiter 按照 FIFO 顺序排队,但是被唤醒的 waiter并不拥有 mutex,而是与新到达的 goroutine 竞争 mutex 的所有权。新到达的 goroutine 具有优势 -- 它们已经在 CPU 上运行,并且可能有很多,因此被唤醒的 waiter 很有可能竞争失败。在这种情况下,它会被重新排到等待队列的前面。为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
// 在饥饿模式下,mutex 的所有权直接从解锁的 goroutine 移交给队列最前面的 waiter。
// 新到达的 goroutine 不会尝试获取 mutex,即使它看起来是解锁的,
// 并且不会尝试自旋。相反,它们会将自己排到等待队列的末尾。
//
// 如果一个 waiter 获得了 mutex 的所有权,并且看到
// (1) 它是队列中的最后一个 waiter,或者 (2) 它等待的时间少于 1ms,
// it switches mutex back to normal operation mode.
// mutex 将切换回正常操作模式。
// 正常模式具有更好的性能,因为一个 goroutine 可以连续多次获取mutex,即使有阻塞的 waiter。
// 饥饿模式对于防止尾部延迟的病态情况非常重要。
## 加锁和解锁
[`sync.Mutex.Lock`](https://draveness.me/golang/tree/sync.Mutex.Lock) 和 [`sync.Mutex.Unlock`](https://draveness.me/golang/tree/sync.Mutex.Unlock) 方法。
### 加锁
```go
// Lock 方法用于获取互斥锁。
func (m *Mutex) Lock() {
// 快速路径:尝试直接获取未锁定的互斥锁。
// 使用原子操作 CompareAndSwapInt32 来尝试将 m.state 从 0 修改为 mutexLocked。
// 如果成功,说明当前 goroutine 成功获取了锁,直接返回。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 如果启用了竞态检测,记录当前 goroutine 获取了锁。
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢速路径:如果快速路径失败,调用 lockSlow 方法进行更复杂的锁获取逻辑。
// 将慢速路径单独放在一个函数中,以便快速路径可以被内联优化。
m.lockSlow()
}
// lockSlow 方法用于处理锁获取的慢速路径。
// 1. 判断当前 Goroutine 能否进入自旋;
// 2. 通过自旋等待互斥锁的释放;
// 3. 计算互斥锁的最新状态;
// 4. 更新互斥锁的状态并获取锁;
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 记录当前 goroutine 开始等待的时间
starving := false // 标记当前 goroutine 是否处于饥饿模式
awoke := false // 标记当前 goroutine 是否被唤醒
iter := 0 // 记录自旋次数
old := m.state // old 存储互斥锁的旧状态。
for {
// 如果互斥锁已被锁定且不处于饥饿模式,并且当前 goroutine 可以进行自旋,
// 则尝试通过自旋来获取锁。
// runtime.sync_runtime_canSpin 返回 true的前提
// 运行在多 CPU 的机器上;
// 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
// 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试设置 mutexWoken 标志位,通知 Unlock 方法不要唤醒其他阻塞的 goroutine。
// 这样做是为了尝试将互斥锁“传递”给当前正在自旋的 goroutine,减少上下文切换。
// 只有在当前 goroutine 未被唤醒,且 mutexWoken 位未被设置,
// 并且存在等待的 goroutine 时,才尝试设置 mutexWoken。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 执行一次自旋操作,让出 CPU 时间片。
runtime_doSpin()
// 增加自旋次数。
iter++
// 更新互斥锁的状态。
continue // 继续循环
}
new := old // 创建一个新的状态变量,用于后续的 CAS 操作
// 如果互斥锁不处于饥饿模式,则尝试获取锁。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果互斥锁已被锁定或处于饥饿模式,则增加等待者计数。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前 goroutine 处于饥饿模式,并且互斥锁已被锁定,
// 则将互斥锁切换到饥饿模式。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果当前 goroutine 已被唤醒,则需要重置 mutexWoken 标志。
if awoke {
// 如果新的状态中没有 mutexWoken 标志,则抛出异常。
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 重置 mutexWoken 标志
}
// 尝试通过 CAS 操作将互斥锁的状态从 old 修改为 new。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果互斥锁未被锁定且不处于饥饿模式,则当前 goroutine 成功获取了锁,退出循环。
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 如果当前 goroutine 之前已经在等待,则将其放在等待队列的前面。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime() // 记录开始等待的时间
}
// 阻塞当前 goroutine,等待信号量唤醒。
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 如果当前 goroutine 等待的时间超过了饥饿阈值,则将其标记为饥饿模式。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state // 重新获取当前互斥锁的状态
// 如果互斥锁处于饥饿模式,则处理饥饿模式下的逻辑。
if old&mutexStarving != 0 {
// 如果当前 goroutine 被唤醒且互斥锁处于饥饿模式,
// 但互斥锁的状态不一致(例如,mutexLocked 未设置或等待者计数为 0),则抛出异常。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 计算状态增量,用于退出饥饿模式。
delta := int32(mutexLocked - 1< if !starving || old>>mutexWaiterShift == 1 {
// 如果当前 goroutine 不再饥饿,或者等待者计数为 1,则退出饥饿模式。
delta -= mutexStarving
}
// 更新互斥锁的状态,退出饥饿模式。
atomic.AddInt32(&m.state, delta)
break
}
// 当前 goroutine 已被唤醒,重置 awoke 和 iter 标志。
awoke = true
iter = 0
} else {
// 如果 CAS 操作失败,则重新获取当前互斥锁的状态,继续循环。
old = m.state
}
}
// 如果启用了竞态检测,记录当前 goroutine 获取了锁。
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
```
### 解锁
```go
// Unlock 解锁 Mutex。
// 如果 m 没有被锁定,则会抛出运行时错误。
func (m *Mutex) Unlock() {
// 如果启用了竞态检测器。
if race.Enabled {
_ = m.state // 对 m.state 的访问,用于竞态检测。确保在 Release 之前读取。
race.Release(unsafe.Pointer(m)) // 通知竞态检测器 Mutex 已被释放。
}
// 快速路径:清除锁定位。
// 使用原子操作将状态值减去 mutexLocked (1)。
// 如果新的状态值不为 0,说明还有其他位被设置(例如,有等待者或处于饥饿模式)。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 外联的慢速路径,允许内联快速路径。
// 为了在追踪 GoUnblock 时隐藏 unlockSlow,我们跳过一个额外的栈帧。
// 这样做可以使追踪信息更简洁,只显示关键的阻塞/唤醒操作。
m.unlockSlow(new)
}
}
// unlockSlow 是 Unlock 的慢速路径。
// 当解锁时,Mutex 的状态不仅仅是锁定状态时,会调用此函数。
func (m *Mutex) unlockSlow(new int32) {
// 如果 (new + mutexLocked) & mutexLocked == 0,意味着在解锁前 Mutex 就没有被锁定。
// 因为在 Unlock 的快速路径中,我们已经减去了 mutexLocked,
// 如果解锁前没有被锁定,那么 new 应该就是 0 或者负数(如果存在等待者或者处于饥饿模式)。
// 加上 mutexLocked 后,如果原先没被锁定,结果与 mutexLocked 进行与运算应该为 0。
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex") // 抛出致命错误,因为尝试解锁一个未锁定的 Mutex。
}
// 如果没有处于饥饿模式。
if new&mutexStarving == 0 {
old := new // 保存当前状态值。
for {
// 如果没有等待者,或者一个 goroutine 已经醒来或已经获取了锁,则不需要唤醒任何人。
// 在饥饿模式下,所有权直接从解锁的 goroutine 移交给下一个等待者。
// 我们不属于这个链条,因为我们在解锁 Mutex 时没有观察到 mutexStarving。
// 所以让开道路。
// old>>mutexWaiterShift == 0 表示没有等待者。
// old&(mutexLocked|mutexWoken|mutexStarving) != 0 表示 Mutex 仍然被锁定,或者有 goroutine 正在被唤醒,或者处于饥饿模式。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return // 直接返回,无需唤醒任何 goroutine。
}
// 争夺唤醒某人的权利。
// 将等待者计数减 1,并设置唤醒位。
new = (old - 1< // 使用原子操作比较并交换 Mutex 的状态。
// 如果当前状态仍然是 old,则将其更新为 new,表示我们成功获得了唤醒的权利。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 释放一个信号量,唤醒一个等待的 goroutine。
// false 表示不是在持有锁的情况下释放信号量。
// 1 表示唤醒一个等待者。
runtime_Semrelease(&m.sema, false, 1)
return // 返回,表示已成功唤醒一个等待者。
}
// 如果 CAS 失败,说明状态已被其他 goroutine 修改,重新读取当前状态并重试。
old = m.state
}
} else {
// 饥饿模式:将 Mutex 的所有权移交给下一个等待者,并让出我们的时间片,
// 以便下一个等待者可以立即开始运行。
// 注意:mutexLocked 没有被设置,等待者会在唤醒后设置它。
// 但是如果设置了 mutexStarving,Mutex 仍然被认为是锁定的,
// 因此新的 goroutine 不会获取它。
// true 表示在持有锁的情况下释放信号量(虽然逻辑上锁已经释放,但在饥饿模式下,
// 我们希望直接将锁交给下一个等待者,所以使用 true 可以确保公平性,
// 避免其他 goroutine 抢占到锁)。
// 1 表示唤醒一个等待者。
runtime_Semrelease(&m.sema, true, 1)
}
}
```
## 总结
本篇文章主要从加锁和解锁两个部分的源码进行分析。
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位 `mutexLocked` 加锁;
- 如果互斥锁处于 `mutexLocked` 状态并且在普通模式下工作,会进入自旋,执行 30 次 `PAUSE` 指令消耗 CPU 时间等待锁的释放;
- 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过 [`runtime.sync_runtime_SemacquireMutex`](https://draveness.me/golang/tree/runtime.sync_runtime_SemacquireMutex) 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
- 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用 [`sync.Mutex.Unlock`](https://draveness.me/golang/tree/sync.Mutex.Unlock) 会直接抛出异常;
- 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 `mutexLocked` 标志位;
- 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 [`sync.runtime_Semrelease`](https://draveness.me/golang/tree/sync.runtime_Semrelease) 唤醒对应的 Goroutine;
## 参考链接
1.3.[Go 语言设计与实现](https://draveness.me/golang)
点赞 回复
回帖
支持markdown部分语法 ?