Heim >Backend-Entwicklung >Golang >Implementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer
php-Editor Strawberry stellt Ihnen heute eine neue Technologie vor – die Verwendung des neuen Typs atomic.Pointer zur Implementierung einer sperrenfreien und unbegrenzten Warteschlange. Bei der gleichzeitigen Programmierung sind Warteschlangen eine übliche Datenstruktur. Bei herkömmlichen Warteschlangenimplementierungen ist jedoch normalerweise die Verwendung von Sperren erforderlich, um die Thread-Sicherheit zu gewährleisten, was zu Leistungseinbußen führt. Der neue Typ atomic.Pointer bietet eine sperrenfreie Lösung, mit der effiziente gleichzeitige Warteschlangenoperationen erreicht werden können. Im Folgenden stellen wir diese neue Implementierung im Detail vor, sowie ihre Vorteile und deren Verwendung.
Ich versuche, diese nicht blockierende Warteschlange von Michael und Scott zu implementieren.
Ich versuche, den neuen Typ atomic.pointer zu verwenden, der in go 1.19 eingeführt wurde, aber in meiner Anwendung treten Datenrennen auf.
Das ist meine Umsetzung:
package queue import ( "errors" "sync/atomic" ) // LockfreeQueue represents a FIFO structure with operations to enqueue // and dequeue generic values. // Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html type LockFreeQueue[T any] struct { head atomic.Pointer[node[T]] tail atomic.Pointer[node[T]] } // node represents a node in the queue type node[T any] struct { value T next atomic.Pointer[node[T]] } // newNode creates and initializes a node func newNode[T any](v T) *node[T] { return &node[T]{value: v} } // NewQueue creates and initializes a LockFreeQueue func NewLockFreeQueue[T any]() *LockFreeQueue[T] { var head atomic.Pointer[node[T]] var tail atomic.Pointer[node[T]] n := &node[T]{} head.Store(n) tail.Store(n) return &LockFreeQueue[T]{ head: head, tail: tail, } } // Enqueue adds a series of Request to the queue func (q *LockFreeQueue[T]) Enqueue(v T) { n := newNode(v) for { tail := q.tail.Load() next := tail.next.Load() if tail == q.tail.Load() { if next == nil { if tail.next.CompareAndSwap(next, n) { q.tail.CompareAndSwap(tail, n) return } } else { q.tail.CompareAndSwap(tail, next) } } } } // Dequeue removes a Request from the queue func (q *LockFreeQueue[T]) Dequeue() (T, error) { for { head := q.head.Load() tail := q.tail.Load() next := head.next.Load() if head == q.head.Load() { if head == tail { if next == nil { return head.value, errors.New("queue is empty") } q.tail.CompareAndSwap(tail, next) } else { v := next.value if q.head.CompareAndSwap(head, next) { return v, nil } } } } } // Check if the queue is empty. func (q *LockFreeQueue[T]) IsEmpty() bool { return q.head.Load() == q.tail.Load() }
Ich habe hier eine andere Implementierung gefunden, die in meiner App ohne Datenrennen funktioniert, aber ich kann anscheinend nicht herausfinden, was genau der Unterschied zwischen den beiden ist.
Danke für jede Hilfe oder Rückmeldung!
Es stellt sich heraus, dass die Änderung einiger Dinge das Problem lösen kann.
Erste Änderung:
var n = node[t]{} head.store(&n) tail.store(&n)
Die zweite Änderung besteht darin, die dequeue()
Rücksendesignatur zu ändern.
Die endgültige Datei sieht so aus:
package queue import ( "sync/atomic" ) // LockfreeQueue represents a FIFO structure with operations to enqueue // and dequeue generic values. // Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html type LockFreeQueue[T any] struct { head atomic.Pointer[node[T]] tail atomic.Pointer[node[T]] } // node represents a node in the queue type node[T any] struct { value T next atomic.Pointer[node[T]] } // newNode creates and initializes a node func newNode[T any](v T) *node[T] { return &node[T]{value: v} } // NewQueue creates and initializes a LockFreeQueue func NewLockFreeQueue[T any]() *LockFreeQueue[T] { var head atomic.Pointer[node[T]] var tail atomic.Pointer[node[T]] var n = node[T]{} head.Store(&n) tail.Store(&n) return &LockFreeQueue[T]{ head: head, tail: tail, } } // Enqueue adds a series of Request to the queue func (q *LockFreeQueue[T]) Enqueue(v T) { n := newNode(v) for { tail := q.tail.Load() next := tail.next.Load() if tail == q.tail.Load() { if next == nil { if tail.next.CompareAndSwap(next, n) { q.tail.CompareAndSwap(tail, n) return } } else { q.tail.CompareAndSwap(tail, next) } } } } // Dequeue removes a Request from the queue func (q *LockFreeQueue[T]) Dequeue() T { var t T for { head := q.head.Load() tail := q.tail.Load() next := head.next.Load() if head == q.head.Load() { if head == tail { if next == nil { return t } q.tail.CompareAndSwap(tail, next) } else { v := next.value if q.head.CompareAndSwap(head, next) { return v } } } } } // Check if the queue is empty. func (q *LockFreeQueue[T]) IsEmpty() bool { return q.head.Load() == q.tail.Load() }
Das obige ist der detaillierte Inhalt vonImplementierung einer sperrenfreien, unbegrenzten Warteschlange mit dem neuen Typ atomic.Pointer. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!