本篇文章主要介紹了java多執行緒訊息佇列的實作程式碼,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧
本文介紹了java多執行緒訊息佇列的實作程式碼,分享給大家,希望對大家有幫助,順便也自己留個筆記
#1、定義一個佇列快取池:
//static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。 private static List<Queue> queueCache = new LinkedList<Queue>();
2、定義佇列緩衝池最大訊息數,如果達到該值,那麼佇列檢入會等待檢出低於該值時繼續進行。
private Integer offerMaxQueue = 2000;
3、定義檢出線程,如果佇列緩衝池沒有訊息,那麼檢出線程等待中
new Thread(){ public void run(){ while(true){ String ip = null; try { synchronized (queueCache) { Integer size = queueCache.size(); if(size==0){ //队列缓存池没有消息,等待。。。。 queueCache.wait(); } Queue queue = queueCache.remove(0); if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理 queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理, continue; }else{ ;//这里是处理该消息的操作。 } size = queueCache.size(); if(size<offerMaxQueue&&size>=0){ queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。 } } } catch (Exception e) { e.printStackTrace(); }finally{ try {//检出该消息队列的锁 unIpLock(queueStr); } catch (Execption e) {//捕获异常,不能让线程挂掉 e.printStackTrace(); } } } }.start();
4、檢入佇列
synchronized (queueCache) { while(true){ Integer size = queueCache.size(); if(size>=offerMaxQueue){ try { queueCache.wait(); continue;//继续执行等待中的检入任务。 } catch (InterruptedException e) { e.printStackTrace(); } }//IF if(size<=offerMaxQueue&&size>0){ queueCache.notifyAll(); } break;//检入完毕 }//while }
5、鎖定方法實作
/** * 锁 * @param ip * @return * @throws */ public Boolean isLock(String queueStr) { return this.redisManager.setnx(queueStr+"_lock", "LOCK", 10000)!=1; } //解锁 public void unIpLock(String queueStr) { if(ip!=null){ this.redisManager.del(queueStr+"_lock"); // lock.unlock(); } }
以上是Java中關於如何實作多執行緒訊息佇列的實例的詳細內容。更多資訊請關注PHP中文網其他相關文章!