>백엔드 개발 >Golang >원자적 연산이란 무엇입니까? 이동 중 원자 작업에 대한 심층 분석

원자적 연산이란 무엇입니까? 이동 중 원자 작업에 대한 심층 분석

青灯夜游
青灯夜游앞으로
2023-03-28 19:04:402002검색

원자적 연산이란 무엇입니까? 이동 중 원자 작업에 대한 심층 분석

sync 패키지를 소개하는 이전 기사에서 우리는 원자 연산이 여러 곳에서 사용된다는 사실도 발견했어야 했습니다. 예를 들어, 이러한 구조의 구현에는 원자적 작업이 포함됩니다. 원자적 연산은 동시 프로그래밍에서 매우 중요한 연산이며 동시성 안전성을 보장할 수 있으며 매우 효율적입니다. 이 기사에서는 Go의 원자적 작업의 원리, 사용 시나리오, 사용법 등을 자세히 살펴보겠습니다. sync.WaitGroupsync.Map 再到 sync.Pool

원자적 연산이란 무엇인가요?

원자적 연산은 가변 수준 뮤텍스 잠금입니다.

원자적 연산이 무엇인지 한 문장으로 설명한다면 다음과 같습니다.

원자적 연산은 가변 수준 뮤텍스 잠금입니다. 간단히 말하면 하나의 CPU만 동시에 변수를 읽거나 쓸 수 있습니다. 공식적으로 제공되는 패키지의 원자적 연산을 사용하는 것 외에도 특정 변수를 동시에 안전하게 수정하고 싶을 때, 변수 읽기 또는 수정이 다른 코루틴의 영향을 받지 않도록 할 수 있습니다. Mutex,还可以使用 sync/atomic

다음 그림을 사용하여 이를 나타낼 수 있습니다.

원자적 연산이란 무엇입니까? 이동 중 원자 작업에 대한 심층 분석

설명: 위 그림에는 3개의 CPU 논리 코어가 있으며 그 중 CPU 1은 변수

에 대해 작업을 수행합니다. CPU 1 작업이 완료된 후 CPU 2와 CPU 3은 v의 최신 값을 얻을 수 있습니다. v 做原子操作,这个时候 CPU 2 和 CPU 3 不能对 vv 的最新值。

从这个角度看,我们可以把 sync/atomic

이러한 관점에서 sync/atomic 패키지의 원자 연산을 변수 수준 뮤텍스 잠금으로 간주할 수 있습니다. 즉, 코루틴이 변수에 대해 원자적 연산을 수행하면 다른 코루틴은 코루틴 연산이 완료될 때까지 변수에 대해 어떤 연산도 수행할 수 없습니다.

원자적 연산의 사용 시나리오는 무엇입니까?

원자적 연산의 사용 시나리오를 설명하기 위해 간단한 예를 들어보세요.

func TestAtomic(t *testing.T) {
	var sum = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	// 启动 1000 个协程,每个协程对 sum 做加法操作
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			sum++
		}()
	}

	// 等待所有的协程都执行完毕
	wg.Wait()
	fmt.Println(sum) // 这里输出多少呢?
}

이 코드를 우리 컴퓨터에서 실행하고 출력 결과가 무엇인지 확인할 수 있습니다. 그게 아니라면 매번 달라야 하고, 1000이 되어서는 안 됩니다. 왜 그럴까요?

sum 做加法的时候,需要先将 sum이것은 CPU가 sum 的值,然后都进行了加法操作,然后都再写回到内存中,那么就会导致 sum 的值被覆盖,从而导致结果不正确。

举个例子,目前内存中的 sum현재 값을 CPU의 레지스터로 읽어온 다음 덧셈 연산을 수행하고 마지막으로 이를 다시 메모리에 쓰기 때문입니다. 두 CPU가 동시에 sum 값을 취하고 둘 다 덧셈 작업을 수행한 다음 다시 메모리에 쓰면

값이 덮어쓰여 잘못된 결과가 발생합니다.

예를 들어, 메모리의 현재

는 1이고 두 CPU는 동시에 추가를 위해 이 1을 취하고 두 CPU 모두 결과 2를 얻습니다. 그런 다음 두 CPU는 각각의 계산 결과를 메모리에 다시 쓰므로 메모리의

는 3이 아닌 2가 됩니다.

이 시나리오에서는 원자 연산을 사용하여 동시적이고 안전한 추가 연산을 구현할 수 있습니다.

func TestAtomic1(t *testing.T) {
	// 将 sum 的类型改成 int32,因为原子操作只能针对 int32、int64、uint32、uint64、uintptr 这几种类型
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

    // 启动 1000 个协程,每个协程对 sum 做加法操作
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			// 将 sum++ 改成下面这样
			atomic.AddInt32(&sum, 1)
		}()
	}

	wg.Wait()
	fmt.Println(sum) // 输出 1000
}
위 예에서는 실행할 때마다 1000이라는 결과를 얻을 수 있습니다.

원자적 연산을 사용하면 하나의 CPU만 동시에 변수를 읽거나 쓸 수 있기 때문에 위와 같은 문제는 발생하지 않습니다.

그래서 변수의 동시 읽기 및 쓰기가 필요한 많은 곳에서 원자 연산을 사용하여 동시적이고 안전한 작업을 달성할 수 있는지 여부를 고려할 수 있습니다(원자 연산보다 효율적인 뮤텍스 잠금을 사용하는 대신). .

원자적 연산의 사용 시나리오는 뮤텍스 잠금과 유사하지만 잠금 세분성은 단지 변수일 뿐이라는 차이점이 있습니다. 즉, 여러 CPU가 동시에 변수를 읽고 쓰는 것을 허용하지 않는 경우(단 하나의 CPU만 동시에 변수에 대해 작업할 수 있도록 하기 위해) 원자적 연산을 사용할 수 있습니다.

원자적 연산은 어떻게 구현되나요?

🎜🎜위의 원자 연산 소개를 읽고 나면 원자 연산이 놀랍고 이렇게 유용한 것이 있다는 생각이 드시나요? 그렇다면 어떻게 달성됩니까? 🎜🎜일반적으로 원자 연산을 구현하려면 특수 CPU 명령어나 시스템 호출이 필요합니다. 이러한 명령어나 시스템 호출은 실행 중에 다른 작업이나 이벤트에 의해 중단되지 않도록 하여 작업의 원자성을 보장합니다. 🎜

例如,在 x86 架构的 CPU 中,可以使用 LOCK 前缀来实现原子操作。LOCK 前缀可以与其他指令一起使用,用于锁定内存总线,防止其他 CPU 访问同一内存地址,从而实现原子操作。 在使用 LOCK 前缀的指令执行期间,CPU 会将当前处理器缓存中的数据写回到内存中,并锁定该内存地址, 防止其他 CPU 修改该地址的数据(所以原子操作总是可以读取到最新的数据)。 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

x86 LOCK 的时候发生了什么

我们再来捋一下上面的内容,看看 LOCK 前缀是如何实现原子操作的:

  1. CPU 会将当前处理器缓存中的数据写回到内存中。(因此我们总能读取到最新的数据)
  2. 然后锁定该内存地址,防止其他 CPU 修改该地址的数据。
  3. 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

其他架构的 CPU 可能会略有不同,但是原理是一样的。

原子操作有什么特征?

  1. 不会被中断:原子操作是一个不可分割的操作,要么全部执行,要么全部不执行,不会出现中间状态。这是保证原子性的基本前提。同时,原子操作过程中不会有上下文切换的过程。
  2. 操作对象是共享变量:原子操作通常是对共享变量进行的,也就是说,多个协程可以同时访问这个变量,因此需要采用原子操作来保证数据的一致性和正确性。
  3. 并发安全:原子操作是并发安全的,可以保证多个协程同时进行操作时不会出现数据竞争问题(虽然说是同时,但是实际上在操作那个变量的时候是互斥的)。
  4. 无需加锁:原子操作不需要使用互斥锁来保证数据的一致性和正确性,因此可以避免互斥锁的使用带来的性能损失。
  5. 适用场景比较局限:原子操作适用于操作单个变量,如果需要同时并发读写多个变量,可能需要考虑使用互斥锁。

go 里面有哪些原子操作?

在 go 中,主要有以下几种原子操作:AddCompareAndSwapLoadStoreSwap

增减(Add)

  1. 用于进行增加或减少的原子操作,函数名以 Add 为前缀,后缀针对特定类型的名称。
  2. 原子增被操作的类型只能是数值类型,即 int32int64uint32uint64uintptr
  3. 原子增减函数的第一个参数为原值,第二个参数是要增减多少。
  4. 方法:
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

int32int64 的第二个参数可以是负数,这样就可以做原子减法了。

比较并交换(CompareAndSwap)

也就是我们常见的 CAS,在 CAS 操作中,会需要拿旧的值跟 old 比较,如果相等,就将 new 赋值给 addr。 如果不相等,则不做任何操作。最后返回一个 bool 值,表示是否成功 swap

也就是说,这个操作可能是不成功的。这很正常,在并发环境下,多个协程对同一个变量进行操作,肯定会存在竞争的情况。 在这种情况下,偶尔的失败是正常的,我们只需要在失败的时候,重新尝试即可。 因为原子操作需要的时间往往是比较短的,因此在失败的时候,我们可以通过自旋的方式来再次进行尝试。

在这种情况下,如果不自旋,那就需要将这个协程挂起,等待其他协程完成操作,然后再次尝试。这个过程相比自旋可能会更加耗时。 因为很有可能这次原子操作不成功,下一次就成功了。如果我们每次都将协程挂起,那么效率就会大大降低。

for + 原子操作的方式,在 go 的 sync 包中很多地方都有使用,比如 sync.Mapsync.Pool 等。 这也是使用原子操作时一个非常常见的使用模式。

CompareAndSwap 的功能:

  1. 用于比较并交换的原子操作,函数名以 CompareAndSwap 为前缀,后缀针对特定类型的名称。
  2. 原子比较并交换被操作的类型可以是数值类型或指针类型,即 int32int64uint32uint64uintptrunsafe.Pointer
  3. 原子比较并交换函数的第一个参数为原值指针,第二个参数是要比较的值,第三个参数是要交换的值。
  4. 方法:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

载入(Load)

原子性的读取操作接受一个对应类型的指针值,返回该指针指向的值。原子性读取意味着读取值的同时,当前计算机的任何 CPU 都不会进行针对值的读写操作。

如果不使用原子 Load,当使用 v := value 这种赋值方式为变量 v 赋值时,读取到的 value 可能不是最新的,因为在读取操作时其他协程对它的读写操作可能会同时发生。

Load 操作有下面这些:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

存储(Store)

Store 可以将 val 值保存到 *addr 中,Store 操作是原子性的,因此在执行 Store 操作时,当前计算机的任何 CPU 都不会进行针对 *addr 的读写操作。

  1. 原子性存储会将 val 值保存到 *addr 中。
  2. 与读操作对应的写入操作,sync/atomic 提供了与原子值载入 Load 函数相对应的原子值存储 Store 函数,原子性存储函数均以 Store 为前缀。

Store 操作有下面这些:

func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintpre, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

交换(Swap)

SwapStore 有点类似,但是它会返回 *addr 的旧值。

func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

原子操作任意类型的值 - atomic.Value

从上一节中,我们知道了在 go 中原子操作可以操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值。 但是在实际开发中,我们的类型还有很多,比如 stringstruct 等等,那这些类型的值如何进行原子操作呢?答案是使用 atomic.Value

atomic.Value 是一个结构体,它的内部有一个 any 类型的字段,存储了我们要原子操作的值,也就是一个任意类型的值。

atomic.Value 支持以下操作:

  • Load:原子性的读取 Value 中的值。
  • Store:原子性的存储一个值到 Value 中。
  • Swap:原子性的交换 Value 中的值,返回旧值。
  • CompareAndSwap:原子性的比较并交换 Value 中的值,如果旧值和 old 相等,则将 new 存入 Value 中,返回 true,否则返回 false

atomic.Value 的这些操作跟上面讲到的那些操作其实差不多,只不过 atomic.Value 可以操作任意类型的值。 那 atomic.Value 是如何实现的呢?

atomic.Value 源码分析

atomic.Value 是一个结构体,这个结构体只有一个字段:

// Value 提供一致类型值的原子加载和存储。
type Value struct {
	v any
}

Load - 读取

Load 返回由最近的 Store 设置的值。如果还没有 Store 过任何值,则返回 nil

// Load 返回由最近的 Store 设置的值。
func (v *Value) Load() (val any) {
	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))

	// 判断 atomic.Value 的类型
	typ := LoadPointer(&vp.typ)
	// 第一次 Store 还没有完成,直接返回 nil
	if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
		// firstStoreInProgress 是一个特殊的变量,存储到 typ 中用来表示第一次 Store 还没有完成
		return nil
	}

	// 获取 atomic.Value 的值
	data := LoadPointer(&vp.data)
	// 将 val 转换为 efaceWords 类型
	vlp := (*efaceWords)(unsafe.Pointer(&val))
	// 分别赋值给 val 的 typ 和 data
	vlp.typ = typ
	vlp.data = data
	return
}

atomic.Value 的源码中,我们都可以看到 efaceWords 的身影,它实际上代表的是 interface{}/any 类型:

// 表示一个 interface{}/any 类型
type efaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

看到这里我们会不会觉得很困惑,直接返回 val 不就可以了吗?为什么要将 val 转换为 efaceWords 类型呢?

这是因为 go 中的原子操作只能操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值, 不支持 interface{} 类型,但是如果了解 interface{} 底层结构的话,我们就知道 interface{} 底层其实就是一个结构体, 它有两个字段,一个是 type,一个是 datatype 用来存储 interface{} 的类型,data 用来存储 interface{} 的值。 而且这两个字段都是 unsafe.Pointer 类型的,所以其实我们可以对 interface{}typedata 分别进行原子操作, 这样最终其实也可以达到了原子操作 interface{} 的目的了,是不是非常地巧妙呢?

Store - 存储

StoreValue 的值设置为 val。对给定值的所有存储调用必须使用相同具体类型的值。不一致类型的存储会发生恐慌,Store(nil) 也会 panic

// Store 将 Value 的值设置为 val。
func (v *Value) Store(val any) {
	// 不能存储 nil 值
	if val == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// val 转换为 efaceWords
	vlp := (*efaceWords)(unsafe.Pointer(&val))
	
	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// LoadPointer 可以保证获取到的是最新的
		typ := LoadPointer(&vp.typ)
		// 第一次 store 的时候 typ 还是 nil,说明是第一次 store
		if typ == nil {
			// 尝试开始第一次 Store。
			// 禁用抢占,以便其他 goroutines 可以自旋等待完成。
			// (如果允许抢占,那么其他 goroutine 自旋等待的时间可能会比较长,因为可能会需要进行协程调度。)
			runtime_procPin()
			// 抢占失败,意味着有其他 goroutine 成功 store 了,允许抢占,再次尝试 Store
			// 这也是一个原子操作。
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			// 完成第一次 store
			// 因为有 firstStoreInProgress 标识的保护,所以下面的两个原子操作是安全的。
			StorePointer(&vp.data, vlp.data) // 存储值(原子操作)
			StorePointer(&vp.typ, vlp.typ)   // 存储类型(原子操作)
			runtime_procUnpin()              // 允许抢占
			return
		}

		// 另外一个 goroutine 正在进行第一次 Store。自旋等待。
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}

		// 第一次 Store 已经完成了,下面不是第一次 Store 了。
		// 需要检查当前 Store 的类型跟第一次 Store 的类型是否一致,不一致就 panic。
		if typ != vlp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}

		// 后续的 Store 只需要 Store 值部分就可以了。
		// 因为 atomic.Value 只能保存一种类型的值。
		StorePointer(&vp.data, vlp.data)
		return
	}
}

Store 中,有以下几个注意的点:

  1. 使用 firstStoreInProgress 来确保第一次 Store 的时候,只有一个 goroutine 可以进行 Store 操作,其他的 goroutine需要自旋等待。如果没有这个保护,那么存储 typdata 的时候就会出现竞争(因为需要两个原子操作),导致数据不一致。在这里其实可以将 firstStoreInProgress 看作是一个互斥锁。
  2. 在进行第一次 Store 的时候,会将当前的 goroutine 和 P 绑定,这样拿到 firstStoreInProgress 锁的协程就可以尽快地完成第一次 Store操作,这样一来,其他的协程也不用等待太久。
  3. 在第一次 Store 的时候,会有两个原子操作,分别存储类型和值,但是因为有 firstStoreInProgress 的保护,所以这两个原子操作本质上是对 interface{} 的一个原子存储操作。
  4. 其他协程在看到有 firstStoreInProgress 标识的时候,就会自旋等待,直到第一次 Store 完成。
  5. 在后续的 Store 操作中,只需要存储值就可以了,因为 atomic.Value 只能保存一种类型的值。

Swap - 交换

SwapValue 的值设置为 new 并返回旧值。对给定值的所有交换调用必须使用相同具体类型的值。同时,不一致类型的交换会发生恐慌,Swap(nil) 也会 panic

// Swap 将 Value 的值设置为 new 并返回旧值。
func (v *Value) Swap(new any) (old any) {
	// 不能存储 nil 值
	if new == nil {
		panic("sync/atomic: swap of nil value into Value")
	}

	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// new 转换为 efaceWords
	np := (*efaceWords)(unsafe.Pointer(&new))
	
	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// 下面这部分代码跟 Store 一样,不细说了。
		// 这部分代码是进行第一次存储的代码。
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			StorePointer(&vp.data, np.data)
			StorePointer(&vp.typ, np.typ)
			runtime_procUnpin()
			return nil
		}
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}
		if typ != np.typ {
			panic("sync/atomic: swap of inconsistently typed value into Value")
		}

		// ---- 下面是 Swap 的特有逻辑 ----
		// op 是返回值
		op := (*efaceWords)(unsafe.Pointer(&old))
		// 返回旧的值
		op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
		return old
	}
}

CompareAndSwap - 比较并交换

CompareAndSwapValue 的值与 old 比较,如果相等则设置为 new 并返回 true,否则返回 false。 对给定值的所有比较和交换调用必须使用相同具体类型的值。同时,不一致类型的比较和交换会发生恐慌,CompareAndSwap(nil, nil) 也会 panic

// CompareAndSwap 比较并交换。
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
	// 注意:old 是可以为 nil 的,new 不能为 nil。
	// old 是 nil 表示是第一次进行 Store 操作。
	if new == nil {
		panic("sync/atomic: compare and swap of nil value into Value")
	}

	// atomic.Value 转换为 efaceWords
	vp := (*efaceWords)(unsafe.Pointer(v))
	// new 转换为 efaceWords
	np := (*efaceWords)(unsafe.Pointer(&new))
	// old 转换为 efaceWords
	op := (*efaceWords)(unsafe.Pointer(&old))

	// old 和 new 类型必须一致,且不能为 nil
	if op.typ != nil && np.typ != op.typ {
		panic("sync/atomic: compare and swap of inconsistently typed values")
	}

	// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
	for {
		// LoadPointer 可以保证获取到的 typ 是最新的
		typ := LoadPointer(&vp.typ)
		if typ == nil { // atomic.Value 是 nil,还没 Store 过
			// 准备进行第一次 Store,但是传递进来的 old 不是 nil,compare 这一步就失败了。直接返回 false
			if old != nil {
				return false
			}

			// 下面这部分代码跟 Store 一样,不细说了。 
			// 这部分代码是进行第一次存储的代码。
			runtime_procPin()
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				runtime_procUnpin()
				continue
			}
			StorePointer(&vp.data, np.data)
			StorePointer(&vp.typ, np.typ)
			runtime_procUnpin()
			return true
		}
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			continue
		}
		if typ != np.typ {
			panic("sync/atomic: compare and swap of inconsistently typed value into Value")
		}

		// 通过运行时相等性检查比较旧版本和当前版本。
		// 这允许对值类型进行比较,这是包函数所没有的。
		// 下面的 CompareAndSwapPointer 仅确保 vp.data 自 LoadPointer 以来没有更改。
		data := LoadPointer(&vp.data)
		var i any
		(*efaceWords)(unsafe.Pointer(&i)).typ = typ
		(*efaceWords)(unsafe.Pointer(&i)).data = data
		if i != old { // atomic.Value 跟 old 不相等
			return false
		}
		// 只做 val 部分的 cas 操作
		return CompareAndSwapPointer(&vp.data, data, np.data)
	}
}

这里需要特别说明的只有最后那个比较相等的判断,也就是 data := LoadPointer(&vp.data) 以及往后的几行代码。 在开发 atomic.Value 第一版的时候,那个开发者其实是将这几行写成 CompareAndSwapPointer(&vp.data, old.data, np.data) 这种形式的。 但是在旧的写法中,会存在一个问题,如果我们做 CAS 操作的时候,如果传递的参数 old 是一个结构体的值这种类型,那么这个结构体的值是会被拷贝一份的, 同时再会被转换为 interface{}/any 类型,这个过程中,其实参数的 olddata 部分指针指向的内存跟 vp.data 指向的内存是不一样的。 这样的话,CAS 操作就会失败,这个时候就会返回 false,但是我们本意是要比较它的值,出现这种结果显然不是我们想要的。

将值作为 interface{} 参数使用的时候,会存在一个将值转换为 interface{} 的过程。具体我们可以看看 interface{} 的实现原理。

所以,在上面的实现中,会将旧值的 typdata 赋值给一个 any 类型的变量, 然后使用 i != old 这种方式进行判断,这样就可以实现在比较的时候,比较的是值,而不是由值转换为 interface{} 后的指针。

其他原子类型

我们现在知道了,atomic.Value 可以对任意类型做原子操作。 而对于其他的原子类型,比如 int32int64uint32uint64uintptrunsafe.Pointer 等, 其实在 go 中也提供了包装的类型,让我们可以以对象的方式来操作这些类型。

对应的类型如下:

  • atomic.Bool:这个比较特别,但底层实际上是一个 uint32 类型的值。我们对 atomic.Bool 做原子操作的时候,实际上是对 uint32 做原子操作。
  • atomic.Int32int32 类型的包装类型
  • atomic.Int64int64 类型的包装类型
  • atomic.Uint32uint32 类型的包装类型
  • atomic.Uint64uint64 类型的包装类型
  • atomic.Uintptruintptr 类型的包装类型
  • atomic.Pointerunsafe.Pointer 类型的包装类型

这几种类型的实现的代码基本一样,除了类型不一样,我们可以看看 atomic.Int32 的实现:

// An Int32 is an atomic int32. The zero value is zero.
type Int32 struct {
	_ noCopy
	v int32
}

// Load atomically loads and returns the value stored in x.
func (x *Int32) Load() int32 { return LoadInt32(&x.v) }

// Store atomically stores val into x.
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }

// Swap atomically stores new into x and returns the previous value.
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
	return CompareAndSwapInt32(&x.v, old, new)
}

可以看到,atomic.Int32 的实现都是基于 atomic 包中 int32 类型相关的原子操作函数来实现的。

原子操作与互斥锁比较

那我们有了互斥锁,为什么还要有原子操作呢?我们进行比较一下就知道了:


原子操作 互斥锁
保护的范围 变量 代码块
保护的粒度
性能
如何实现的 硬件指令 软件层面实现,逻辑较多

如果我们只需要对某一个变量做并发读写,那么使用原子操作就可以了,因为原子操作的性能比互斥锁高很多。 但是如果我们需要对多个变量做并发读写,那么就需要用到互斥锁了,这种场景往往是在一段代码中对不同变量做读写。

性能比较

我们前面这个表格提到了原子操作与互斥锁性能上有差异,我们写几行代码来进行比较一下:

// 系统信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
// 10.13 ns/op
func BenchmarkMutex(b *testing.B) {
   var mu sync.Mutex

   for i := 0; i < b.N; i++ {
      mu.Lock()
      mu.Unlock()
   }
}

// 5.849 ns/op
func BenchmarkAtomic(b *testing.B) {
   var sum atomic.Uint64

   for i := 0; i < b.N; i++ {
      sum.Add(uint64(1))
   }
}

在对 Mutex 的性能测试中,我只是写了简单的 Lock()UnLock() 操作,因为这种比较才算是对 Mutex 本身的测试,而在 Atomic 的性能测试中,对 sum 做原子累加的操作。最终结果是,使用 Atomic 的操作耗时大概比 Mutex 少了 40% 以上。

在实际开发中,Mutex 保护的临界区内往往有更多操作,也就意味着 Mutex 锁需要耗费更长的时间才能释放,也就是会需要耗费比上面这个 40% 还要多的时间另外一个协程才能获取到 Mutex 锁。

go 的 sync 包中的原子操作

在文章的开头,我们就说了,在 go 的 sync.Mapsync.Pool 中都有用到了原子操作,本节就来看一看这些操作。

sync.Map 中的原子操作

sync.Map 中使用到了一个 entry 结构体,这个结构体中大部分操作都是原子操作,我们可以看看它下面这两个方法的定义:

// 删除 entry
func (e *entry) delete() (value any, ok bool) {
	for {
		p := e.p.Load()
		// 已经被删除了,不需要再删除
		if p == nil || p == expunged {
			return nil, false
		}
		// 删除成功
		if e.p.CompareAndSwap(p, nil) {
			return *p, true
		}
	}
}

// 如果条目尚未删除,trySwap 将交换一个值。
func (e *entry) trySwap(i *any) (*any, bool) {
	for {
		p := e.p.Load()
		// 已经被删除了
		if p == expunged {
			return nil, false
		}
		// swap 成功
		if e.p.CompareAndSwap(p, i) {
			return p, true
		}
	}
}

我们可以看到一个非常典型的特征就是 for + CompareAndSwap 的组合,这个组合在 entry 中出现了很多次。

如果我们也需要对变量做并发读写,也可以尝试一下这种 for + CompareAndSwap 的组合。

sync.WaitGroup 中的原子操作

sync.WaitGroup 中有一个类型为 atomic.Uint64state 字段,这个变量是用来记录 WaitGroup 的状态的。 在实际使用中,它的高 32 位用来记录 WaitGroup 的计数器,低 32 位用来记录 WaitGroupWaiter 的数量,也就是等待条件变量满足的协程数量。

如果不使用一个变量来记录这两个值,那么我们就需要使用两个变量来记录,这样就会导致我们需要对两个变量做并发读写, 在这种情况下,我们就需要使用互斥锁来保护这两个变量,这样就会导致性能的下降。

而使用一个变量来记录这两个值,我们就可以使用原子操作来保护这个变量,这样就可以保证并发读写的安全性,同时也能得到更好的性能:

// WaitGroup 的 Add 函数:高 32 位加上 delta
state := wg.state.Add(uint64(delta) << 32)

// WaitGroup 的 Wait 函数:低 32 位加 1
// 等待者的数量加 1
wg.state.CompareAndSwap(state, state+1)

CAS 操作有失败必然有成功

当然这里是指指向同一行 CAS 代码的时候(也就是有竞争的时候),如果是指向不同行 CAS 代码的时候,那么就不一定了。 比如下面这个例子,我们把前面计算 sum 的例子改一改,改成用 CAS 操作来完成:

func TestCas(t *testing.T) {
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			// 这一行是有可能会失败的
			atomic.CompareAndSwapInt32(&sum, sum, sum+1)
		}()
	}

	wg.Wait()
	fmt.Println(sum) // 不是 1000
}

在这个例子中,我们把 atomic.AddInt32(&sum, 1) 改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1), 这样就会导致有可能会有多个 goroutine 同时执行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1) 这一行代码, 这样肯定会有不同的 goroutine 同时拿到一个相同的 sum 的旧值,那么在这种情况下,就会导致 CAS 操作失败。 也就是说,将 sum 替换为 sum + 1 的操作可能会失败。

失败意味着什么呢?意味着另外一个协程序先把 sum 的值加 1 了,这个时候其实我们不应该在旧的 sum 上加 1 了, 而是应该在最新的 sum 上加上 1,那我们应该怎么做呢?我们可以在 CAS 操作失败的时候,重新获取 sum 的值, 然后再次尝试 CAS 操作,直到成功为止:

func TestCas(t *testing.T) {
	var sum int32 = 0
	var wg sync.WaitGroup
	wg.Add(1000)

	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			
			// cas 失败的时候,重新获取 sum 的值进行计算。
			// cas 成功则返回。
			for {
				if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {
					return
				}
			}
		}()
	}

	wg.Wait()
	fmt.Println(sum)
}

总结

原子操作是并发编程中非常重要的一个概念,它可以保证并发读写的安全性,同时也能得到更好的性能。

最后,总结一下本文讲到的内容:

  • 원자적 연산은 단일 변수를 보호하는 반면, 뮤텍스 잠금은 코드 조각을 보호할 수 있습니다.
  • 원자적 연산은 CPU 명령어를 통해 구현되어야 하는 반면, 뮤텍스 잠금은 소프트웨어 수준에서 구현됩니다.
  • go의 원자 연산에는 다음이 포함됩니다.
    • Add: 원자 더하기 및 빼기 Add:原子增减
    • CompareAndSwap:原子比较并交换
    • Load:原子读取
    • Store:原子写入
    • Swap:原子交换
  • go 里面所有类型都能使用原子操作,只是不同类型的原子操作使用的函数不太一样。
  • atomic.Value 可以用来原子操作任意类型的变量。
  • go 里面有些底层实现也使用了原子操作,比如:
    • sync.WaitGroup:使用原子操作来保证计数器和等待者数量的并发读写安全性。
    • sync.Mapentry 结构体中基本所有操作都有原子操作的身影。
  • 原子操作有失败必然有成功(说的是同一行 CAS 操作),如果 CAS 操作失败了,那么我们可以重新获取旧值,然后再次尝试 CAS
  • CompareAndSwap: 원자 비교 및 ​​교환

Load : Atomic read

Store: Atomic writeSwap: Atomic exchange

🎜🎜🎜 Go의 모든 유형은 원자 연산을 사용할 수 있지만 다른 유형의 원자만 사용할 수 있습니다. 작업에 사용되는 것이 다릅니다. 🎜🎜atomic.Value는 모든 유형의 변수를 원자적으로 작동하는 데 사용할 수 있습니다. 🎜🎜go의 일부 기본 구현은 다음과 같은 원자 연산을 사용합니다. 🎜🎜sync.WaitGroup: 원자 연산을 사용하여 카운터와 웨이터의 동시 읽기 및 쓰기의 안전을 보장합니다. 🎜🎜sync.Map: entry 기본적으로 구조의 모든 작업에는 원자적 작업이 있습니다. 🎜🎜🎜🎜원자적 연산이 실패하면 성공해야 합니다(같은 줄의 CAS 연산 참조). CAS 연산이 실패하면 다시 얻을 수 있습니다. 이전 값을 확인한 다음 성공할 때까지 CAS 작업을 다시 시도하세요. 🎜🎜🎜일반적으로 원자 연산 자체에는 그다지 복잡한 논리가 없습니다. 일단 원리를 이해하면 쉽게 사용할 수 있습니다. 🎜🎜추천 학습: 🎜Golang 튜토리얼🎜🎜

위 내용은 원자적 연산이란 무엇입니까? 이동 중 원자 작업에 대한 심층 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제