Disruptor是一個高效能的非同步處理框架,一個「生產者-消費者」模型。
RingBuffer是一種環形資料結構,內部維護一個指向下一個可用槽位的序號,可以在執行緒之間傳遞資料。
在Disruptor框架中,生產者生產的資料叫做Event。
1.MyEvent(對應下方範例中的LongEvent):自訂物件,充當「生產者-消費者」模型中的資料。
2.MyEventFactory(對應LongEventFactory):實作EventFactory介面,用於建立Event物件。
3.MyEventProducerWithTranslator(對應LongEventProducerWithTranslator):將資料儲存到自訂物件中並發布。
4.MyEventHandler(對應LongEventHandler):自訂消費者。
初次接觸Disruptor,認識仍停留在表面,零散而模糊,在此記錄一個簡單的範例,以便日後深入研究。
package com.disruptor.basic;public class LongEvent {private long value;public long getValue() {return value; }public void setValue(long value) {this.value = value; } }
package com.disruptor.basic;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {// TODO Auto-generated method stubreturn new LongEvent(); } }
package com.disruptor.basic;import java.nio.ByteBuffer;import com.lmax.disruptor.EventTranslatorOneArg;import com.lmax.disruptor.RingBuffer;public class LongEventProducerWithTranslator {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer; }private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {/** * event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号; * bb:包含有将要被存储到目标对象中的数据的容器 */public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {// TODO Auto-generated method stubevent.setValue(bb.getLong(0));// 将数据存储到目标对象中 } };public void onData(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者 } }
package com.disruptor.basic;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO Auto-generated method stubSystem.out.println("当前消费的数据="+event.getValue()); } }
package com.disruptor.basic;import java.nio.ByteBuffer;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.junit.Test;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;public class LongEventTest { @SuppressWarnings({ "unchecked", "deprecation" }) @Testpublic void test01() throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); EventFactory<LongEvent> factory = new LongEventFactory();int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,new YieldingWaitStrategy()); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// LongEventProducer producer = new// LongEventProducer(ringBuffer);LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);// long startTime = System.currentTimeMillis();for (long a = 0; a < 100; a++) { bb.putLong(0, a); producer.onData(bb);/*if (a == 99) { long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime)); }*/Thread.sleep(100); }/*long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime));*/disruptor.shutdown(); executor.shutdown(); }/*@Test public void test02() { long startTime = System.currentTimeMillis(); for (long a = 0; a < 100; a++) { System.out.println(a); } long endTime = System.currentTimeMillis(); System.out.println("useTime=" + (endTime - startTime)); }*/}
以上是Disruptor的概述與使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!