Overview and use of Disruptor

零下一度 (original)
2017-06-27 10:25:53

1 Overview

1. Disruptor

Disruptor is a high-performance asynchronous processing framework built on the "producer-consumer" model.
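For comparison, here is a minimal sketch of the same "producer-consumer" hand-off written with a plain JDK queue rather than Disruptor. It is only a baseline for orientation: the class name QueueBaselineSketch, the STOP sentinel and the queue capacity are assumptions made for this illustration and do not come from the Disruptor API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Baseline sketch: producer-consumer with a JDK queue instead of Disruptor.
final class QueueBaselineSketch {

    // Sentinel value telling the consumer to stop (an assumption of this sketch).
    private static final long STOP = -1L;

    // Publishes 0..count-1 from the calling thread and returns the sum
    // computed by the consumer thread.
    static long run(int count) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1024);
        long[] sum = new long[1];

        Thread consumer = new Thread(() -> {
            try {
                for (long v = queue.take(); v != STOP; v = queue.take()) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (long a = 0; a < count; a++) {
                queue.put(a);
            }
            queue.put(STOP);
            consumer.join(); // after join() the consumer's writes to sum are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum=" + run(100)); // sum=4950
    }
}
```

Every value here is boxed into a new Long and handed over through a lock-based queue. Disruptor instead hands data over through a pre-allocated RingBuffer, which is described next.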

2. RingBuffer

RingBuffer is a ring-shaped data structure that holds a sequence number pointing to the next available slot and is used to pass data between threads.
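The relationship between the sequence number and the slot can be sketched in a few lines of plain Java. This is a simplified, single-threaded illustration and not Disruptor's actual implementation: the class RingIndexSketch and its methods are hypothetical, and it omits the coordination that stops a producer from overwriting slots that consumers have not read yet. It does use the same idea of a power-of-2 size so that the slot index can be computed with a bit mask.

```java
// Simplified sketch of ring-buffer indexing; not Disruptor's real code.
final class RingIndexSketch {
    private final long[] slots;
    private final int mask;
    private long nextSequence = 0; // sequence number of the next slot to write

    RingIndexSketch(int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Maps an ever-growing sequence number onto a slot index in the ring.
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    // Writes a value into the next slot and returns the sequence it was written at.
    long publish(long value) {
        long sequence = nextSequence++;
        slots[indexOf(sequence)] = value;
        return sequence;
    }

    // Reads the value currently stored in the slot for the given sequence.
    long get(long sequence) {
        return slots[indexOf(sequence)];
    }

    public static void main(String[] args) {
        RingIndexSketch ring = new RingIndexSketch(8);
        for (long v = 0; v < 10; v++) {
            ring.publish(v * 10);
        }
        System.out.println(ring.indexOf(9)); // 1: sequence 9 wraps around to slot 1
        System.out.println(ring.get(9));     // 90
    }
}
```

The sequence number only ever grows, while the slot index wraps around, so sequence 1 and sequence 9 share the same slot in a ring of 8.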

3. Event

In the Disruptor framework, the data produced by the producer is called an Event.

2 Basic composition of the Disruptor framework

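1. MyEvent: a custom object that acts as the data in the "producer-consumer" model (LongEvent in the demo).
2. MyEventFactory: implements the EventFactory interface and is used to create the data objects (LongEventFactory in the demo).
3. MyEventProducerWithTranslator: stores data in the custom object and publishes it (LongEventProducerWithTranslator in the demo).
4. MyEventHandler: a custom consumer (LongEventHandler in the demo).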

3 Demo

When I first came into contact with Disruptor, my understanding of it was superficial, scattered and vague. I am writing down a simple example here so that I can study it in more depth later.

1. Customized data class

package com.disruptor.basic;

public class LongEvent {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

}

2. Data factory (creates the data class objects)

package com.disruptor.basic;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }

}
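The factory is not called once per published value. Disruptor calls it while the RingBuffer is being constructed to fill every slot, and the same objects are then reused as the sequence wraps around. The following plain-Java sketch illustrates that idea under simplifying assumptions: PreallocationSketch and its nested Event class are hypothetical stand-ins, java.util.function.Supplier plays the role of EventFactory, and the size is assumed to be a power of 2.

```java
import java.util.function.Supplier;

// Sketch of slot pre-allocation; a simplification, not Disruptor's real code.
final class PreallocationSketch<E> {

    // Hypothetical stand-in for the demo's LongEvent.
    static final class Event {
        long value;
    }

    private final Object[] slots;
    private final int mask;

    // size is assumed to be a power of 2 in this sketch.
    PreallocationSketch(Supplier<E> factory, int size) {
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // one call per slot, done once up front
        }
    }

    // Returns the pre-allocated object that lives in the slot for this sequence.
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) slots[(int) (sequence & mask)];
    }

    public static void main(String[] args) {
        PreallocationSketch<Event> ring = new PreallocationSketch<>(Event::new, 4);
        System.out.println(ring.get(0) == ring.get(4)); // true: same object after one lap
        System.out.println(ring.get(0) == ring.get(1)); // false: different slots
    }
}
```

Because the objects are reused, a producer overwrites the fields of an existing event instead of allocating a new one, which is why LongEvent exposes a setter.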

3. Data source (initializes the data object and publishes it)

package com.disruptor.basic;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        /**
         * event: the object that holds the data to be consumed;
         * sequence: the sequence number of the RingBuffer slot assigned to the target object;
         * bb: the container holding the data to be stored in the target object
         */
        @Override
        public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
            event.setValue(bb.getLong(0)); // store the data in the target object
        }
    };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb); // publish: push the data to the consumers
    }

}
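The translator reads the value with bb.getLong(0), and the test class further down writes it with bb.putLong(0, a). Both are absolute (index-based) ByteBuffer calls, so the buffer's position never moves and a single 8-byte buffer can be reused for every value. A small standalone sketch of that behavior, with ByteBufferSketch and roundTrip as hypothetical names used only here:

```java
import java.nio.ByteBuffer;

// Sketch of absolute ByteBuffer access as used by the translator.
final class ByteBufferSketch {

    // Writes value at offset 0, then reads it back from offset 0.
    static long roundTrip(ByteBuffer bb, long value) {
        bb.putLong(0, value);
        return bb.getLong(0);
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        System.out.println(roundTrip(bb, 42L)); // 42
        System.out.println(roundTrip(bb, 43L)); // 43
        System.out.println(bb.position());      // 0: absolute access leaves the position alone
    }
}
```

Reusing the buffer is safe in this demo on the assumption that the value is copied into the event before onData returns, which is how publishEvent with a translator is generally understood to work.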

4. Consumer

package com.disruptor.basic;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Currently consumed data=" + event.getValue());
    }

}
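The demo handler ignores the endOfBatch parameter. That flag marks the last event of the batch currently available to the consumer, so a handler can collect values and do the expensive part (for example a write to disk) once per batch. A minimal plain-Java sketch of that pattern follows. BatchingSketch is a hypothetical class that only mirrors the parameter shape of onEvent, with a plain long in place of LongEvent, so that it runs without the Disruptor library.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of batch-aware consumption; a stand-in, not a Disruptor class.
final class BatchingSketch {
    private final List<Long> pending = new ArrayList<>();
    private final List<List<Long>> flushed = new ArrayList<>();

    // Same parameter shape as EventHandler.onEvent, with a long instead of LongEvent.
    void onEvent(long value, long sequence, boolean endOfBatch) {
        pending.add(value);
        if (endOfBatch) {
            flushed.add(new ArrayList<>(pending)); // "flush" the whole batch at once
            pending.clear();
        }
    }

    // Batches flushed so far, in order.
    List<List<Long>> flushed() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchingSketch handler = new BatchingSketch();
        handler.onEvent(10, 0, false);
        handler.onEvent(11, 1, false);
        handler.onEvent(12, 2, true);
        System.out.println(handler.flushed()); // [[10, 11, 12]]
    }
}
```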

5. Test class

package com.disruptor.basic;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventTest {

    @SuppressWarnings({ "unchecked", "deprecation" })
    @Test
    public void test01() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        EventFactory<LongEvent> factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        // long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            bb.putLong(0, a);
            producer.onData(bb);
            /*if (a == 99) {
                long endTime = System.currentTimeMillis();
                System.out.println("useTime=" + (endTime - startTime));
            }*/
            Thread.sleep(100);
        }
        /*long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));*/
        disruptor.shutdown();
        executor.shutdown();
    }

    /*@Test
    public void test02() {
        long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            System.out.println(a);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));
    }*/
}
