Maison  >  Article  >  Java  >  File d'attente de blocage Java Analyse de l'instance BlockingQueue

File d'attente de blocage Java Analyse de l'instance BlockingQueue

王林
王林avant
2023-04-25 15:13:151102parcourir

Types de files d'attente

  • File d'attente illimitéeAucune limite de capacité, changements uniquement avec le stockage

  • File d'attente limitéeDéfinit la capacité maximale

Ajouter des éléments à la file d'attente infinie Toutes les opérations ne bloqueront jamais (également les threads -safe), il peut donc atteindre de très grandes capacités. Utiliser une file d'attente de blocage infinie BlockingQueue La chose la plus importante lors de la conception d'un modèle producteur-consommateur est que le consommateur doit pouvoir consommer les messages aussi vite que le producteur ajoute des messages à la file d'attente. Sinon, la mémoire pourrait être insuffisante et une exception OutOfMemory pourrait être levée.

Structure des données

  • 1. Généralement implémentée à l'aide de listes chaînées ou de tableaux

  • 2 Généralement avec des caractéristiques FIFO (premier entré, premier sorti), elle peut également être conçue comme une file d'attente à double extrémité

  • . 3. Les principales opérations de la file d'attente : Entrée et retrait de la file d'attente

Blocking Queue BlockingQueue

Définition : Dans la communication des threads, à tout moment, quelle que soit la hauteur de la concurrence, sur une seule JVM, un seul thread peut toujours entrer dans la file d'attente en même temps. Opérations de mise en file d'attente ou de retrait de la file d'attente. BlockingQueue peut être partagé entre les threads sans aucune synchronisation explicite. Types de files d'attente de blocage :

Wait

File d'attente de blocage commune

File dattente de blocage Java Analyse de linstance BlockingQueue

ArrayBlockingQueue :

File d'attente limitée prise en charge par array

    Scénarios d'application :
  • Il existe de nombreuses applications. et des modèles producteur-consommateur dans le pool de threads

  • Principe de fonctionnement :
      Basé sur ReentrantLock pour garantir la sécurité des threads et implémenter le blocage lorsque la file d'attente est pleine selon la condition
    • LinkedBlockingQueue : File d'attente illimitée basé sur une liste chaînée (théoriquement délimitée)

  • Priority BlockingQueue :

    File d'attente prioritaire illimitée prise en charge par le tas prioritaire PriorityQueue et file d'attente illimitée basée sur l'implémentation de l'expansion du tableau

    Utilisation :
  • Les objets ajoutés à la file d'attente doivent implémenter l'interface Delayed, et Delayed est intégré à partir de l'interface Comparable

  • Scénarios d'application :
  • Vente de billets de cinéma , etc.

  • Principe de fonctionnement :
      La file d'attente sera traitée en interne selon le temps Trié par priorité. Retarder l’exécution du cycle du pool de threads de classe.
    • Ils implémentent tous l'interface BlockingQueue, avec les méthodes put() et take()
    • La méthode de création est la suivante :
    • BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
      BlockingQueue API

    • Ajouter des éléments :
    • method
    .
Signification

add()

Si l'insertion est réussie, retournez true, sinon une exception IllegalStateException sera levée

put()Insérez l'élément spécifié dans la file d'attente Si la file d'attente est pleine, il bloquera jusqu'à ce qu'il y ait de l'espace pour insérerRenvoie vrai si l'insertion est réussie, sinon renvoie fauxEssayez d'insérer des éléments dans la file d'attente. Si la file d'attente est pleine, elle se bloquera jusqu'à ce qu'il y ait une insertion spatiale, le blocage a un contrôle temporel. Récupérez l'élément de tête de la file d'attente et supprimez-le si la file d'attente est vide, puis bloque et attend que l'élément devienne disponiblepoll (long timeout, unité TimeUnit)Récupérez et supprimez la tête de la file d'attente, si nécessaire, attendez le temps d'attente spécifié pour rendre l'élément disponible, et revenez s'il expire null

ArrayBlockingQueue 源码简解

实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列

线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。

/**
 * 构造方法
 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列
 * @param capacity 固定容量
 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 根据fair创建对应的锁
    // 条件对象,配合容器能满足业务
    notEmpty = lock.newCondition(); // 出队条件对象
    notFull =  lock.newCondition(); // 入队条件对象
}
/**
 * 入队方法
 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 检查put对象是否为空,空抛出异常
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文
    try {
        // 队列中元素的数量 等于 排队元素的长度
        while (count == items.length)
            notFull.await(); // 见下文
        enqueue(e); // 元素入队
    } finally {
        lock.unlock();
    }
}
/**
 * 出队方法
 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 见下文
    try {
        while (count == 0)
            notEmpty.await(); // 见下文
        return dequeue(); // 元素出队
    } finally {
        lock.unlock();
    }
}

令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:

  • ①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;

  • ②其他线程调用这个条件的 signalAll 方法

  • ③其他线程中断当前线程,支持中断线程挂起;

  • ④一个“虚假的唤醒”发生了。

在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。

如果当前线程有以下两种情况之一:

  • ①在进入该方法时设置中断状态;

  • ②在等待时被中断,支持线程挂起的中断 抛出InterruptedException

生产者消费者模式

BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。

生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务

@Slf4j
@Slf4j
public class Producer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> blockingQueue;
    private final int stopTag;
    /**
     * 构造方法
     * @param blockingQueue
     * @param stopTag
     */
    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
        this.blockingQueue = blockingQueue;
        this.stopTag = stopTag;
    }
    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
   private void generateNumbers() throws InterruptedException {
        // 每个生产者都随机生产10种商品
        for (int i = 0; i < 10; i++) {
            int product = ThreadLocalRandom.current().nextInt(1000,1100);
            log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);
            blockingQueue.put(product);
        }
        // 生产终止标记
        blockingQueue.put(stopTag);
        log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());
    }
}

消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务

@Slf4j
public class Consumer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> queue;
    private final int stopTage;
    public Consumer(BlockingQueue<Integer> queue, int stopTage) {
        this.queue = queue;
        this.stopTage = stopTage;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Integer product = queue.take();
                if (product.equals(stopTage)) {
                    log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());
                    return;
                }
                log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者

public class ProductConsumerTest {
    public static void main(String[] args) {
        // 阻塞队列容量
        int blockingQueueSize = 10;
        // 生产者数量
        int producerSize = 16;
        // 消费者数量 = 计算机线程核数 8
        int consumerSize = Runtime.getRuntime().availableProcessors();
        // 终止消费标记
        int stopTag = Integer.MAX_VALUE;
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
        // 创建16个生产者线程
        for (int i = 0; i < producerSize; i++) {
            new Thread(new Producer(blockingQueue, stopTag)).start();
        }
        // 创建8个消费者线程
        for (int j = 0; j < consumerSize; j++) {
            new Thread(new Consumer(blockingQueue, stopTag)).start();
        }
    }
}

延迟队列 DelayQueue

定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

/**
 * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
    //延迟时间
    private final long delay;
    //到期时间
    private final long expire;
    //数据
    private final String msg;
    //创建时间
    private final long now;
    public long getDelay() {
        return delay;
    }
    public long getExpire() {
        return expire;
    }
    public String getMsg() {
        return msg;
    }
    public long getNow() {
        return now;
    }
    /**
     * @param msg 消息
     * @param delay 延期时间
     */
    public MovieTicket(String msg , long delay) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }
    /**
     * @param msg
     */
    public MovieTicket(String msg){
        this(msg,1000);
    }
    public MovieTicket(){
        this(null,1000);
    }
    /**
     * 获得延迟时间   用过期时间-当前时间,时间单位毫秒
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire
                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间
     * 越早过期的时间在队列中越靠前
     * @param delayed
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}

测试类:

public static void main(String[] args) {
    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
    MovieTicket ticket = new MovieTicket("电影票1",10000);
    delayQueue.put(ticket);
    MovieTicket ticket1 = new MovieTicket("电影票2",5000);
    delayQueue.put(ticket1);
    MovieTicket ticket2 = new MovieTicket("电影票3",8000);
    delayQueue.put(ticket2);
    log.info("message:--->入队完毕");
    while( delayQueue.size() > 0 ){
        try {
            ticket = delayQueue.take();
            log.info("电影票出队:{}",ticket.getMsg());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同

offer()
offer(E e, long timeout, unité TimeUnit)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer