首頁 >Java >java教程 >Disruptor的概述與使用

Disruptor的概述與使用

零下一度
零下一度原創
2017-06-27 10:25:532873瀏覽

 

一概述

1.Disruptor

Disruptor是一個高效能的非同步處理框架,一個「生產者-消費者」模型。

2.RingBuffer

RingBuffer是一種環形資料結構,包含一個指向下一個槽點的序號,可以在執行緒間傳遞數據。

3.Event

在Disruptor框架中,生產者生產的資料叫做Event。

二 Disruptor框架基本上構成

# 1.MyEvent:自訂對象,充當「生產者-消費者」模型中的資料。
  2.MyEventFactory:實作EventFactory的接口,用於生產資料。
  3.MyEventProducerWithTranslator:將資料儲存到自訂物件中並發布。
  4.MyEventHandler:自訂消費者。

三Demo

 初次接觸Disruptor,認識停留在表面,零散,模糊,在此記一個簡單的示例,以便日後深入研究。

1.自訂資料類別 

package com.disruptor.basic;public class LongEvent {private long value;public long getValue() {return value;
    }public void setValue(long value) {this.value = value;
    }

}

2.資料生產工廠(創建資料類別物件)

package com.disruptor.basic;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {// TODO Auto-generated method stubreturn new LongEvent();
    }

}

3.資料來源(初始化資料物件並發布)

package com.disruptor.basic;import java.nio.ByteBuffer;import com.lmax.disruptor.EventTranslatorOneArg;import com.lmax.disruptor.RingBuffer;public class LongEventProducerWithTranslator {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;
    }private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {/** * event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号;
         * bb:包含有将要被存储到目标对象中的数据的容器         */public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {// TODO Auto-generated method stubevent.setValue(bb.getLong(0));// 将数据存储到目标对象中        }
    };public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者    }

}

4.消費者

package com.disruptor.basic;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO Auto-generated method stubSystem.out.println("当前消费的数据="+event.getValue());
    }

}

5.測試類別

package com.disruptor.basic;import java.nio.ByteBuffer;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.junit.Test;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;public class LongEventTest {

    @SuppressWarnings({ "unchecked", "deprecation" })
    @Testpublic void test01() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        EventFactory<LongEvent> factory = new LongEventFactory();int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,new YieldingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// LongEventProducer producer = new// LongEventProducer(ringBuffer);LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);// long startTime = System.currentTimeMillis();for (long a = 0; a < 100; a++) {
            bb.putLong(0, a);
            producer.onData(bb);/*if (a == 99) {
                long endTime = System.currentTimeMillis();
                System.out.println("useTime=" + (endTime - startTime));
            }*/Thread.sleep(100);
        }/*long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));*/disruptor.shutdown();
        executor.shutdown();
    }/*@Test
    public void test02() {
        long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            System.out.println(a);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));
    }*/}

 

以上是Disruptor的概述與使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn