1. DelayQueue는 지연된 무제한 차단 큐
DelayQueue의 사용과 원리에 관해 이야기할 때 먼저 DelayQueue를 소개합니다. DelayQueue는 지연이 만료될 때만 요소를 추출할 수 있는 것입니다. 대기열의 헤드는 지연이 만료된 후 가장 오랫동안 저장될 지연된 요소입니다. (추천 연구: java 인터뷰 질문)
DelayQueue 차단 대기열은 시스템 개발에서도 자주 사용됩니다. 예를 들어, 캐시 시스템 설계, 캐시에 있는 객체가 유휴 시간을 초과하여 다른 곳에서 이동해야 합니다. 캐시; 작업 스케줄링 시스템은 작업의 실행 시간을 정확하게 파악할 수 있습니다. 스레드를 통해 시간이 중요한 많은 데이터를 처리해야 할 수도 있습니다.
일반 스레드를 사용하는 경우 모든 개체를 순회하여 데이터가 만료되었는지 등을 하나씩 확인해야 합니다. 우선 실행 효율성이 너무 높지 않을 것이며 두 번째로 디자인 스타일이 또한 데이터 정확도에 큰 영향을 미칩니다. 12시에 실행해야 하는 작업은 12시 1분까지 실행되지 않을 수 있으며, 이는 데이터 요구 사항이 높은 시스템에 더 큰 단점이 있습니다. 여기에서 DelayQueue를 사용할 수 있습니다.
다음은 DelayQueue에 대한 소개와 예시입니다. 그리고 Delayed 인터페이스와 샘플 코드의 구현을 제공합니다. DelayQueue는 특수 매개변수가 Delayed인 BlockingQueue입니다.
(BlockingQueue를 모르는 학생은 먼저 BlockingQueue에 대해 알아보고 이 기사를 읽어보세요.) Delayed는 Comparable 인터페이스를 확장합니다. 비교 벤치마크는 Delayed 인터페이스의 구현 클래스인 getDelay의 반환 값입니다. , 고정된 값이어야 합니다(최종). DelayQueue는 PriorityQueue를 사용하여 내부적으로 구현됩니다.
DelayQueue=BlockingQueue+PriorityQueue+Delayed
DelayQueue의 핵심 요소는 BlockingQueue, PriorityQueue 및 Delayed입니다. DelayQueue는 우선순위 큐(PriorityQueue)를 이용하여 구현한 BlockingQueue라고 할 수 있다. 우선순위 큐의 비교 기준 값은 시간이다.
기본 정의는 다음과 같습니다
public interface Comparable<T> { public int compareTo(T o); } public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { private final PriorityQueue<E> q = new PriorityQueue<E>(); }
DelayQueue의 내부 구현에서는 우선 순위 대기열을 사용합니다. DelayQueue의 Offer 메소드가 호출되면 Delayed 객체가 우선순위 큐 q에 추가됩니다. 다음과 같습니다:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } }
DelayQueue의 take 메소드는 우선순위 큐 q(peek) 중 첫 번째를 꺼냅니다. 지연 임계값에 도달하지 않으면 대기 처리가 수행됩니다. 다음과 같습니다:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); //wake up other takers return x; } } } } finally { lock.unlock(); } }
DelayQueue 인스턴스 애플리케이션
Ps: 호출 동작을 가지려면 DelayDeque에 저장된 요소가 Delayed 인터페이스를 상속해야 합니다. Delayed 인터페이스는 객체를 지연된 객체로 만들어 DelayQueue 클래스에 저장된 객체에 활성화 날짜를 제공합니다. 이 인터페이스는 다음 두 가지 방법을 시행합니다.
다음은 Delay를 사용하여 캐시를 구현하는 것입니다. 여기에는 총 세 가지 클래스가 포함됩니다: pair, DelayItem, Cache
Pair 클래스:
public class Pair<K, V> { public K first; public V second; public Pair() { } public Pair(K first, V second) { this.first = first; this.second = second; } }
다음은 Delay 인터페이스의 구현입니다.
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class DelayItem<T> implements Delayed { /** * Base of nanosecond timings, to avoid wrapping */ private static final long NANO_ORIGIN = System.nanoTime(); /** * Returns nanosecond time offset by origin */ final static long now() { return System.nanoTime() - NANO_ORIGIN; } /** * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied * entries. */ private static final AtomicLong sequencer = new AtomicLong(0); /** * Sequence number to break ties FIFO */ private final long sequenceNumber; /** * The time the task is enabled to execute in nanoTime units */ private final long time; private final T item; public DelayItem(T submit, long timeout) { this.time = now() + timeout; this.item = submit; this.sequenceNumber = sequencer.getAndIncrement(); } public T getItem() { return this.item; } public long getDelay(TimeUnit unit) { long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); return d; } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof DelayItem) { DelayItem x = (DelayItem) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ?0 :((d < 0) ?-1 :1); } }
다음은 Cache의 구현입니다. put 및 get 메소드
import javafx.util.Pair; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class Cache<K, V> { private static final Logger LOG = Logger.getLogger(Cache.class.getName()); private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>(); private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>(); private Thread daemonThread; public Cache() { Runnable daemonTask = new Runnable() { public void run() { daemonCheck(); } }; daemonThread = new Thread(daemonTask); daemonThread.setDaemon(true); daemonThread.setName("Cache Daemon"); daemonThread.start(); } private void daemonCheck() { if (LOG.isLoggable(Level.INFO)) LOG.info("cache service started."); for (; ; ) { try { DelayItem<Pair<K, V>> delayItem = q.take(); if (delayItem != null) { // 超时对象处理 Pair<K, V> pair = delayItem.getItem(); cacheObjMap.remove(pair.first, pair.second); // compare and remove } } catch (InterruptedException e) { if (LOG.isLoggable(Level.SEVERE)) LOG.log(Level.SEVERE, e.getMessage(), e); break; } } if (LOG.isLoggable(Level.INFO)) LOG.info("cache service stopped."); } // 添加缓存对象 public void put(K key, V value, long time, TimeUnit unit) { V oldValue = cacheObjMap.put(key, value); if (oldValue != null) q.remove(key); long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit); q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime)); } public V get(K key) { return cacheObjMap.get(key); } }
테스트 주요 메소드:
// 测试入口函数 public static void main(String[] args) throws Exception { Cache<Integer, String> cache = new Cache<Integer, String>(); cache.put(1, "aaaa", 3, TimeUnit.SECONDS); Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } }
출력 결과는 다음과 같습니다.
aaaa null
위 결과를 보면 지연 시간이 초과되면 캐시에 있는 데이터가 자동으로 손실됩니다. 결과는 null이 됩니다.
2. 동시(수집) 대기열 - 비차단 대기열
비차단 대기열
우선 비차단 대기열이 무엇인지 간략하게 이해해야 합니다.
차단 대기열과 반대 , 비차단 대기열의 실행은 소비자의 대기열 제거이든 생산자의 합류든 차단되지 않습니다. 내부적으로 비차단 대기열은 CAS(비교 및 교체)를 사용하여 비차단 스레드 실행을 달성합니다.
비 차단 대기열의 간단한 작업
차단 대기열과 마찬가지로 비 차단 대기열의 일반적인 방법은 대기열 제거 및 대기열 추가입니다.
offer(): 대기열 입력 작업을 구현하고 스레드 실행을 방해하지 않는 Queue 인터페이스에서 상속된 메서드입니다. 삽입이 성공하면 대기열 제거 메서드가 반환됩니다.
poll(): 이동 헤드 노드 포인터를 반환하고 헤드 노드 요소를 반환하며, 큐가 비어 있으면 null이 반환됩니다.
peek(): 헤드 노드 포인터를 이동하고 헤드 노드 요소를 반환합니다. 대기열이 비어 있으면 null이 반환됩니다.
3. 비차단 알고리즘 CAS
먼저 비관적 잠금과 낙관적 잠금을 이해해야 합니다. 동시성 환경은 비관적입니다. 동시성 충돌이 발생하면 일관성이 파괴되므로 배타적 잠금을 통해 충돌을 완전히 금지해야 합니다. "문을 잠그지 않으면 말썽꾸러기가 침입해 난리를 만들 것이다"라는 고전적인 비유가 있는데, "문을 열고 한 번에 한 사람만 들여보내면 한 사람씩만 들어갈 수 있다"는 것이다. 그 사람을 주목해봐."
낙관적 잠금: 동시성 환경은 낙관적이라고 가정합니다. 즉, 동시성 충돌이 있더라도 충돌을 발견할 수 있고 손상을 일으키지 않을 것이므로 보호 기능을 추가할 수 없으며 결정됩니다. 동시성 충돌이 발견된 후 작업을 포기하거나 다시 시작합니다. 비유하자면 "문을 잠그지 않으면 말썽꾸러기들이 침입하더라도 그들이 당신을 파괴하고 싶어하는지 알 것입니다." 따라서 "모든 사람을 들여보내고 그들이 알아낼 때까지 기다리면 됩니다." 파괴하고 싶다." 결정을 내리다".
특히 일부 복잡한 시나리오에서는 낙관적 잠금의 성능이 비관적 잠금의 성능보다 더 높다고 일반적으로 알려져 있습니다. 이는 주로 비관적 잠금이 잠금 중에 손상을 일으키지 않는 특정 작업을 보호하는 반면, 낙관적 잠금에 대한 경쟁은 가장 작은 동시성 충돌에서만 발생하기 때문입니다. 이를 이해하기 위해 비관적 잠금을 사용하면 "잠금 세분성이 가장 작습니다. ". 그러나 낙관적 잠금의 설계는 더 복잡한 경우가 많습니다. 따라서 비관적 잠금은 복잡한 시나리오에서 자주 사용됩니다. 먼저 정확성을 확보하고, 필요하다면 성과를 추구하세요.
낙관적 잠금을 구현하려면 하드웨어 지원이 필요한 경우가 많습니다. 대부분의 프로세서는 "비교 및 교환" 의미를 구현하기 위해 CAS 명령을 구현합니다. 여기서 스왑은 "스왑 인"입니다. 설정)은 기본적인 낙관적 잠금을 구성합니다. CAS에는 3개의 피연산자가 포함되어 있습니다.
읽고 쓸 메모리 위치 V
비교할 값 A
쓸 예정 새 값 B
위치 V의 값이 A와 같은 경우에만 CAS는 위치 V의 값을 새 값 B로 원자적으로 업데이트합니다. 그렇지 않으면 작업이 수행되지 않습니다. 위치 V의 값이 A와 같은지 여부에 관계없이 V의 원래 값이 반환됩니다. 흥미로운 사실은 "동시성을 제어하기 위해 CAS를 사용하는 것"이 "낙관적 잠금을 사용하는 것"과 동일하지 않다는 것입니다. CAS는 낙관적 잠금과 비관적 잠금을 모두 달성하기 위한 수단일 뿐입니다. 낙관주의와 비관주의는 단지 동시성 제어 전략일 뿐입니다.
위 내용은 Java 멀티스레딩 및 동시성 인터뷰 질문(1~3개 질문, 답변 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!