您当前的位置: 首页 > 

并发编程系列教程(12) - Disruptor框架

杨林伟 发布时间:2019-10-25 17:49:03 ,浏览量:2

代码已上传到Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/Java-ThreadDemo

1. 什么是Disruptor

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。

Disruptor 它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

Disruptor 是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

在使用之前,首先说明disruptor主要功能加以说明,你可以理解为他是一种高效的"生产者-消费者"模型。也就性能远远高于传统的BlockingQueue容器。

在JDK的多线程与并发库一文中, 提到了BlockingQueue实现了生产者-消费者模型 BlockingQueue是基于锁实现的, 而锁的效率通常较低. 有没有使用CAS机制实现的生产者-消费者。

Disruptor就是这样,使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue。

2. Disruptor实现特征

另一个关键的实现低延迟的细节就是在Disruptor中利用无锁的算法,所有内存的可见性和正确性都是利用内存屏障或者CAS操作。使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。

只有一个用例中锁是必须的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的实现方法就是使用Condition实现消费者在新事件到来前等待。许多低延迟系统使用忙等待去避免Condition的抖动,然而在系统忙等待的操作中,性能可能会显著降低,尤其是在CPU资源严重受限的情况下,例如虚拟环境下的WEB服务器。

3. Disruptor实现生产与消费者

1.项目中添加依赖:


	
		com.lmax
		disruptor
		3.2.1
	

2.首先声明一个Event来包含需要传递的数据

public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}

3.需要让Disruptor为我们创建事件,我们同时还声明了一个EventFactory来实例化Event对象。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory {
    @Override
    public LongEvent newInstance() {
        return  new LongEvent();
    }
}

4.事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消费者:"+event.getValue());
    }

}

5.定义生产者发送事件

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {
    
    public final RingBuffer ringBuffer;

    public LongEventProducer(RingBuffer ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 1.ringBuffer 事件队列 下一个槽
        long sequence = ringBuffer.next();
        Long data = null;
        
        try {
            //2.取出空的事件队列
            LongEvent longEvent = ringBuffer.get(sequence);
            data = byteBuffer.getLong(0);
            
            //3.获取事件队列传递的数据
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            System.out.println("生产这准备发送数据");
            //4.发布事件
            ringBuffer.publish(sequence);

        }
    }
}

6.main函数执行调用

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorDemo {

    public static void main(String[] args) {

        // 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理
        ExecutorService executor = Executors.newCachedThreadPool();

        // 2.创建工厂
        EventFactory eventFactory = new LongEventFactory();

        // 3.创建ringBuffer 大小
        int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方

        // 4.创建Disruptor
        Disruptor disruptor = new Disruptor(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());

        // 5.连接消费端方法
        disruptor.handleEventsWith(new LongEventHandler());

        // 6.启动
        disruptor.start();

        // 7.创建RingBuffer容器
        RingBuffer ringBuffer = disruptor.getRingBuffer();

        // 8.创建生产者
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        // 9.指定缓冲区大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i             
关注
打赏
1688896170
查看更多评论
0.4802s