Maison  >  Article  >  Java  >  Comment maîtriser Java LinkedBlockingQueue

Comment maîtriser Java LinkedBlockingQueue

王林
王林avant
2023-04-18 16:04:031084parcourir

    Les files d'attente sont visibles partout dans la vie. Vous devez faire la queue pour payer à l'hôpital, vous devez faire la queue pour faire des tests d'acide nucléique, vous devez faire la queue aux feux tricolores pour les voitures, etc.

    Comment maîtriser Java LinkedBlockingQueue

    La file d'attente est une structure de données dans laquelle la file d'attente premier arrivé, premier arrivé, premier servi est placée à l'avant et la file d'attente deuxième arrivé, premier servi est classée à l'arrière lors du retrait de la file d'attente. , la file d'attente est également la première arrivée, premier servi. Implémenté à l'aide de tableaux et de listes chaînées. Habituellement utilisé pour coordonner l’exécution des tâches et l’échange de données.

    Introduction

    LinkedBlockingQueue est une file d'attente de blocage limitée facultative. Bounded signifie que la file d'attente a une capacité maximale de blocage, ce qui signifie que si la file d'attente est pleine et que vous souhaitez continuer à ajouter des éléments à la file d'attente, cette opération sera suspendue et l'opération sera interrompue. l’opération d’ajout ne continuera pas tant qu’il n’y aura pas d’espace dans la file d’attente. Si la file d'attente est déjà vide et que vous souhaitez récupérer des éléments de la file d'attente, l'opération sera suspendue jusqu'à ce qu'il y ait des éléments dans la file d'attente pour continuer l'opération d'acquisition.

    Principe de mise en œuvre

    LinkedBlockingQueue utilise en interne une liste chaînée comme structure de stockage des éléments. Deux verrous sont utilisés en interne, respectivement pour les opérations de stockage et les opérations de récupération.

    Lors de l'exécution d'opérations d'accès, le verrou doit d'abord être acquis avant que l'opération d'accès puisse être effectuée afin de garantir que LinkedBlockingQueue est thread-safe.

    LinkedBlockingQueue transmet deux files d'attente de conditions, une condition notFull et une condition notEmpty. Lors de l'insertion d'éléments dans la file d'attente, s'il est jugé que la file d'attente actuelle est pleine, le thread sera bloqué via la condition notFull jusqu'à ce que d'autres threads informent le thread que la file d'attente peut continuer à insérer des éléments. Lors de la suppression d'éléments de la file d'attente, s'il est déterminé que la file d'attente actuelle est vide, le thread sera bloqué via la condition notEmpty jusqu'à ce que d'autres threads puissent continuer à obtenir des éléments via ce thread.

    Cela garantit qu'il n'y aura pas d'erreurs dans les opérations d'accès au fil. Évitez de supprimer les éléments insérés lorsque la file d'attente est pleine ; évitez également d'obtenir une valeur nulle lorsque la file d'attente est vide.

    Constructeur

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    Dans le constructeur sans paramètre, Integer.MAX_VALUE est utilisé par défaut comme capacité maximale de la file d'attente.

    Dans le constructeur paramétré, vous pouvez spécifier vous-même la capacité maximale de la file d'attente et créer le nœud principal et le nœud final. Ensuite, LinkedBlockingQueue utilise une liste chaînée unidirectionnelle.

    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    transient Node<E> head;
    
    private transient Node<E> last;
    
    // 取锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    private final Condition notEmpty = takeLock.newCondition();
    
    // 存锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    private final Condition notFull = putLock.newCondition();

    Et lorsque l'objet est initialisé, deux verrous sont créés, qui sont utilisés pour les opérations de stockage et les opérations de récupération. Deux files d'attente conditionnelles sont créées, respectivement pour la situation où la file d'attente est vide et pleine.

    Insérer une fonction

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    1. Acquérir le verrou

    2. Déterminer si la file d'attente actuelle est pleine

    • Si la file d'attente est pleine, appelez la méthode wait() de la file d'attente conditionnelle notFull pour bloquer le thread et mettre en pause. l'opération d'insertion du fil. Évitez les problèmes de débordement interne.

    • S'il n'est pas plein, appelez directement la enqueue fonction enqueue pour l'insérer en fin de file d'attente.

    3. Vérifiez si la file d'attente est pleine à ce moment

    Si elle n'est pas pleine, appelez la méthode signal() de la file d'attente de conditions notFull pour réveiller le thread bloqué dans la file d'attente de conditions notFull.

    4. Déverrouiller

    • Vérifiez si le nombre d'éléments de la file d'attente avant d'insérer l'élément est égal à 0

    • est égal à 0, puis appelez la méthode signal() de la file d'attente conditionnelle notEmpty pour lui signaler que le la file d'attente n'est pas vide maintenant et vous pouvez réveiller le thread bloquant pour obtenir l'élément.

    Comment maîtriser Java LinkedBlockingQueue

    Pourquoi devons-nous appeler la méthode signal() de la file d'attente de conditions notFull ? Étant donné que les verrous utilisés pour les opérations de récupération de file d'attente et les opérations de stockage sont différents, cela signifie que lorsqu'un thread effectue une opération de dépôt, d'autres threads peuvent effectuer des opérations de récupération. Regardons l'exemple suivant :

    Comment maîtriser Java LinkedBlockingQueue

    • La capacité totale de la file d'attente est de 5 et le nombre actuel d'éléments est de 5. Le thread A acquiert le verrou et souhaite insérer l'élément. Mais comme la capacité de la file d'attente est pleine, le verrou est libéré et ajouté à la file d'attente des conditions, en attente d'être réveillé.

    • Le fil B acquiert le verrou et souhaite insérer l'élément. Mais comme la capacité de la file d'attente est pleine, le verrou est libéré et ajouté à la file d'attente des conditions, en attente d'être réveillé.

    • Le fil C a acquis le verrou et a retiré l'élément 1. Et réveillez le thread bloqué A dans la file d'attente des conditions via la méthode de signal de notFull. Le thread A rejoint la file d'attente de synchronisation après avoir été réveillé, mais n'a pas encore concouru pour le verrou à ce moment-là.

    • Le fil D a acquis le verrou et a retiré l'élément 2. Mais le code qui réveille le thread bloqué n’a pas encore été exécuté.

    • Le fil A est en compétition pour le verrou et commence à insérer des éléments. Après avoir inséré l'élément, il est vérifié que le nombre d'éléments de la file d'attente est de 4, ce qui est inférieur à la capacité totale de la file d'attente, donc la méthode de signal de notFull est exécutée pour réveiller le thread B bloqué dans la file d'attente conditionnelle. Une fois le thread B réveillé, il rejoint la file d’attente de synchronisation et commence à rivaliser pour le verrou.

    • Le fil B est en compétition pour le verrou et commence à insérer des éléments. Après insertion de l'élément, on vérifie que le nombre d'éléments de la file d'attente est égal à 5, et aucune opération de réveil n'est effectuée.

    这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。

    获取函数

    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    1.获得取锁

    2.判断当前队列是否为空

    • 如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。

    • 如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。

    3.检查此时队列是否为空

    如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。

    4.释放锁

    5.检查获取元素前的队列元素数量是否等于最大容量

    等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。

    Comment maîtriser Java LinkedBlockingQueue

    步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。

    入队函数

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。

    Comment maîtriser Java LinkedBlockingQueue

    出队函数

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。

    Comment maîtriser Java LinkedBlockingQueue

    应用场景

    适用场景

    LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:

    线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。

    生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。

    实际应用场景

    • Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。

    • Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。

    • Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Déclaration:
    Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer