FixedThreadPool
package cn.itcast.hotel.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class FixedThreadPool {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService fixedThreadPool= Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
fixedThreadPool.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
fixedThreadPool.shutdown();
//
// fixedthread.submit();
// fixedthread.awaitTermination();
// fixedthread.invokeAll();
// 提交一个有返回值的任务,并获取返回的 Future 对象
Future<String> future = fixedThreadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
// 使用 Future 对象获取任务执行结果
System.out.println("Result of submitted task: " + future.get());
// 等待线程池中的所有任务执行完毕并关闭线程池
fixedThreadPool.shutdown();
fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
// 使用 invokeAll() 方法同时执行多个任务
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
}
List<Future<String>> results = fixedThreadPool.invokeAll(tasks);
// 输出所有任务执行结果
for (Future<String> result : results) {
System.out.println("Result of invoked task: " + result.get());
}
}
}
CaechThreaPool
package cn.itcast.hotel.Thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CaechThreaPool {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 创建一个可缓存的线程池
for (int i = 0; i < 10; i++) {
cachedThreadPool.execute(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " is executing task");
});
}
cachedThreadPool.shutdown(); // 关闭线程池
}
}
ScheduleThreaPool
package cn.itcast.hotel.Thread;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleThreaPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池
scheduledThreadPool.schedule(() -> {
System.out.println("Scheduled task executed once after 3 seconds");
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("Scheduled task executed every 2 seconds");
}, 0, 2, TimeUnit.SECONDS);
}
}
SingleThresPool
package cn.itcast.hotel.Thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThresPool {
public static void main(String[] args) {
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 创建一个单线程的线程池
for (int i = 0; i < 5; i++) {
singleThreadPool.execute(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " is executing task");
});
}
singleThreadPool.shutdown(); // 关闭线程池
}
}
Thread
@Test
void demop(){
Thread thread1 = new Thread(()->{
System.out.println("Thread 1 is running...");
});
thread1.start();
Thread thread2 = new Thread() {
public void run() {
System.out.println("Thread 2 is running...");
}
};
thread2.start();
}
线程池七大参数分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler
(1)corePoolSize:线程池中常驻核心线程数
(2)maximumPoolSize:线程池能够容纳同时执行的最大线程数
(3)keepAliveTime:多余的空闲线程存活时间
(4)unit:keepAliveTime的时间单位
(5)workQueue:任务队列,被提交但尚未执行的任务
(6)threadFactory:表示生成线程池中的工作线程的线程工厂
(7)handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝。
从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。
一、corePoolSize 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。
二、maximumPoolSize 线程池最大线程数量
当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列(后面会介绍)中。如果队列也已满,则会去创建一个新线程来出来这个处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
三、keepAliveTime 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
四、unit 空闲线程存活时间单位
keepAliveTime的计量单位
五、workQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
- ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
- LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而基本不会去创建新线程直到maxPoolSize(很难达到Interger.MAX这个数),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
- SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
- PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
六、threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
七、handler 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
- ①CallerRunsPolicy
- 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
- ②AbortPolicy
- 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
- DiscardPolicy
- 该策略下,直接丢弃任务,什么都不做。
- DiscardOldestPolicy
- 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
Executors的弊端
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
导致线程一直存在致使OOM
所以使用ThreadPoolExecutor
@Slf4j
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int i = Runtime.getRuntime().availableProcessors();
//核心线程数目
executor.setCorePoolSize(i * 2);
//指定最大线程数
executor.setMaxPoolSize(i * 2);
//队列中最大的数目
executor.setQueueCapacity(i * 2 * 10);
//线程名称前缀
executor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
//rejection-policy:当pool已经达到max size的时候,如何处理新任务
//CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
//对拒绝task的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//当调度器shutdown被调用时等待当前被调度的任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
//线程空闲后的最大存活时间
executor.setKeepAliveSeconds(60);
//加载
executor.initialize();
log.info("初始化线程池成功");
return executor;
}
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
//获取cpu核心数
int i = Runtime.getRuntime().availableProcessors();
//核心线程数
int corePoolSize = i * 2;
//最大线程数
int maximumPoolSize = i * 2;
//线程无引用存活时间
long keepAliveTime = 60;
//时间单位
TimeUnit unit = TimeUnit.SECONDS;
//任务队列,接收一个整型的参数,这个整型参数指的是队列的长度,
//ArrayBlockingQueue(int,boolean),boolean类型的参数是作为可重入锁的参数进行初始化,默认false,另外初始化了notEmpty、notFull两个信号量。
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
//1. 同步阻塞队列 (put,take),直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。
// 这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue
//2. 有界队列。当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,
// 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
// 但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,
// CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
//3. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
// 这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,
// 适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
//线程工厂
//defaultThreadFactory()
//返回用于创建新线程的默认线程工厂。
//privilegedThreadFactory()
//返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
ThreadFactory threadFactory =Executors.defaultThreadFactory();
//拒绝执行处理器
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
return threadPoolExecutor;
}
}