在高并发场景下,常见的设计模式可能存在线程安全问题,比如传统的单例模式就是一个典型。另外,为了充分发挥多核优势,高并发程序常常将大的任务分割成一些规模较小的任务,以便各个击破、分而治之,这就出现了一些高并发场景下特有的设计模式,比如ForkJoin模式等。博文将详细介绍在高并发场景常用的几种模式∶线程安全的单例模式、ForkJoin模式、生产者-消费者模式、Master-Worker模式和Future模式。
一、线程安全的单例模式单例模式是常见的一种设计模式,一般用于全局对象管理,比如XML读写实例、系统配置实例、任务调度实例、数据库连接池实例等。
1.1 饿汉式单例设计模式 1.2 懒汉式单例设计模式 1.3 Double check单例设计模式 二、Master-Worker设计模式Master-Worker模式是一种常见的高并发模式,它的核心思想是任务的调度和执行分离,调度任务的角色为Master,执行任务的角色为Worker,Master负责接收、分配任务和合并(Merge)任务结果,Worker负责执行任务。Master-Worker模式是一种归并类型的模式。
举一个例子,在TCP服务端的请求处理过程中,大量的客户端连接相当于大量的任务,Master需要将这些任务存储在一个任务队列中,然后分发给各个Worker,每个Worker是一个工作线程,负责完成连接的传输处理。Master-Worker模式的整体结构如图8-1所示。
Master-Worker模式的核心思想为分而治之,Master角色负责接收和分配任务,Worker角色负责执行任务和结果回填,具体如图8-2所示。
实际上,高性能传输模式Reactor模式就是Master-Worker模式在传输领域的一种应用。基于Java的NIO技术,Netty设计了一套优秀的、高性能Reactor(反应器)模式的具体实现。在Netty中,EventLoop反应器内部有一个线程负责Java NIO选择器的事件轮询,然后进行对应的事件分发。事件分发的目标就是Netty的Handler处理程序(含用户定义的业务处理程序)。Netty服务器程序中需要设置两个EventLoopGroup轮询组,一个组负责新连接的监听和接收,另一个组负责IO传输事件的轮询与分发,两个轮询组的职责具体如下:
- 负责新连接的监听和接收的EventLoopGroup轮询组中的反应器完成查询通道的新连接IO事件查询,这些反应器有点像负责招工的包工头,因此该轮询组可以形象地称为“包工头”(Boss)轮询组。
- 另一个轮询组中的反应器完成查询所有子通道的IO事件,并且执行对应的Handler处理程序完成IO处理,例如数据的输入和输出(有点像搬砖),这个轮询组可以形象地称为“工人”(Worker)轮询组。
Nginx服务器是Master-Worker模式(更准确地说是Reactor模式)在高性能服务器领域的一种应用。Nginx是一个高性能的HTTP和反向代理Web服务器,Nginx因其高稳定性、丰富的功能集、内存消耗少、并发能力强而闻名全球,目前得到非常广泛的使用,比如百度、京东、新浪、网易、腾讯、淘宝等都是它的用户。
Nginx在启动后会以daemon方式在后台运行,它的后台进程有两类∶―类称为Master进程(相当于管理进程),另一类称为Worker进程(工作进程)。Nginx的进程结构图如图8-4所示。
Nginx的Master进程主要负责调度Worker进程,比如加载配置、启动工作进程、接收来自外界的信号、向各Worker进程发送信号、监控Worker进程的运行状态等。Master进程负责创建监听套接口,交由Worker进程进行连接监听。Worker进程主要用来处理网络事件,当一个Worker进程在接收一条连接通道之后,就开始读取请求、解析请求、处理请求,处理完成产生的数据后,再返回给客户端,最后断开连接通道。
三、Fork-join设计模式 3.1 ForkJoin的原理“分而治之”是一种思想,所谓“分而治之”,就是把一个复杂的算法问题按一定的“分解”方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解组成整个问题的解。
“分而治之”思想在软件体系结构设计、模块化设计、基础算法中得到了非常广泛的应用。许多基础算法都运用了“分而治之”的思想,比如二分查找、快速排序等。Master-Worker模式是“分而治之”思想的一种应用,本节所介绍的ForkJoin模式则是“分而治之”思想的另一种应用。与Master-Worker模式不同,ForkJoin模式没有Master角色,其所有的角色都是Worker,ForkJoin模式中的Worker将大的任务分割成小的任务,一直到任务的规模足够小,可以使用很简单、直接的方式来完成。
ForkJoin模式先把一个大任务分解成许多个独立的子任务,然后开启多个线程并行去处理这些子任务。有可能子任务还是很大而需要进一步分解,最终得到足够小的任务。
JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在Java 8的Lambda并行流框架中充当着底层框架的角色。JUC包的ForkJoin框架包含如下组件:
- ForkJoinPool:执行任务的线程池,继承了AbstractExecutorService类。
- ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中的线程)。每个线程都维护着一个内部队列,用于存放“内部任务”该类继承了Thread类。
- ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口。
- RecursiveTask:带返回结果的递归执行任务,是ForkJoinTask的子类,在子任务带返回结果时使用。
- RecursiveAction∶不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用。
ForkJoinPool的构造
ForkJoinPool里三个重要的角色:
- ForkJoinWorkerThread(下文简称worker):包装Thread;
- WorkQueue:任务队列,双向;
- ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。
ForkJoinPool和ThreadPoolExecutor都是继承自AbstractExecutorService抽象类,所以它和ThreadPoolExecutor的使用几乎没有多少区别,除了任务变成了ForkJoinTask以外。
这里又运用到了一种很重要的设计原则——开闭原则——对修改关闭,对扩展开放。
可见整个线程池体系一开始的接口设计就很好,新增一个线程池类,不会对原有的代码造成干扰,还能利用原有的特性。
ForkJoinPool使用数组保存所有WorkQueue,每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。
// Instance fields
volatile long ctl; // main pool control
volatile int runState; // lockable status
final int config; // parallelism, mode
int indexSeed; // to generate worker index
volatile WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile AtomicLong stealCounter; // also used as sync monitor
ForkJoinPool有两种获取方法,通过commonPool方法或者通过构造方法创建一个对象。前者是内部的一个静态变量,也就是说在一个进程中共享该ForkJoinPool。下面是构造方法:
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np [] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) t;;) {
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r >> 17; r ^= r
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?