Implementing a lock-free unbounded queue using the new atomic.Pointer type

PHPz
2024-02-09 11:30:20

This article looks at implementing a lock-free, unbounded queue with the atomic.Pointer type added in Go 1.19. Queues are a common data structure in concurrent programs, and many implementations guard them with a lock to stay thread-safe, which can become a bottleneck under contention. atomic.Pointer makes it possible to build the queue from atomic loads and compare-and-swap operations instead. The question below attempts this and runs into data races; the solution that follows is the version the author reported as working.

Question content

I am trying to implement this non-blocking queue of Michael and Scott.

I'm trying to use the new atomic.Pointer type introduced in Go 1.19, but I'm getting data races in my application.

This is my implementation:

package queue

import (
    "errors"
    "sync/atomic"
)

// LockFreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewLockFreeQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    n := &node[T]{}
    head.Store(n)
    tail.Store(n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a value to the tail of the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes and returns the value at the head of the queue,
// or an error if the queue is empty
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return head.value, errors.New("queue is empty")
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v, nil
                }
            }
        }
    }
}

// IsEmpty reports whether the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
    return q.head.Load() == q.tail.Load()
}

I found a different implementation here that works in my application without data races, but I can't seem to figure out what exactly is the difference between the two.

Thanks for any help or feedback!

Solution

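It turns out that two changes solved the problem in my application.

The first change is in NewLockFreeQueue:

var n = node[T]{}
head.Store(&n)
tail.Store(&n)

The second change is to the Dequeue() return signature: it now returns only T (the zero value of T when the queue is empty) instead of (T, error).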

The final file looks like this:

package queue

import (
    "sync/atomic"
)

// LockFreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewLockFreeQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    var n = node[T]{}
    head.Store(&n)
    tail.Store(&n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a value to the tail of the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes and returns the value at the head of the queue.
// It returns the zero value of T if the queue is empty.
func (q *LockFreeQueue[T]) Dequeue() T {
    var t T
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return t
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v
                }
            }
        }
    }
}

// IsEmpty reports whether the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
    return q.head.Load() == q.tail.Load()
}

Statement:
This article is reproduced from stackoverflow.com. In case of infringement, please contact admin@php.cn to have it removed.