首頁  >  文章  >  Java  >  分析Java中阻塞佇列的範例

分析Java中阻塞佇列的範例

PHPz
PHPz轉載
2023-05-09 21:43:061333瀏覽

    1. 什麼是阻塞佇列

     阻塞佇列是一種特殊的佇列,和資料結構中普通的佇列一樣,也遵守進階先出的原則同時,阻塞佇列是一種能確保執行緒安全的資料結構,並且具有以下兩種特性:當佇列滿的時候,繼續向佇列插入元素就會讓佇列阻塞,直到有其他執行緒從佇列中取走元素;當隊列為空的時候,繼續出隊列也會讓隊列阻塞,直到有其他線程往隊列中插入元素

    #補充:線程阻塞的意思是指此時不會被執行,即作業系統在此時不會把這個執行緒調度到CPU上去執行了

    2. 阻塞佇列的程式碼使用

    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.BlockingDeque;
    public class Test {
        public static void main(String[] args) throws InterruptedException {
            //不能直接newBlockingDeque,因为它是一个接口,要向上转型
            //LinkedBlockingDeque内部是基于链表方式来实现的
            BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此处可以指定一个具体的数字,这里的的10代表队列的最大容量
            queue.put("hello");
            String elem=queue.take();
            System.out.println(elem);
            elem=queue.take();
            System.out.println(elem);
        }
    }

    注意: put方法帶有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是BlockingDeque繼承了Queue

    分析Java中阻塞佇列的範例

    ##列印結果如上所示,當列印了hello後,佇列為空,程式碼執行到
    elem=queue.take();就不會繼續往下執行了,此時執行緒進入阻塞等待狀態,什麼也不會列印了,直到有其他執行緒給佇列中放入新的元素為止

    3. 生產者消費者模型

    生產者消費者模型是在伺服器開發和後端開發中比較常用的程式設計手段,一般用於解耦合和削峰填谷。

    高耦合度:兩個程式碼模組的關聯關係比較高

    高內聚:一個程式碼模組內各個元素彼此結合的緊密
    因此,我們一般追求高內聚低耦合,這樣會加快執行效率,而使用生產者消費者模型就可以解耦合

    (1)應用一:解耦合

    我們以實際生活中的情況為例,這裡有兩台伺服器:A伺服器和B伺服器,當A伺服器傳輸資料給B時,要是直接傳輸的話,那麼不是A向B推送數據,就是B從A中拉取數據,都是需要A和B直接交互,所以A和B存在依賴關係(A和B的耦合度比較高)。未來如果伺服器需要擴展,例如增加C伺服器,讓A給C傳數據,那麼改動就比較複雜,且會降低效率。這時我們可以加一個佇列,這個佇列為阻塞佇列,如果A把資料寫到佇列裡,B從中取,那麼佇列相當於是中轉站(或說交易場所),A相當於生產者(提供資料), B相當於消費者(接收資料),此時就構成了生產者消費者模型,這樣會讓程式碼耦合度更低,維護更方便,執行效率更高。

    分析Java中阻塞佇列的範例

    在電腦中,生產者充當其中一組線程,而消費者充當另一組線程,而交易場所就可以使用阻塞隊列了

    (2)應用二:削峰填谷

    分析Java中阻塞佇列的範例

    實際生活中

    在河道中大壩算是一個很重要的組成部分了,如果沒有大壩,大家試想一下結果:當汛期來臨後上游的水很大時,下游就會湧入大量的水發生水災讓莊稼被淹沒;而旱期的話下游的水會很少可能會引發旱災。若有大壩的話,汛期時大壩把多餘的水存到大壩中,關閘蓄水,讓上游的水按一定速率往下流,避免突然一波大雨把下游淹了,這樣下游不至於出現水災。旱期時水壩把之前儲存好的水放出來,還是讓讓水以一定速率往下流,避免下流太缺水,這樣既可以避免汛期發生洪澇又可以避免旱期發生旱災了。
    峰:相當於汛期
    谷:相當於旱期
    電腦中
    這樣的情況在電腦中也是很典型的,尤其是在伺服器開發中,網關通常會把網路中的請求轉發給業務伺服器,例如一些商品伺服器,使用者伺服器,商家伺服器(存放商家的資訊),直播伺服器。但因為網路過來的請求數量是多是少不可控,相當於上游的水,如果突然來了一大波請求,網關即使能扛得住,後續的很多伺服器收到很多請求也會崩潰(處理一個請求涉及到一系列的資料庫操作,因為資料庫相關操作效率本身比較低,這樣請求多了就處理不過來了,因此就會崩潰)

    分析Java中阻塞佇列的範例

    所以实际情况中网关和业务服务器之间往往用一个队列来缓冲,这个队列就是阻塞队列(交易场所),用这个队列来实现生产者(网关)消费者(业务服务器)模型,把请求缓存到队列中,后面的消费者(业务服务器)按照自己固定的速率去读请求。这样当请求很多时,虽然队列服务器可能会稍微受到一定压力,但能保证业务服务器的安全。

    (3)相关代码

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TestDemo {
        public static void main(String[] args) {
            // 使用一个 BlockingQueue 作为交易场所
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
            // 此线程作为消费者
            Thread customer = new Thread() {
                @Override
                public void run() {
                    while (true) {
                        // 取队首元素
                        try {
                            Integer value = queue.take();
                            System.out.println("消费元素: " + value);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            customer.start();
            // 此线程作为生产者
            Thread producer = new Thread() {
                @Override
                public void run() {
                    for (int i = 1; i <= 10000; i++) {
                        System.out.println("生产了元素: " + i);
                        try {
                            queue.put(i);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            producer.start();
            try {
                customer.join();
                producer.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    分析Java中阻塞佇列的範例

    打印如上(此代码是让生产者通过sleep每过1秒生产一个元素,而消费者不使用sleep,所以每当生产一个元素时,消费者都会立马消费一个元素)

    4.阻塞队列和生产者消费者模型功能的实现

    在学会如何使用BlockingQueue后,那么如何自己去实现一个呢?
    主要思路:

    • 1.利用数组

    • 2.head代表队头,tail代表队尾

    • 3.head和tail重合后到底是空的还是满的判断方法:专门定义一个size记录当前队列元素个数,入队列时size加1出队列时size减1,当size为0表示空,为数组最大长度就是满的(也可以浪费一个数组空间用head和tail重合表示空,用tail+1和head重合表示满,但此方法较为麻烦,上一个方法较为直观,因此我们使用上一个方法)

    public class Test2 {
        static class BlockingQueue {
        private int[] items = new int[1000];    // 此处的1000相当于队列的最大容量, 此处暂时不考虑扩容的问题.
        private int head = 0;//定义队头
        private int tail = 0;//定义队尾
        private int size = 0;//数组大小
        private Object locker = new Object();
    
        // put 用来入队列
        public void put(int item) throws InterruptedException {
            synchronized (locker) {
                while (size == items.length) {
                    // 队列已经满了,阻塞队列开始阻塞
                    locker.wait();
                }
                items[tail] = item;
                tail++;
                // 如果到达末尾, 就回到起始位置.
                if (tail >= items.length) {
                    tail = 0;
                }
                size++;
                locker.notify();
            }
        }
        // take 用来出队列
        public int take() throws InterruptedException {
            int ret = 0;
            synchronized (locker) {
                while (size == 0) {
                    // 对于阻塞队列来说, 如果队列为空, 再尝试取元素, 就要阻塞
                    locker.wait();
                }
                ret = items[head];
                head++;
                if (head >= items.length) {
                    head = 0;
                }
                size--;
                // 此处的notify 用来唤醒 put 中的 wait
                locker.notify();
            }
            return ret;
        }
    }
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue queue = new BlockingQueue();
            // 消费者线程
            Thread consumer = new Thread() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            int elem = queue.take();
                            System.out.println("消费元素: " + elem);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            consumer.start();
    
            // 生产者线程
            Thread producer = new Thread() {
                @Override
                public void run() {
                    for (int i = 1; i < 10000; i++) {
                        System.out.println("生产元素: " + i);
                        try {
                            queue.put(i);
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            producer.start();
            consumer.join();
            producer.join();
        }
    }

    分析Java中阻塞佇列的範例

    运行结果如上。
    注意:

    • 1.wait和notify的正确使用

    • 2.put和take都会产生阻塞情况,但阻塞条件是对立的,wait不会同时触发(put唤醒take阻塞,take唤醒put阻塞)

    以上是分析Java中阻塞佇列的範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述:
    本文轉載於:yisu.com。如有侵權,請聯絡admin@php.cn刪除