读书笔记 - Java 多线程编程实战指南
第七章 Producer-Consumer(生产者-消费者)模式
模式介绍
- 数据的提供方可形象地称为数据的生产者,它“生产”了数据,而数据的加工方则相应地被称为消费者,它“消费”了数据。
- 实际上,生产者“生产”数据的速率和消费者“消费”数据的速率往往是不均衡的。为了避免数据的生产者和消费者中处理速率快的一方需要等待处理速率慢的一方,Producer-Consumer 模式通过在数据的生产者和消费者之间引入一个通道(Channel,暂时可以将其简单地理解为一个队列)对两者进行解藕(Decoupling):生产者将其“生产”的数据放入通道,消费者从相应通道中取出数据进行“消费”,生产者和消费者各自运行在各自的线程中,从而使双方处理速率互不影响。
- Producer-Consumer 模式可以看成设计模式的设计模式。许多模式都可以看成 Producer-Consumer 模式的一个实例。
-
模式架构
- Producer-Consumer 模式的核心是通过通道对数据(或任务)的生产者和消费者进行解藕,使二者不直接交互,从而使二者的处理速率相对来说互不影响。
-
实战案例
某内容管理系统需要支持对文档附件中的文件(格式包括 Word、PDF)进行全文索引(Full-text Search)。该系统中,附件会被上传到专用的文件服务器上,对附件进行全文索引的功能模块也是部署在文件服务器上的。因此,与一份文件相关联的附件被上传到文件服务器之后,我们还需要对这些附件生成相应的索引文件以供后面对附件进行全文索引时使用。对附件生成索引的过程包括文件 I/O(读取附件文件和写索引文件)和一些计算(如进行分词),该过程相对于将上传的附件保存到磁盘中而言也快不到哪里。因此,我们不希望对附件生成索引文件这个操作的快慢影响系统用户的体验(如增加了用户等待系统给出操作反馈的时间)。此时,Producer-Consumer 模式可以派上用场:我们可以把负责附件存储的线程看作生产者,其“产品”是一个已保存到磁盘的文件。另外,我们引入一个负责对已存储的附件文件生成响应索引文件的线程,该线程就相当于消费者,它“消费”了上传到文件服务器的附件文件。
本案例 Demo 详见 multithreading 工程的 com.Producer_Consumer 包
-
Producer-Consumer 模式的评价与现实考量
- Producer-Consumer 模式使得“产品”的生产者和消费者各自的处理能力(速率)相对来说互不影响。生产者只需要将其“生产”的“产品”放入通道中就可以继续处理,而不必等待相应的“产品”被消费者处理完毕。而消费者运行在其自身的工作者线程中,它只管从通道中去“产品”进行处理,而不必关心这些“产品”由谁“生产”以及如何“生产”这些细节。因而消费者的处理能力相对来说又不影响生产者,同时又与生产者是松耦合(Loose Coupling)的关系。另一方面,当消费者处理能力比生产者处理能力大的时候,可能出现通道为空的情形,此时消费者的工作者线程会被暂挂直到生产者“生产”了新的“产品”。此时出现了事实上的消费者等待生产者的情形。类似地,当消费者的处理能力小于生产者的处理能力时,通道可能会满,导致生产者线程被暂挂直到消费者“消费”了通道中的部分“产品”而腾出了存储空间。
- 通道挤压
Producer-Consumer 模式中,消费者的处理能力往往低于生产者的处理能力。此情形下随着时间的推移,通道中存储的“产品”会越来越多而出现积压,这好比工厂的生产能力比较大,但是其生产的产品的销售情况不容乐观。为了更好地平衡生产者和消费者的处理能力,我们需要对消费者处理过慢的情形进行一定的处理。常见的方法包括一下两种:
- 使用有界阻塞队列。使用有界阻塞队列(如 ArrayBlockingQueue 和带容量限制的 LinkedBlockingQueue)作为 Channel 参与者的实现可以实现消费者处理压力“反弹”给生产者的效果。
- 使用带流量控制的无界阻塞队列。使用无界阻塞队列(如不带容量限制 LinkedBlockingQueue)作为 Channel 参与者的实现也可以实现平衡生产者和消费者的处理能力。这通常是借助流量控制实现的,即对同一时间内可以有多少个生产者线程往通道中存储“产品”进行限制,从而达到平衡生产者和消费者的处理能力的目的。
- 工作窃取算法
Producer-Consumer 模式中的通道通常可以使用队列来实现。一个通道可以对应一个或者多个队列实例。本章案例中,一个通道仅对应一个队列(ArrayBlockingQueue)实例。这意味着,如果有多个消费者从该通道中获取“产品”,那么这些消费者的工作者线程实际上是在共享同一个队列实例,而这会导致锁的竞争,即修改队列的头指针需要获得的锁而导致的竞争。如果一个通道实例对应多个队列实例,那么就可以实现多个消费者线程从通道中取“产品”的时候访问的是各自的队列实例。此时,各个消费者线程修改队列的头指针并不会导致锁竞争。
一个通道实例对应多个队列实例的时候,当一个消费者线程处理完该线程对应的队列中的“产品”时,它可以继续从其他消费者线程对应的队列中取出“产品”进行处理,这样就不会导致该消费者线程闲置,并减轻其他消费者线程的负担。
- 线程的停止
一个具体的 Producer-Consumer 模式实现通常可以看做一个服务。如果该服务中的 Producer 参与者实例也有工作者线程,那么该服务的停止就涉及 Producer 参与者 和 Consumer 参与者的两种工作者线程的停止。此时,我们需要注意这两种线程的停止顺序:如果先停止 Consumer 参与者的工作者线程则会导致 Producer 参与者新“生产”的“产品”无法被处理。如果先停止 Producer 参与者的工作者线程又可能使 Consumer 参与者的工作者线程处于空等待。并且,停止 Consumer 参与者的工作者线程前是否考虑要等待其处理完所有待处理的“产品”或者将这些“产品”做个备份也是问题。总的来说,我们可以借助 Tow-phase Termination 模式来先停止 Producer 参与者的工作者线程。当某个服务的所有 Producer 参与者的工作者线程都停止之后,再停止该服务涉及的 Consumer 参与者的工作者线程。
- 高性能高可靠的 Producer-Consumer 模式实现
如果应用程序对准备采用 Producer-Consumer 模式实现的服务有较高的性能和可靠性的要求,那么不妨考虑使用开源的 Producer-Consumer 模式实现库 LMAX Disruptor。
-
Java 标准库实例
Java 标准库中的类 java.io.PipedOutStream 和 java.io.PipedInputStream 允许一个线程以 I/O 的形势输出数据给另外一个线程。这里,java.io.PipedOutStream 和 java.io.PipedInputStream 分别相当于 Producer-Consumer 模式的 Producer 参与者和 Consumer 参与者。而 java.io.PipedOutStream 内部维护的缓冲区则相当于 Producer-Consumer 模式的 Channel 参与者。