Heim >Backend-Entwicklung >Golang >Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer

Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer

PHPz
PHPznach vorne
2024-02-09 11:30:20903Durchsuche

Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer

php-Editor Strawberry stellt Ihnen heute eine neue Technologie vor – die Verwendung des neuen Typs atomic.Pointer zur Implementierung einer sperrenfreien und unbegrenzten Warteschlange. Bei der gleichzeitigen Programmierung sind Warteschlangen eine übliche Datenstruktur. Bei herkömmlichen Warteschlangenimplementierungen ist jedoch normalerweise die Verwendung von Sperren erforderlich, um die Thread-Sicherheit zu gewährleisten, was zu Leistungseinbußen führt. Der neue Typ atomic.Pointer bietet eine sperrenfreie Lösung, mit der effiziente gleichzeitige Warteschlangenoperationen erreicht werden können. Im Folgenden stellen wir diese neue Implementierung im Detail vor, sowie ihre Vorteile und deren Verwendung.

Frageninhalt

Ich versuche, diese nicht blockierende Warteschlange von Michael und Scott zu implementieren.

Ich versuche, den neuen Typ atomic.pointer zu verwenden, der in go 1.19 eingeführt wurde, aber in meiner Anwendung treten Datenrennen auf.

Das ist meine Umsetzung:

package queue

import (
    "errors"
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    n := &node[T]{}
    head.Store(n)
    tail.Store(n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return head.value, errors.New("queue is empty")
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v, nil
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
}

Ich habe hier eine andere Implementierung gefunden, die in meiner App ohne Datenrennen funktioniert, aber ich kann anscheinend nicht herausfinden, was genau der Unterschied zwischen den beiden ist.

Danke für jede Hilfe oder Rückmeldung!

Workaround

Es stellt sich heraus, dass die Änderung einiger Dinge das Problem lösen kann.

Erste Änderung:

var n = node[t]{}
head.store(&n)
tail.store(&n)

Die zweite Änderung besteht darin, die dequeue() Rücksendesignatur zu ändern.

Die endgültige Datei sieht so aus:

package queue

import (
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    var n = node[T]{}
    head.Store(&n)
    tail.Store(&n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() T {
    var t T
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return t
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
    return q.head.Load() == q.tail.Load()
}

Das obige ist der detaillierte Inhalt vonImplementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:stackoverflow.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen