병원에서 돈을 지불하기 위해 줄을 서야 하고, 핵산 검사를 하기 위해 줄을 서야 하고, 자동차 신호등에도 줄을 서야 합니다.
큐는 선착순 큐가 앞에 배치되고, 두번째 선착순 큐가 뒤에 배치되는 데이터 구조입니다. 배열과 연결리스트를 사용해 구현했습니다. 일반적으로 작업 실행 및 데이터 교환을 조정하는 데 사용됩니다.
LinkedBlockingQueue는 선택적인 제한된 차단 대기열입니다. Bounded는 대기열에 최대 용량이 있다는 것을 의미합니다. 즉, 대기열이 가득 차고 계속해서 대기열에 요소를 추가하려는 경우 이 작업이 일시 중지되고 대기열에 공간이 생길 때까지 추가 작업은 계속되지 않습니다. 큐가 이미 비어 있고 큐에서 요소를 가져오려는 경우 큐에 요소가 있을 때까지 작업이 일시 중지되어 획득 작업을 계속합니다.
LinkedBlockingQueue는 내부적으로 연결 목록을 요소의 저장 구조로 사용합니다. 저장 작업과 검색 작업을 위해 각각 내부적으로 두 개의 잠금이 사용됩니다.
액세스 작업을 수행할 때 LinkedBlockingQueue가 스레드로부터 안전한지 확인하기 위해 액세스 작업을 수행하기 전에 먼저 잠금을 획득해야 합니다.
LinkedBlockingQueue는 두 개의 조건 조건 대기열(notFull 조건 하나와 notEmpty 조건 하나)을 전달합니다. 큐에 요소를 삽입할 때 현재 큐가 꽉 찼다고 판단되면 다른 스레드가 해당 스레드에 큐에 요소를 계속 삽입할 수 있음을 알릴 때까지 notFull 조건을 통해 해당 스레드를 차단합니다. 큐에서 요소를 제거할 때 현재 큐가 비어 있다고 판단되면 다른 스레드가 이 스레드를 통해 요소를 계속 얻을 수 있을 때까지 해당 스레드는 notEmpty 조건을 통해 차단됩니다.
이렇게 하면 스레드의 액세스 작업에 오류가 발생하지 않습니다. 큐가 가득 찼을 때 삽입된 요소를 삭제하지 마세요. 또한 큐가 비어 있을 때 null 값을 가져오는 것도 방지하세요.
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
매개변수 없는 생성자에서는 Integer.MAX_VALUE
가 기본적으로 대기열의 최대 용량으로 사용됩니다.
매개변수화된 생성자에서 대기열의 최대 용량을 직접 지정하고 헤드 노드와 테일 노드를 생성할 수 있습니다. 그런 다음 LinkedBlockingQueue는 단방향 연결 목록을 사용합니다.
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
그리고 객체가 초기화되면 저장 작업과 검색 작업에 각각 사용되는 두 개의 잠금이 생성됩니다. 비어 있는 대기열 조건과 꽉 찬 대기열 조건에 대해 각각 두 개의 조건부 대기열이 생성됩니다.
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1. 잠금 획득
2. 현재 대기열이 가득 찼는지 확인
대기열이 가득 차면 notFull 조건부 대기열의 wait() 메서드를 호출하여 스레드를 차단하고 일시 중지합니다. 스레드 삽입 작업. 내부 오버플로 문제를 피하세요.
가득 차 있지 않으면 enqueue 함수 enqueue를 직접 호출하여 대기열 끝에 삽입합니다.
3. 이때 큐가 꽉 찼는지 확인하세요.
꽉 차지 않았다면 notFull 조건 큐의 signal() 메서드를 호출하여 notFull 조건 큐에 차단된 스레드를 깨웁니다.
4. 잠금 해제
요소를 삽입하기 전의 대기열 요소 수가 0
인지 확인한 다음 notEmpty 조건부 대기열의 signal() 메서드를 호출하여 이제 대기열은 비어 있지 않으며 차단 스레드를 깨울 수 있습니다. 요소를 가져옵니다.
왜 notFull 조건 대기열의 signal() 메서드를 호출해야 합니까? 대기열 검색 작업과 저장 작업에 사용되는 잠금이 다르기 때문에 한 스레드가 입금 작업을 수행하면 다른 스레드가 검색 작업을 수행할 수 있음을 의미합니다. 다음 예를 살펴보겠습니다.
큐의 총 용량은 5이고 현재 요소 수는 5입니다. 스레드 A는 잠금을 획득하고 요소를 삽입하려고 합니다. 하지만 대기열 용량이 가득 차서 잠금이 해제되고 조건 대기열에 추가되어 깨어나기를 기다립니다.
스레드 B가 잠금을 획득하고 요소를 삽입하려고 합니다. 하지만 대기열 용량이 가득 차서 잠금이 해제되고 조건 대기열에 추가되어 깨어나기를 기다립니다.
스레드 C가 잠금을 획득하고 요소 1을 꺼냈습니다. 그리고 notFull이라는 신호 방식을 통해 조건 대기열에 있는 차단된 스레드 A를 깨웁니다. 스레드 A는 깨어난 후 동기화 대기열에 참가하지만 현재로서는 아직 잠금을 위해 경쟁하지 않았습니다.
스레드 D가 잠금 장치를 획득하고 요소 2를 꺼냈습니다. 그러나 차단된 스레드를 깨우는 코드는 아직 실행되지 않았습니다.
스레드 A는 잠금을 놓고 경쟁하고 요소 삽입을 시작합니다. 요소를 삽입한 후 대기열 요소의 개수가 대기열의 전체 용량보다 적은 4개인지 확인하므로 notFull 의 시그널 메서드를 실행하여 조건부 대기열에 차단된 스레드 B를 깨웁니다. 스레드 B가 깨어난 후 동기화 대기열에 합류하고 잠금 경쟁을 시작합니다.
스레드 B가 잠금을 놓고 경쟁하고 요소 삽입을 시작합니다. 요소를 삽입한 후 대기열 요소의 개수가 5인지 확인하고 wake-up 작업을 수행하지 않습니다.
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
위 내용은 Java LinkedBlockingQueue를 마스터하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!