Skip to content

并发编程之 ThreadPool 线程池

线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态value说明
RUNNING(当线程池创建出来的初始状态)111能接受任务,能执行阻塞任务
SHUTDOWN(调用 shutdown 方法)000不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務
STOP(调用 shutDownNow)001不接受新任务,打断正在执行的任务,丢弃阻塞任务
TIDYING(中间状态)010任务全部执行完,活动线程也没了
TERMINATED(终结状态)011线程池终结

shutdown

java
// 线程池状态变为 SHUTDOWN,不会再接收新任务,但已提交任务会执行完,不会阻塞调用线程的执行。
void shutdown();

// 线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,并用 interrupt 的方式中断正在执行的任务。
List<Runnable> shutdownNow();

// 调用 shutdown 后,调用线程并不会等待所有任务运行结束,可以利用此方法等待。
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

构造方法 1

java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  1. 核心线程数
  2. 最大线程数(应急线程数||空闲线程)
  3. 针对空闲线程的存活时间 如果超时了则把空闲的线程杀掉
  4. 针对 3 的时间单位
  5. 任务存放的队列
  6. 线程工厂,主要是产生线程---给线程起个自定义名字
  7. 拒绝策略

工作方式

线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。当线程数达到核心线程数上限,这时再加入任务,新加的任务会被加入队列当中去。前提是有界队列,任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程数目作为空闲线程来执行任务。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。

工厂方法

Executors.newFixedThreadPool(n);

java
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

ExecutorService executorService = Executors.newFixedThreadPool(n);
  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务
  • 适用于任务量已知,相对耗时的任务

Executors.newCachedThreadPool();

java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

ExecutorService executorService = Executors.newCachedThreadPool();
  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是空闲线程 60s 后回收
  • 一个可根据需要创建新线程的线程池,如果现有线程没有可用的,则创建一个新线程并添加到池中,如果有被使用完但是还没销毁的线程,就复用该线程。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源
  • 这种线程池比较灵活,对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能

Executors.newSingleThreadExecutor();

java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

ExecutorService executorService = Executors.newSingleThreadExecutor();
  • 希望多个任务排队执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。
  • 与自己创建一个单线程串行执行任务的区别是:如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。Executors.newFixedThreadPool(1) 初始时为 1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改;

Executors.newScheduledThreadPool(n);

定时任务

java
public static newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

ExecutorService executorService = Executors.newScheduledThreadPool(2);

提交任务到线程池

execute 提交一个任务(无返回值)

java
void execute(Runnable command);

submit 提交一个任务(有返回值)

java
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);

invokeAll 提交所有任务

java
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

只提交任何一个任务

提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

java
T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

代码示例

ThreadPoolExecutor 的使用

java
package com.mengweijin.learning.basic.thread;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 闲线程和核心线程的概念---空闲线程和核心线程都会从队列当中去获取任务 随机
 * @author mengweijin
 */
@Slf4j
public class ThreadPoolExecutorDemo {
    private static ThreadPoolExecutor executor;
    @SneakyThrows
    public static void main(String[] args) {
        useThreadPoolExecutor();
        executor.shutdown();
    }

    public static void useThreadPoolExecutor() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // 懒惰性---不会在一开始就创建线程,他是有任务提交的时候才会创建线程
        executor = new ThreadPoolExecutor(
                1,
                2,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), (r) -> new Thread(r, "thread-" + atomicInteger.incrementAndGet()),
                new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 3 ; i++) {
            executor.execute(new MyTask(i));
        }
    }

    static class MyTask implements Runnable {
        private int taskNum;
        public MyTask(int num) {
            this.taskNum = num;
        }
        @SneakyThrows
        @Override
        public void run() {
            log.debug("正在执行task{},当前线程池线程数目是:{}", taskNum, executor.getPoolSize());
            TimeUnit.SECONDS.sleep(2);
            log.debug("task{}执行完毕,当前线程池线程数目是:{}", taskNum, executor.getPoolSize());
        }
    }
}

submit、invokeAll、invokeAny 任务

java
package com.mengweijin.learning.basic.thread;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author mengweijin
 */
@Slf4j
public class ExecutorServiceDemo {
    private static ExecutorService executorService = Executors.newFixedThreadPool(3);
    @SneakyThrows
    public static void main(String[] args) {
//        submitRunnable();
//        submitCallable();
//        invokeAll();
        invokeAny();

        executorService.shutdown();
    }

    /**
     * 提交使用 submit
     */
    @SneakyThrows
    public static void submitRunnable(){
        executorService.submit(() -> {
            log.debug("Print out 1");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 和上面的 newFixedThreadPoolRunnable 方法的区别是:
     * 下面这个有返回值,Lambda 表达式会解析为 Callable 类,如果没有返回值,Lambda 会解析为 Runnable 类。
     */
    @SneakyThrows
    public static void submitCallable(){
        Future<String> future = executorService.submit(() -> {
            log.debug("Print out 1");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "success";
        });

        log.debug("start");
        log.debug("result[{}]",future.get());
        log.debug("end");
    }

    @SneakyThrows
    public static void invokeAll(){
        List<Future<Object>> futureList = executorService.invokeAll(Arrays.asList(
                () -> {
                    log.debug("1");
                    return "1s";
                },
                () -> {
                    log.debug("2");
                    return "2s";
                }
        ));
        log.debug("main start");
        futureList.forEach(future -> {
            try {
                log.debug(String.valueOf(future.get()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        log.debug("main end");
    }

    @SneakyThrows
    public static void invokeAny(){
        int result = executorService.invokeAny(Arrays.asList(
                () -> {
                    TimeUnit.MILLISECONDS.sleep(3000);
                    log.debug("11");
                    return 1;
                },
                () -> {
                    TimeUnit.MILLISECONDS.sleep(2000);
                    log.debug("22");
                    return 2;
                },
                () -> {
                    TimeUnit.MILLISECONDS.sleep(100);
                    log.debug("33");
                    return 3;
                }
        ));
        log.debug("main start");
        log.debug(String.valueOf(result));
        log.debug("main end");
    }
}

定时任务 Executors.newScheduledThreadPool(n);

java
package com.mengweijin.learning.basic.thread;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author mengweijin
 */
@Slf4j
public class ScheduledExecutorServiceDemo {
    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    @SneakyThrows
    public static void main(String[] args) {
        log.debug("start......");

//        delay();
//        atFixedRate();
        withFixedDelay();

        TimeUnit.SECONDS.sleep(10);
        scheduledExecutorService.shutdown();
        log.debug("shutdown");
    }

    /**
     * 延迟 3 秒执行(只会执行一次)
     */
    public static void delay() {
        scheduledExecutorService.schedule(() -> {
            log.debug("run task...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 3, TimeUnit.SECONDS);
    }

    /**
     * 延迟 3 秒开始执行,每隔 1 秒执行一次。
     * 但是由于执行的任务需要花费 2 秒钟,所以 2 秒钟之后才开始下一个调度的执行,即变成了每隔 2 秒钟执行一次。
     * 即任务执行需要花费的时间包含在 period 所设定的时间(这里是 1 秒)。
     * 假如这里 period 设置为 5 秒,任务执行需要花费 2 秒,总共也只会延迟 5 秒执行,而不是延迟 7 秒。
     */
    public static void atFixedRate() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.debug("run task...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 3, 1, TimeUnit.SECONDS);
    }

    /**
     * 延迟 3 秒开始执行,每隔 1 秒执行一次。
     * 但是由于执行的任务需要花费 2 秒钟,再加上 delay 的 1 秒,就变成了每隔 3 秒执行一次调度。
     * 即任务执行完成后,再延迟 delay 设定的时间,才开始下一次调度的执行。
     */
    public static void withFixedDelay() {
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            log.debug("run task...");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 3, 1, TimeUnit.SECONDS);
    }
}