首頁  >  文章  >  Java  >  怎麼掌握Java LinkedBlockingQueue

怎麼掌握Java LinkedBlockingQueue

王林
王林轉載
2023-04-18 16:04:031080瀏覽

    隊列在生活中隨處可見,醫院繳費需要排隊、做核酸需要排隊、汽車等紅綠燈需要排隊等等。

    怎麼掌握Java LinkedBlockingQueue

    佇列是一個按照先來到就排在前面,後來到排在後面的資料結構,並且出隊的時候也是按照先來到先出隊。使用數組和鍊錶進行實作。通常用於協調任務的執行和資料的交換。

    介紹

    LinkedBlockingQueue 是一個可選有界阻塞佇列,有界指的是佇列存在一個最大容量;阻塞指的是如果佇列已經滿了,想要繼續往佇列新增元素的話,那麼這個操作將會被暫停,直到佇列中有空位才會繼續完成新增操作。如果佇列已經為空,想要從佇列中取得元素,那麼這個操作將會被暫停,直接佇列中存在元素才會繼續完成取得操作。

    實作原理

    LinkedBlockingQueue 內部使用鍊錶作為元素的儲存結構。內部使用了兩個鎖,分別使用於儲存操作和取操作。

    執行存取操作時,都必須先取得鎖,才可以執行存取操作,並保證 LinkedBlockingQueue 是執行緒安全性。

    LinkedBlockingQueue 透過兩個 Condition 條件佇列,一個 notFull 條件,一個 notEmpty 條件。在對佇列進行插入元素操作時,判斷目前佇列已經滿,則透過 notFull 條件將執行緒阻塞,直到其他執行緒通知該執行緒佇列可以繼續插入元素。在對佇列進行移除元素操作時,判斷目前佇列已經空,則透過 notEmpty 條件阻塞線程,直到其他執行緒通過該執行緒可以繼續取得元素。

    這樣保證執行緒的存取操作不會出現錯誤。避免佇列在滿時,丟棄插入的元素;也避免在佇列空時取到一個 null 值。

    建構子

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    無參考建構子中,預設使用 Integer.MAX_VALUE 作為佇列的最大容量。

    有參構造函數中,可以自行指定佇列的最大容量,並且建立了頭節點和尾節點。那麼 LinkedBlockingQueue 使用的是有頭單向鍊錶

    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    transient Node<E> head;
    
    private transient Node<E> last;
    
    // 取锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    private final Condition notEmpty = takeLock.newCondition();
    
    // 存锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    private final Condition notFull = putLock.newCondition();

    並且在物件初始化時,創建了兩個鎖,分別使用於儲存操作和取操作。創建了兩個條件隊列,分別用於隊列空和滿的情況。

    插入函數

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    1.取得鎖定

    2.判斷目前佇列是否已經滿了

    • 如果佇列已經滿了,呼叫notFull 條件佇列的await() 方法,將該執行緒阻塞,暫停該執行緒的插入操作。避免內部溢出的問題。

    • 如果沒有滿,則直接呼叫入隊函數 enqueue 插入到佇列結尾。

    3.檢查此時佇列是否已滿

    如果未滿,則呼叫notFull 條件佇列的signal() 方法,喚醒被阻塞在notFull 條件佇列的線程。

    4.解鎖

    • 檢查插入元素前的佇列元素數量是否等於0

    • 等於0,則呼叫notEmpty條件佇列的signal() 方法,通知其佇列現在不為空,可以喚醒阻塞執行緒取得元素。

    怎麼掌握Java LinkedBlockingQueue

    為什麼需要呼叫 notFull 條件佇列的 signal() 方法? 因為佇列取操作和儲存操作所使用的鎖是不一樣的,那麼就說明,當一個執行緒執行存入操作時,其他執行緒是可以執行取出操作的。我們來看下面這個範例:

    怎麼掌握Java LinkedBlockingQueue

    • 佇列總容量為 5,目前元素數量為5。線程 A 獲取了存鎖,想要插入了元素。但是因為佇列容量已滿,釋放鎖定,並且加入到條件佇列中,等待被喚醒。

    • 線程 B 取得了存鎖,想要插入了元素。但是因為佇列容量已滿,釋放鎖定,並且加入到條件佇列中,等待被喚醒。

    • 線程 C 取得了取鎖,取出了元素 1。並且透過 notFull 的 signal 方法喚醒條件佇列中被阻塞的執行緒 A。執行緒 A 被喚醒後加入到同步佇列中,但此時還沒有競爭到鎖定。

    • 線程 D 取得了取鎖,取出了元素 2。但是還沒有執行到喚醒阻塞執行緒的程式碼。

    • 執行緒 A 競爭到鎖定,開始執行插入元素操作。將元素插入之後,檢查到佇列元素數量為 4,小於佇列的總容量,因此執行 notFull 的 signal 方法喚醒條件佇列中被阻塞的執行緒 B。執行緒 B 被喚醒後加入到同步佇列中,開始競爭鎖定。

    • 執行緒 B 競爭到鎖定,開始執行插入元素操作。將元素插入之後,檢查到佇列元素數量等於 5,不進行喚醒操作。

    这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。

    获取函数

    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    1.获得取锁

    2.判断当前队列是否为空

    • 如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。

    • 如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。

    3.检查此时队列是否为空

    如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。

    4.释放锁

    5.检查获取元素前的队列元素数量是否等于最大容量

    等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。

    怎麼掌握Java LinkedBlockingQueue

    步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。

    入队函数

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。

    怎麼掌握Java LinkedBlockingQueue

    出队函数

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。

    怎麼掌握Java LinkedBlockingQueue

    应用场景

    适用场景

    LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:

    线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。

    生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。

    实际应用场景

    • Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。

    • Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。

    • Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。

    以上是怎麼掌握Java LinkedBlockingQueue的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除