1、DeplayQueue延时无界阻塞队列
在谈到DelayQueue的使用和原理的时候,我们首先介绍一下DelayQueue,DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。 (推荐学习:java面试题目)
DelayQueue阻塞队列在我们系统开发中也常常会用到,例如:缓存系统的设计,缓存中的对象,超过了空闲时间,需要从缓存中移出;任务调度系统,能够准确的把握任务的执行时间。我们可能需要通过线程处理很多时间上要求很严格的数据。
如果使用普通的线程,我们就需要遍历所有的对象,一个一个的检查看数据是否过期等,首先这样在执行上的效率不会太高,其次就是这种设计的风格也大大的影响了数据的精度。一个需要12:00点执行的任务可能12:01才执行,这样对数据要求很高的系统有更大的弊端。由此我们可以使用DelayQueue。
下面将会对DelayQueue做一个介绍,然后举个例子。并且提供一个Delayed接口的实现和Sample代码。DelayQueue是一个BlockingQueue,其特化的参数是Delayed。
(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。
DelayQueue=BlockingQueue+PriorityQueue+Delayed
DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
他们的基本定义如下
public interface Comparable<T> { public int compareTo(T o); } public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); } public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { private final PriorityQueue<E> q = new PriorityQueue<E>(); }
DelayQueue 内部的实现使用了一个优先队列。当调用 DelayQueue 的 offer 方法时,把 Delayed 对象加入到优先队列 q 中。如下:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } }
DelayQueue 的 take 方法,把优先队列 q 的 first 拿出来(peek),如果没有达到延时阀值,则进行 await处理。如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); //wake up other takers return x; } } } } finally { lock.unlock(); } }
DelayQueue 实例应用
Ps:为了具有调用行为,存放到 DelayDeque 的元素必须继承 Delayed 接口。Delayed 接口使对象成为延迟对象,它使存放在 DelayQueue 类中的对象具有了激活日期。该接口强制执行下列两个方法。
以下将使用 Delay 做一个缓存的实现。其中共包括三个类Pair、DelayItem、Cache
Pair 类:
public class Pair<K, V> { public K first; public V second; public Pair() { } public Pair(K first, V second) { this.first = first; this.second = second; } }
以下是对 Delay 接口的实现:
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class DelayItem<T> implements Delayed { /** * Base of nanosecond timings, to avoid wrapping */ private static final long NANO_ORIGIN = System.nanoTime(); /** * Returns nanosecond time offset by origin */ final static long now() { return System.nanoTime() - NANO_ORIGIN; } /** * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied * entries. */ private static final AtomicLong sequencer = new AtomicLong(0); /** * Sequence number to break ties FIFO */ private final long sequenceNumber; /** * The time the task is enabled to execute in nanoTime units */ private final long time; private final T item; public DelayItem(T submit, long timeout) { this.time = now() + timeout; this.item = submit; this.sequenceNumber = sequencer.getAndIncrement(); } public T getItem() { return this.item; } public long getDelay(TimeUnit unit) { long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); return d; } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof DelayItem) { DelayItem x = (DelayItem) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ?0 :((d < 0) ?-1 :1); } }
以下是 Cache 的实现,包括了 put 和 get 方法
import javafx.util.Pair; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class Cache<K, V> { private static final Logger LOG = Logger.getLogger(Cache.class.getName()); private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>(); private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>(); private Thread daemonThread; public Cache() { Runnable daemonTask = new Runnable() { public void run() { daemonCheck(); } }; daemonThread = new Thread(daemonTask); daemonThread.setDaemon(true); daemonThread.setName("Cache Daemon"); daemonThread.start(); } private void daemonCheck() { if (LOG.isLoggable(Level.INFO)) LOG.info("cache service started."); for (; ; ) { try { DelayItem<Pair<K, V>> delayItem = q.take(); if (delayItem != null) { // 超时对象处理 Pair<K, V> pair = delayItem.getItem(); cacheObjMap.remove(pair.first, pair.second); // compare and remove } } catch (InterruptedException e) { if (LOG.isLoggable(Level.SEVERE)) LOG.log(Level.SEVERE, e.getMessage(), e); break; } } if (LOG.isLoggable(Level.INFO)) LOG.info("cache service stopped."); } // 添加缓存对象 public void put(K key, V value, long time, TimeUnit unit) { V oldValue = cacheObjMap.put(key, value); if (oldValue != null) q.remove(key); long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit); q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime)); } public V get(K key) { return cacheObjMap.get(key); } }
测试 main 方法:
// 测试入口函数 public static void main(String[] args) throws Exception { Cache<Integer, String> cache = new Cache<Integer, String>(); cache.put(1, "aaaa", 3, TimeUnit.SECONDS); Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } Thread.sleep(1000 * 2); { String str = cache.get(1); System.out.println(str); } }
输出结果为:
aaaa null
我们看到上面的结果,如果超过延时的时间,那么缓存中数据就会自动丢失,获得就为 null。
2、并发(Collection)队列-非阻塞队列
非阻塞队列
首先我们要简单的理解下什么是非阻塞队列:
与阻塞队列相反,非阻塞队列的执行并不会被阻塞,无论是消费者的出队,还是生产者的入队。在底层,非阻塞队列使用的是 CAS(compare and swap)来实现线程执行的非阻塞。
非阻塞队列简单操作
与阻塞队列相同,非阻塞队列中的常用方法,也是出队和入队。
offer():Queue 接口继承下来的方法,实现队列的入队操作,不会阻碍线程的执行,插入成功返回 true; 出队方法:
poll():移动头结点指针,返回头结点元素,并将头结点元素出队;队列为空,则返回 null;
peek():移动头结点指针,返回头结点元素,并不会将头结点元素出队;队列为空,则返回 null;
3、非阻塞算法CAS
首先我们需要了解悲观锁和乐观锁
悲观锁:假定并发环境是悲观的,如果发生并发冲突,就会破坏一致性,所以要通过独占锁彻底禁止冲突发生。有一个经典比喻,“如果你不锁门,那么捣蛋鬼就回闯入并搞得一团糟”,所以“你只能一次打开门放进一个人,才能时刻盯紧他”。
乐观锁:假定并发环境是乐观的,即虽然会有并发冲突,但冲突可发现且不会造成损害,所以,可以不加任何保护,等发现并发冲突后再决定放弃操作还是重试。可类比的比喻为,“如果你不锁门,那么虽然捣蛋鬼会闯入,但他们一旦打算破坏你就能知道”,所以“你大可以放进所有人,等发现他们想破坏的时候再做决定”。
通常认为乐观锁的性能比悲观所更高,特别是在某些复杂的场景。这主要由于悲观锁在加锁的同时,也会把某些不会造成破坏的操作保护起来;而乐观锁的竞争则只发生在最小的并发冲突处,如果用悲观锁来理解,就是“锁的粒度最小”。但乐观锁的设计往往比较复杂,因此,复杂场景下还是多用悲观锁。首先保证正确性,有必要的话,再去追求性能。
乐观锁的实现往往需要硬件的支持,多数处理器都都实现了一个CAS指令,实现“Compare And Swap”的语义(这里的swap是“换入”,也就是set),构成了基本的乐观锁。CAS包含3个操作数:
需要读写的内存位置V
进行比较的值A
拟写入的新值B
当且仅当位置V的值等于A时,CAS才会通过原子方式用新值B来更新位置V的值;否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。一个有意思的事实是,“使用CAS控制并发”与“使用乐观锁”并不等价。CAS只是一种手段,既可以实现乐观锁,也可以实现悲观锁。乐观、悲观只是一种并发控制的策略。
Atas ialah kandungan terperinci java多线程和并发面试题目(1~3题,附答案). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Versi Mac WebStorm
Alat pembangunan JavaScript yang berguna

MantisBT
Mantis ialah alat pengesan kecacatan berasaskan web yang mudah digunakan yang direka untuk membantu dalam pengesanan kecacatan produk. Ia memerlukan PHP, MySQL dan pelayan web. Lihat perkhidmatan demo dan pengehosan kami.

SublimeText3 Linux versi baharu
SublimeText3 Linux versi terkini

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma