|
| 1 | +>最近工作中参与了一个随机数分发平台的设计,考虑如何才能实现该平台的高并发性能,在技术实现选型中首先参考了百度的[uid-generator](https://github.com/baidu/uid-generator),其采用了双`RingBuffer`的实现形式,估计uid-generator的双`RingBuffer`也是借鉴了Disruptor的实现思想吧。因此,本系列文章我们一起来探究学习下2011年获得了Duke’s 程序框架创新奖的`Disruptor`框架。 |
| 2 | +
|
| 3 | + |
| 4 | +# 1 前言 |
| 5 | +Martin Fowler在自己网站上写了一篇LMAX架构的文章,LMAX是一种运行在JVM平台上的新型零售金融交易平台,该平台能够以很低的延迟产生大量交易,大量交易是多少呢?单个线程达到了每秒处理6百万订单的TPS,虽然业务逻辑是纯内存操作,但每秒处理6百万订单的TPS已经高的惊人了。那么,是什么支撑了LMAX单个线程能达到每秒处理6百万订单呢?答案就是`Disruptor`。 |
| 6 | + |
| 7 | +`Disruptor`是一个开源的并发框架,其于2011年获得了Duke’s 程序框架创新奖,采用事件源驱动方式,能够在无锁的情况下实现网络的Queue并发操作。 |
| 8 | + |
| 9 | +# 2 Disruptor框架简介 |
| 10 | + |
| 11 | +`Disruptor`框架内部核心的数据结构是`Ring Buffer`,`Ring Buffer`是一个环形的数组,`Disruptor`框架以`Ring Buffer`为核心实现了异步事件处理的高性能架构;JDK的`BlockingQueue`相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟`BlockingQueue`一样,`Disruptor`框架也是围绕`Ring Buffer`实现生产者和消费者之间数据的交换,只不过`Disruptor`框架性能更高,笔者曾经在同样的环境下拿`Disruptor`框架跟`ArrayBlockingQueue`做过性能测试,`Disruptor`框架处理数据的性能比`ArrayBlockingQueue`的快几倍。 |
| 12 | + |
| 13 | +`Disruptor`框架性能为什么会更好呢?其有以下特点: |
| 14 | + |
| 15 | +1. 预加载内存可以理解为使用了内存池; |
| 16 | +2. 无锁化 |
| 17 | +3. 单线程写 |
| 18 | +4. 消除伪共享 |
| 19 | +5. 使用内存屏障 |
| 20 | +6. 序号栅栏机制 |
| 21 | + |
| 22 | +# 3 相关概念 |
| 23 | + |
| 24 | + |
| 25 | + |
| 26 | + |
| 27 | +**Disruptor**:是使用`Disruptor`框架的核心类,持有`RingBuffer`、消费者线程池、消费者集合`ConsumerRepository`和消费者异常处理器`ExceptionHandler`等引用; |
| 28 | + |
| 29 | +**Ring Buffer**: `RingBuffer`处于`Disruptor`框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有`Sequencer`的引用; |
| 30 | + |
| 31 | + |
| 32 | +**Sequencer**: `Sequencer`是`Disruptor`框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类`SingleProducerSequencer`和`MultiProducerSequencer`。 |
| 33 | + |
| 34 | +**Sequence**:`Sequence`被用来标识`Ring Buffer`和消费者`Event Processor`的处理进度,每个消费者`Event Processor`和`Ring Buffer`本身都分别维护了一个`Sequence`,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。 |
| 35 | + |
| 36 | +**Sequence Barrier**:`Sequence Barrier`即为序号屏障,通过追踪生产者的`cursorSequence`和每个消费者(` EventProcessor`)的`sequence`的方式来协调生产者和消费者之间的数据交换进度,其实现类`ProcessingSequenceBarrier`持有的`WaitStrategy`等待策略类是实现序号屏障的核心。 |
| 37 | + |
| 38 | +**Wait Strategy**:`Wait Strategy`是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢? |
| 39 | + |
| 40 | +**Event Processor**:`Event Processor`可以理解为消费者线程,该线程会一直从`Ring Buffer`获取数据来消费数据,其有两个核心实现类:`BatchEventProcessor`和`WorkProcessor`。 |
| 41 | + |
| 42 | +**Event Handler**:`Event Handler`可以理解为消费者实现业务逻辑的`Handler`,被`BatchEventProcessor`类引用,在`BatchEventProcessor`线程的死循环中不断从`Ring Buffer`获取数据供`Event Handler`消费。 |
| 43 | + |
| 44 | +**Producer**:生产者,一般用`RingBuffer.publishEvent`来生产数据。 |
| 45 | + |
| 46 | + |
| 47 | + |
| 48 | + |
| 49 | + |
| 50 | +# 4 入门DEMO |
| 51 | +```java |
| 52 | +// LongEvent.java |
| 53 | +public class LongEvent |
| 54 | +{ |
| 55 | + private long value; |
| 56 | + |
| 57 | + public void set(long value) |
| 58 | + { |
| 59 | + this.value = value; |
| 60 | + } |
| 61 | + |
| 62 | + public long get() { |
| 63 | + return this.value; |
| 64 | + } |
| 65 | +} |
| 66 | +``` |
| 67 | + |
| 68 | +```java |
| 69 | +// LongEventFactory.java |
| 70 | +public class LongEventFactory implements EventFactory<LongEvent> |
| 71 | +{ |
| 72 | + @Override |
| 73 | + public LongEvent newInstance() |
| 74 | + { |
| 75 | + return new LongEvent(); |
| 76 | + } |
| 77 | +} |
| 78 | +``` |
| 79 | + |
| 80 | +```java |
| 81 | +// LongEventHandler.java |
| 82 | +public class LongEventHandler implements EventHandler<LongEvent> |
| 83 | +{ |
| 84 | + @Override |
| 85 | + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) |
| 86 | + { |
| 87 | + System.out.println(new Date() + ":Event-" + event.get()); |
| 88 | + } |
| 89 | +} |
| 90 | +``` |
| 91 | + |
| 92 | +```java |
| 93 | +// LongEventTranslatorOneArg.java |
| 94 | +public class LongEventTranslatorOneArg implements EventTranslatorOneArg<LongEvent, ByteBuffer> { |
| 95 | + @Override |
| 96 | + public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) { |
| 97 | + event.set(buffer.getLong(0)); |
| 98 | + } |
| 99 | +} |
| 100 | +``` |
| 101 | + |
| 102 | +```java |
| 103 | +// LongEventMain.java |
| 104 | +public class LongEventMain |
| 105 | +{ |
| 106 | + public static void main(String[] args) throws Exception |
| 107 | + { |
| 108 | + int bufferSize = 1024; |
| 109 | + final Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>( |
| 110 | + new LongEventFactory(), |
| 111 | + bufferSize, |
| 112 | + Executors.newSingleThreadExecutor(), |
| 113 | + ProducerType.SINGLE, |
| 114 | + new YieldingWaitStrategy() |
| 115 | + ); |
| 116 | + |
| 117 | + disruptor.handleEventsWith(new LongEventHandler()); |
| 118 | + disruptor.start(); |
| 119 | + |
| 120 | + |
| 121 | + RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); |
| 122 | + ByteBuffer bb = ByteBuffer.allocate(8); |
| 123 | + for (long l = 0; true; l++) |
| 124 | + { |
| 125 | + bb.putLong(0, l); |
| 126 | + ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb); |
| 127 | + Thread.sleep(1000); |
| 128 | + } |
| 129 | + } |
| 130 | +} |
| 131 | +``` |
| 132 | +输出结果: |
| 133 | + |
| 134 | + |
| 135 | + |
| 136 | + |
| 137 | + |
| 138 | +参考:https://lmax-exchange.github.io/disruptor/user-guide/index.html |
| 139 | + |
| 140 | + |
| 141 | + |
| 142 | +**若您觉得不错,请无情的转发和点赞吧!** |
| 143 | + |
| 144 | +【源码笔记】Github地址: |
| 145 | + |
| 146 | +https://github.com/yuanmabiji/Java-SourceCode-Blogs |
| 147 | + |
0 commit comments