Disruptor核心API和应用模式

Posted by LiuXi on 2018-06-13

上篇文章Disruptor框架简介主要介绍了Disruptor框架中的一些概念及实现方式

本篇文章主要针对Disruptor这个类的一些核心API进行说明,包括以下几个方面:

  • 创建
  • 事件处理
  • 启动
  • 事件发送
  • 异常处理
  • 关闭

同时本篇文章也会介绍Disruptor框架在实际应用中的一些模式,包括以下几个方面:

  • 生产者端:支持单生产者和多生产者
  • 消费者端:支持多线程并发消费、事件广播
  • 消费者依赖关系构造

注:Disruptor版本3.4.2

1 核心API

1.1 创建

Disruptor的创建是通过构造函数来实例化Disruptor对象的,构造函数如下:

Disruptor构造函数
1
2
3
4
5
6
7
8
9
10
11
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}

上面的构造函数会初始化Disruptor对象包含的两个核心对象:

  • RingBuffer:创建RingBuffer对象需要指定4个基本属性
    • producerType:生产者类型(单生产者还是多生产者,通过ProducerType枚举指定,不同的类型,生产者对应的Sequencer不同)
    • eventFactory:生成事件对象的工厂类
    • ringBufferSize:RingBuffer底层数组的大小
    • waitStrategy:消费者等待策略
  • BasicExecutor : 通过线程创建工厂threadFactory构造线程池,用于执行消费者事件处理逻辑

1.2 事件处理

消费者针对事件的处理,主要有以下两个核心方法:

  • handleEventsWith(EventProcessor[])
    入参是一个EventProcessor数组

    该方法会将每个EventProcessor封装成EventProcessorInfo,存储到Disruptor的消费者事件处理集合ConsumerRepository中

    注意:事件会被EventProcessor数组中的每一个处理器处理

  • handleEventsWith(EventHandler[])
    入参是一个EventHandler数组

    该方法会将每个EventHandler封装成EventProcessorInfo,存储到Disruptor的消费者事件处理集合ConsumerRepository中
    与上面的方法不同的是:这个方法通过EventHandler构造成BatchEventProcessor,从而支持消费者端的批处理

    注意:事件会被EventHandler数组中的每一个处理器处理

  • handleEventsWithWorkerPool(WorkHandler[])
    入参是一个WorkHandler数组

    该方法会将每一个WorkHandler封装成一个WorkProcessor,然后将所有的WorkProcessor封装到一个WorkerPool里面
    最后将这个WorkerPool封装成WorkerPoolInfo,存储到Disruptor的消费者事件处理集合ConsumerRepository中

    注意:事件会被WorkHandler数组中的一个WorkHandler处理

总结:

  • 前两个方法适用于一个事件多个不同的逻辑处理器并行处理
  • 第三个方法使用于一个事件只需要被一个事件处理器抢占处理

可类比MQ的消费模式
方法处理逻辑可参照源码

1.3 启动

在上述事件处理中只是将处理器封装成ConsumerRepository存储到Disruptor中,而启动方法才是真正的调度线程,执行这个逻辑处理器

Disruptor启动方法
1
2
3
4
5
6
7
8
9
10
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}

return ringBuffer;
}

从源码可看出Disruptor启动,会通过线程池异步执行所有的ConsumerInfo,就会启动EventProcessor执行线程(EventProcessor继承自Runnable

我们可以通过EventProcessor的子类BatchEventProcessorWorkProcessorrun方法可以看出

异步线程启动后的处理逻辑就是循环获取RingBuffer的可处理序列,若可获取到序列,则处理事件;否则根据等待策略等待。

1.4 事件发送

生产者发送事件主要是调用publishEvent(EventTranslator)实现的,该方法封装了RingBuffer节点申请、事件初始化和RingBuffer序列提交

EventTranslator支持无参EventTranslator、一个参数EventTranslatorOneArg、两个参数EventTranslatorTwoArg、三个参数EventTranslatorThreeArg的多种版本

EventTranslator只处理事件对象的加工,而事件对象的创建(创建Disruptor对象时候提供的事件工厂类)和获取由Disruptor框架封装

可参照官方Getting Start – using-version-3-translators

1.5 异常处理

Disruptor框架中,消费者异步线程处理事件属于异常高发地段,那么Disruptor框架是如何处理和传递异常的呢?消费者事件处理异常是否回造成序列的消费失败,从而卡死RingBuffer环呢?

异常处理接口声明ExceptionHandler

下面来了解下Disruptor处理异常的方式。

Disruptor中声明了异常处理的接口ExceptionHandler,包含三个方法:

  • handleEventException:事件消费异常处理(重点关注)
  • handleOnStartException: 启动异常处理
  • handleOnShutdownException : 关闭异常处理

默认的异常处理类FatalExceptionHandler

我们再回过头开看看Disruptor这个类:

  • Disruptor里面包含了一个ExceptionHandlerWrapper类型的exceptionHandler对象,这是Disruptor异常处理的一个代理类
  • 真正的异常处理是由ExceptionHandlerWrapper这个代理类的目标对象delegate来实现的,其默认值为FatalExceptionHandler

至此我们知道*Disruptor的异常默认是由FatalExceptionHandler类实现的*。

看下FatalExceptionHandlerhandleEventException方法,消费者处理事件异常,异常处理器会抛出运行时异常。

异常处理的工作机制

从上面可知Disruptor对象创建的时候就初始化了异常处理代理类ExceptionHandlerWrapper

下面我们看下,在消费者处理事件的时候,异常处理类是如何生效的

1.2 事件处理1.3 启动两部分可知,事件处理主要是封装EventProcessor,而启动时候是将EventProcessor放在线程池中异步执行

那么还是回到EventProcessor线程执行的run方法

由于EventProcessor继承自Runnable,所以执行时候就看run方法

下面以EventProcessor的子类BatchEventProcessorrun方法为例来看看异常处理类的作用

BatchEventProcessorrun方法调用processEvents方法处理事件

BatchEventProcessor异常处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void processEvents()
{
T event = null;
long nextSequence = sequence.get() + 1L;

while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}

while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}

从上面源码可看出,在消费者处理事件的时候,会捕获异常,并交给exceptionHandler处理

从源码也能看出,默认的异常处理类FatalExceptionHandler在事件消费异常时候,会抛出运行时异常,那么当前sequence并不能被正常消费,这可能会造成RingBuffer环的卡死(无法消费,那么生产者也无法发送事件)

自定义处理异常

自定义异常其实就是实现ExceptionHandler接口,然后通过DisruptorsetDefaultExceptionHandler来设置到异常处理代理类ExceptionHandlerWrapper中,从而替换掉默认异常处理类

自定义异常并实现消费异常处理方法handleEventException可以根据实际应用场景来处理(打印异常、设置标志位等等),避免RingBuffer卡死的情形

自定义异常是为了让使用者掌握主动权,根据自己的实际场景来处理异常,在避免潜在问题的前提下,满足业务场景

Disruptor中提供了三个方法来设置自定义的异常处理类:

  • setDefaultExceptionHandler:对所有的EventHandlerWorkerPool都有效
  • handleExceptionsFor:对指定的EventHandler设置异常处理类
  • handleExceptionsWith: 对该方法被调用之后设置的EventHandler有效(这种方式使用起来容易出错,已废弃)
    可以参照Disruptor issue

1.6 关闭

Disruptor提供了关闭方法shutdown,该方法会等待所有的事件都被处理完成之后,再停止所有的事件处理器EventProcessor.halt

注意:

  • 该方法不会关闭消费者执行线程池也不会等待消费者执行线程完毕
  • 该方法被调用的时候,所有的事件发送需要都已经完成

2 应用模式

在开篇提到,Disruptor框架在实际应用中有多种模式,包括以下几个方面:

  • 生产者端:支持单生产者和多生产者
  • 消费者端:支持事件广播、多线程并发消费
  • 消费者依赖关系构造

生产者端

对于生产者来说,Disruptor框架支持单生产者也支持多生产者,其不同支持就是在创建Disruptor对象的时候,指定不同的ProducerType

消费者端

消费者端主要说明下事件广播和多线程并发消费

  • 事件广播

    事件广播其实就是针对一个事件,会有多个处理器来处理,多个处理器完成不同的功能,且互不影响

    1.2 事件处理中可知,这种情形调用handleEventsWith(EventHandler[])即可实现

  • 多线程并发消费

    多线程并发消费是指针对一个事件,事件会被提交到多个处理器组成的线程池执行,这个事件只会被一个线程执行。

    1.2 事件处理中可知,这种情形调用handleEventsWithWorkerPool(WorkHandler[])即可实现

    对于事件处理的一个特定逻辑,我们创建多个WorkHandler实例,即多个WorkHandler组成线程池,但是逻辑一致,从而实现多线程并发消费

消费者依赖关系构造

消费者的典型场景就是菱形结构,如下图(图片来自Dissecting the Disruptor: Wiring up the dependencies):

如上图:一个生产者P1,三个消费者C1、C2、C3

  • C1 和 C2 直接依赖RingBuffer,获取事件处理
  • C3也是直接依赖RingBuffer,但同时也直接依赖C1、C2先对数据处理完成,再从RingBuffer中获取事件处理
  • P1在发送事件的时候,依赖三个消费者的处理进度,即依赖C1、C2、C3三者的序列
  • C1、C2直接依赖生产者的进度来消费数据,即依赖RingBuffer的序列
  • C3不仅仅依赖RingBuffer的序列,同时还要依赖C1和C2的序列(例如:C1消费到了序列7, C2消费到了9,那么C3只能处理7之前的事件)

下面是这个接口的一个示例代码:

Disruptor菱形结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConsumerDependencyMain {
public static void main(String[] args) {
ThreadFactory productThreadFactory = generateThreadFactory("pool-");
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, 4, productThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());

EventHandler<LongEvent> c1 = (event, sequence, endOfBatch) -> System.out.println(String.format("c1 handle sequence %s in %s", sequence, Thread.currentThread().getName()));
EventHandler<LongEvent> c2 = (event, sequence, endOfBatch) -> System.out.println(String.format("c2 handle sequence %s in %s", sequence, Thread.currentThread().getName()));
EventHandler<LongEvent> c3 = (event, sequence, endOfBatch) -> System.out.println(String.format("c3 handle sequence %s in %s", sequence, Thread.currentThread().getName()));
disruptor.handleEventsWith(c1, c2).then(c3);

disruptor.start();

IntStream.range(0, 20).forEach( i ->
disruptor.publishEvent((event, sequence) -> {
event.setValue(sequence);
System.out.println(String.format("p1 publish sequence %s in %s", sequence, Thread.currentThread().getName()));
})
);

disruptor.shutdown();
}

private static ThreadFactory generateThreadFactory(String threadNamePrefix) {
return new ThreadFactory() {
private volatile AtomicInteger counter = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, threadNamePrefix + counter.getAndIncrement());
}
};
}
}

其中自定义事件类LongEvent包含一个value字段

在多消费者场景下,可能c3会依赖c1和c2的处理结果,而c1和c2可能并发处理同一个事件对象,此时可能会产生并发写的情况
这种情况下,建议不同的消费者回写事件对象的不同字段来规避并发写的情况