在v3版本的MyDisruptor实现多线程消费者后。按照计划,v4版本的MyDisruptor需要支持线程安全的多线程生产者功能。
由于这篇文章是一系列博客的一部分,我们需要了解前一篇博客的内容,以便更好地理解这篇博客。
[En]
Since the article is part of a series of blogs, we need to know the content of the previous blog in order to better understand this blog.
- 之前的v3版本实现了多线程消费者,提供并发消费的能力以加速消费速度。同理,disruptor也提供了多线程生产者机制以支持更快的生产速度。
- disruptor的多线程生产者机制,其本质是提供了一个线程安全的生产者序列器。
线程安全的生产者序列器允许多个线程并发的通过next方法申请可用的生产序列号和publish发布序列号,内部通过cas机制保证每个生产者线程拿到的序列号是独一无二的。 - disruptor的多线程生产者中通过AvailableBuffer数组机制,巧妙地避免了多个线程发布时并发的修改可用的最大生产者序列。
不是根据单个Sequence对象来标识,而是通过整个数组来标识最大的可用生产者序列,每个生产者线程在发布时只会更新自己的下标值,并且不会出现覆盖竞争。[En]
Instead of identifying according to a single sequence object, but through an entire array to identify the largest available producer sequence, each producer thread will only update its own subscript value when publishing, and there will be no overwriting competition.
在开始介绍disruptor的实现方式之前,可以站在设计者的角度先大致思考一下如何设计一个线程安全的生产者序列器(其功能、使用方法最好和单线程生产者序列器保持一致)。
-
可以参考多线程消费者,在next方法中通过cas的争抢来实现。
-
disruptor的生产者生产时是分为两个阶段的,首先通过next方法获取可用的序列号,然后通过publish发布序列号,令生产完成的序列号对消费者可见,消费者监听到生产者序列号的变化便会进行对应的消费。
-
举个例子,当前生产者已成功发布了序列号11,线程a通过next方法获取到了序列号12,线程b获取到了13,线程c获取到了14。此时线程c生产完毕后,如果按照常规的思路直接更新当前生产者序列为14的话是不行的。
因为这样消费者会认为14以及之前的12、13都已经发布完成,会错误的消费实际还未完成生产的序列号为12、13的事件。 -
上述情况下,如果引入最小生产者序列号机制,那么虽然线程c生产完了序列14的事件,但对外可见的最小生产者序列号依然是11,不会有问题。