并发编程之 ThreadPool 线程池
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态 | value | 说明 |
---|---|---|
RUNNING(当线程池创建出来的初始状态) | 111 | 能接受任务,能执行阻塞任务 |
SHUTDOWN(调用 shutdown 方法) | 000 | 不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務 |
STOP(调用 shutDownNow) | 001 | 不接受新任务,打断正在执行的任务,丢弃阻塞任务 |
TIDYING(中间状态) | 010 | 任务全部执行完,活动线程也没了 |
TERMINATED(终结状态) | 011 | 线程池终结 |
shutdown
java
// 线程池状态变为 SHUTDOWN,不会再接收新任务,但已提交任务会执行完,不会阻塞调用线程的执行。
void shutdown();
// 线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,并用 interrupt 的方式中断正在执行的任务。
List<Runnable> shutdownNow();
// 调用 shutdown 后,调用线程并不会等待所有任务运行结束,可以利用此方法等待。
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
构造方法 1
java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- 核心线程数
- 最大线程数(应急线程数||空闲线程)
- 针对空闲线程的存活时间 如果超时了则把空闲的线程杀掉
- 针对 3 的时间单位
- 任务存放的队列
- 线程工厂,主要是产生线程---给线程起个自定义名字
- 拒绝策略
工作方式
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。当线程数达到核心线程数上限,这时再加入任务,新加的任务会被加入队列当中去。前提是有界队列,任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程数目作为空闲线程来执行任务。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
工厂方法
Executors.newFixedThreadPool(n);
java
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
ExecutorService executorService = Executors.newFixedThreadPool(n);
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务
Executors.newCachedThreadPool();
java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ExecutorService executorService = Executors.newCachedThreadPool();
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是空闲线程 60s 后回收
- 一个可根据需要创建新线程的线程池,如果现有线程没有可用的,则创建一个新线程并添加到池中,如果有被使用完但是还没销毁的线程,就复用该线程。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源
- 这种线程池比较灵活,对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能
Executors.newSingleThreadExecutor();
java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ExecutorService executorService = Executors.newSingleThreadExecutor();
- 希望多个任务排队执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。
- 与自己创建一个单线程串行执行任务的区别是:如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。Executors.newFixedThreadPool(1) 初始时为 1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改;
Executors.newScheduledThreadPool(n);
定时任务
java
public static newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ExecutorService executorService = Executors.newScheduledThreadPool(2);
提交任务到线程池
execute 提交一个任务(无返回值)
java
void execute(Runnable command);
submit 提交一个任务(有返回值)
java
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
invokeAll 提交所有任务
java
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
只提交任何一个任务
提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
java
T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
代码示例
ThreadPoolExecutor 的使用
java
package com.mengweijin.learning.basic.thread;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 闲线程和核心线程的概念---空闲线程和核心线程都会从队列当中去获取任务 随机
* @author mengweijin
*/
@Slf4j
public class ThreadPoolExecutorDemo {
private static ThreadPoolExecutor executor;
@SneakyThrows
public static void main(String[] args) {
useThreadPoolExecutor();
executor.shutdown();
}
public static void useThreadPoolExecutor() {
AtomicInteger atomicInteger = new AtomicInteger(0);
// 懒惰性---不会在一开始就创建线程,他是有任务提交的时候才会创建线程
executor = new ThreadPoolExecutor(
1,
2,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (r) -> new Thread(r, "thread-" + atomicInteger.incrementAndGet()),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 3 ; i++) {
executor.execute(new MyTask(i));
}
}
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@SneakyThrows
@Override
public void run() {
log.debug("正在执行task{},当前线程池线程数目是:{}", taskNum, executor.getPoolSize());
TimeUnit.SECONDS.sleep(2);
log.debug("task{}执行完毕,当前线程池线程数目是:{}", taskNum, executor.getPoolSize());
}
}
}
submit、invokeAll、invokeAny 任务
java
package com.mengweijin.learning.basic.thread;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author mengweijin
*/
@Slf4j
public class ExecutorServiceDemo {
private static ExecutorService executorService = Executors.newFixedThreadPool(3);
@SneakyThrows
public static void main(String[] args) {
// submitRunnable();
// submitCallable();
// invokeAll();
invokeAny();
executorService.shutdown();
}
/**
* 提交使用 submit
*/
@SneakyThrows
public static void submitRunnable(){
executorService.submit(() -> {
log.debug("Print out 1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
/**
* 和上面的 newFixedThreadPoolRunnable 方法的区别是:
* 下面这个有返回值,Lambda 表达式会解析为 Callable 类,如果没有返回值,Lambda 会解析为 Runnable 类。
*/
@SneakyThrows
public static void submitCallable(){
Future<String> future = executorService.submit(() -> {
log.debug("Print out 1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
});
log.debug("start");
log.debug("result[{}]",future.get());
log.debug("end");
}
@SneakyThrows
public static void invokeAll(){
List<Future<Object>> futureList = executorService.invokeAll(Arrays.asList(
() -> {
log.debug("1");
return "1s";
},
() -> {
log.debug("2");
return "2s";
}
));
log.debug("main start");
futureList.forEach(future -> {
try {
log.debug(String.valueOf(future.get()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
log.debug("main end");
}
@SneakyThrows
public static void invokeAny(){
int result = executorService.invokeAny(Arrays.asList(
() -> {
TimeUnit.MILLISECONDS.sleep(3000);
log.debug("11");
return 1;
},
() -> {
TimeUnit.MILLISECONDS.sleep(2000);
log.debug("22");
return 2;
},
() -> {
TimeUnit.MILLISECONDS.sleep(100);
log.debug("33");
return 3;
}
));
log.debug("main start");
log.debug(String.valueOf(result));
log.debug("main end");
}
}
定时任务 Executors.newScheduledThreadPool(n);
java
package com.mengweijin.learning.basic.thread;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author mengweijin
*/
@Slf4j
public class ScheduledExecutorServiceDemo {
private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
@SneakyThrows
public static void main(String[] args) {
log.debug("start......");
// delay();
// atFixedRate();
withFixedDelay();
TimeUnit.SECONDS.sleep(10);
scheduledExecutorService.shutdown();
log.debug("shutdown");
}
/**
* 延迟 3 秒执行(只会执行一次)
*/
public static void delay() {
scheduledExecutorService.schedule(() -> {
log.debug("run task...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
}
/**
* 延迟 3 秒开始执行,每隔 1 秒执行一次。
* 但是由于执行的任务需要花费 2 秒钟,所以 2 秒钟之后才开始下一个调度的执行,即变成了每隔 2 秒钟执行一次。
* 即任务执行需要花费的时间包含在 period 所设定的时间(这里是 1 秒)。
* 假如这里 period 设置为 5 秒,任务执行需要花费 2 秒,总共也只会延迟 5 秒执行,而不是延迟 7 秒。
*/
public static void atFixedRate() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
log.debug("run task...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 3, 1, TimeUnit.SECONDS);
}
/**
* 延迟 3 秒开始执行,每隔 1 秒执行一次。
* 但是由于执行的任务需要花费 2 秒钟,再加上 delay 的 1 秒,就变成了每隔 3 秒执行一次调度。
* 即任务执行完成后,再延迟 delay 设定的时间,才开始下一次调度的执行。
*/
public static void withFixedDelay() {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
log.debug("run task...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 3, 1, TimeUnit.SECONDS);
}
}