java手撸线程池

正文

如何新建线程池

根据《阿里巴巴java开发手册》中关于线程的规约,建议我们手动 ThreadPoolExecutor 来创建线程池

上图已经说明了为什么这么规范?我在这里再说明以下,比如我们创建一个固定任务的线程池

1
ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);

但是由于源码中 newFixedThreadPool 使用的 LinkedBlockingQueue 并没有固定大小,这个队列就是保存的就是我们提交的任务

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

这样设置会有如下影响

1
2
3
4
当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。
由于1,使用无界队列时 maximumPoolSize 将是一个无效参数。 
由于1和2,使用无界队列时 keepAliveTime 将是一个无效参数。
由于使用无界队列,运行中的 FixedThreadPool(未执行方法 shutdown() 或 shutdownNow() )不会拒绝任务(不会调用 RejectedExecutionHandler.rejectedExecution 方法)。

在我们自定义的时候我们先来看看 jdk 给我们提供的工具类是如何新建的
1、newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。(线程最大并发数不可控制)
2、newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3、newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
4、newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newScheduledThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

所以根据上面的各个线程池的模板,我们可以根据业务的特性构建我们自己的线程池
下面这个线程池 newFixedThreadPool 适用于提交已经固定的线程数量,这些线程内部是死循环消费队列来处理事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.creambing.utils.encrypt;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* FileName: ExecutorServiceUtils
* Author: creambing
* Date: 2019-11-04 19:18
* Description:
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改时间 版本号 描述
*/
public class ExecutorServiceUtils {

public static ExecutorService newFixedThreadPool(int nThreads, String threadName) {
return newThreadPool(nThreads, nThreads, 0l
, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(nThreads),
new NamedThreadFactory(threadName),
new ThreadPoolExecutor.AbortPolicy());
}

public static ExecutorService newThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}


static class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

private NamedThreadFactory(String namePrefix) {
this.namePrefix = namePrefix + "-" +
poolNumber.getAndIncrement();
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}

参数讲解

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

ThreadPoolExecutor 的拒绝机制可以通过 setRejectedExecutionHandler() 方法动态修改
AbortPolicy:放弃任务执行,并抛出RejectedException异常,通过捕获该异常,可以执行相应的处理方法
CallerRunPolicy:调用者运行,该机制会将任务会退给调用者,从而降低新任务的流量。他会在调用了execute的线程中执行任务,从而阻止主线程向线程池提交新的任务,该机制通常适用Socket服务处理,但对Servlet服务并不适用,因为Servlet本身即为多线程机制,主线程的阻塞无法达到减缓请求速度的效果
DiscardPolicy:抛弃任务
DiscardOldPolicy:抛弃最旧的任务,即抛弃下一个将被执行的任务
还可以实现RejecetExecutionHandler接口,自定义任务的拒绝方法

修改线程池线程名字

当我们想给线程池中的线程命名时,除了实现 ThreadFactory 接口并传入,另外就是在线程类的 run 方法中修改 Thread 的名字,这个方法不够灵活,不建议使用

1
Thread.currentThread().setName("xxx"+"-"+Thread.currentThread().getId());

参考资料

Cream Bing wechat
subscribe to my blog by scanning my public wechat account