从零开始实现lmax-Disruptor队列(三)多线程消费者WorkerPool原理解析

Java59

在v2版本的MyDisruptor实现多消费者、消费者组间依赖功能后。按照计划,v3版本的MyDisruptor需要支持多线程消费者的功能。

由于这篇文章是一系列博客的一部分,我们需要了解前一篇博客的内容,以便更好地理解这篇博客。

[En]

Since the article is part of a series of blogs, we need to know the content of the previous blog in order to better understand this blog.

  • 之前的版本中我们已经实现了单线程消费者串行的消费,但在某些场景下我们需要更快的消费速度,所以disruptor也提供了多线程的消费者机制。
  • 多线程消费者对外功能上和单线程消费者基本一样,也是全量的消费从序列0到序列N的完整事件,但内部却是局部并行乱序消费的。在一定的范围内,具体哪个线程消费哪个事件是通过CAS争抢随机获得的。
    从零开始实现lmax-Disruptor队列(三)多线程消费者WorkerPool原理解析

  • disruptor中多线程消费者的载体是WorkerPool。

  • 在V3版本的MyDisruptor中,MyWorkerPool和单线程消费者MyBatchEventProcessor一样,构造函数都是传入三个关键组件:RingBuffer、序列屏障mySequenceBarrier和用户自定义的事件处理器。
  • 和单线程消费者不同,多线程消费者允许传入一个用户自定义的事件处理器MyWorkHandler集合。传入的每个MyWorkHandler都会创建一个MyWorkProcessor对象将其封装、包裹起来(下文会展开介绍MyWorkProcessor)。
  • 虽然同为用户自定义的消费处理器接口,disruptor中WorkHandler和单线程消费者中传入的EventHandler有些不一样。其消费处理接口只传入了事件对象本身,并没有sequence和endOfBatch参数。
    主要原因是因为多线程消费者内部的消费是并行、乱序的,因此sequence序列号意义不大,且endOfBatch也无法准确定义。
  • WorkerPool对外提供了一个用于启动消费者的方法start,要求外部传入一个juc下的Executor实现用于启动所有的MyWorkProcessor任务。

```java
/*
* 多线程消费者(仿Disruptor.WorkerPool)
*
/
public class MyWorkerPool {

输入验证码查看隐藏内容

扫描二维码关注本站微信公众号 Johngo学长
或者在微信里搜索 Johngo学长
回复 svip 获取验证码
wechat Johngo学长