FixedThreadPool

package cn.itcast.hotel.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FixedThreadPool {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService fixedThreadPool= Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            fixedThreadPool.execute(()->{
               System.out.println(Thread.currentThread().getName());
           });
        }
        fixedThreadPool.shutdown();
//
//        fixedthread.submit();
//        fixedthread.awaitTermination();
//        fixedthread.invokeAll();


        // 提交一个有返回值的任务,并获取返回的 Future 对象
        Future<String> future = fixedThreadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName();
            }
        });

        // 使用 Future 对象获取任务执行结果
        System.out.println("Result of submitted task: " + future.get());

        // 等待线程池中的所有任务执行完毕并关闭线程池
        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);



        // 使用 invokeAll() 方法同时执行多个任务
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            tasks.add(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
            });
        }
        List<Future<String>> results = fixedThreadPool.invokeAll(tasks);

        // 输出所有任务执行结果
        for (Future<String> result : results) {
            System.out.println("Result of invoked task: " + result.get());
        }



    }

}

CaechThreaPool

package cn.itcast.hotel.Thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CaechThreaPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 创建一个可缓存的线程池
        for (int i = 0; i < 10; i++) {
            cachedThreadPool.execute(() -> {
            System.out.println("Thread " + Thread.currentThread().getName() + " is executing task");
            });
        }
        cachedThreadPool.shutdown(); // 关闭线程池

    }
}

ScheduleThreaPool

package cn.itcast.hotel.Thread;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleThreaPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); // 创建一个大小为2的定时任务线程池

        scheduledThreadPool.schedule(() -> {
            System.out.println("Scheduled task executed once after 3 seconds");
        }, 3, TimeUnit.SECONDS);

        scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println("Scheduled task executed every 2 seconds");
        }, 0, 2, TimeUnit.SECONDS);
    }
}

SingleThresPool

package cn.itcast.hotel.Thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThresPool {
    public static void main(String[] args) {
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); // 创建一个单线程的线程池
        for (int i = 0; i < 5; i++) {
            singleThreadPool.execute(() -> {
            System.out.println("Thread " + Thread.currentThread().getName() + " is executing task");
            });
        }
        singleThreadPool.shutdown(); // 关闭线程池
    }
}

Thread

@Test
    void  demop(){
            Thread thread1 = new Thread(()->{
                System.out.println("Thread 1 is running...");
            });
            thread1.start();




            Thread thread2 = new Thread() {
                public void run() {
                    System.out.println("Thread 2 is running...");
                }
            };
            thread2.start();

    }

线程池七大参数分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler

(1)corePoolSize:线程池中常驻核心线程数

(2)maximumPoolSize:线程池能够容纳同时执行的最大线程数

(3)keepAliveTime:多余的空闲线程存活时间

(4)unit:keepAliveTime的时间单位

(5)workQueue:任务队列,被提交但尚未执行的任务

(6)threadFactory:表示生成线程池中的工作线程的线程工厂

(7)handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何拒绝。

从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。

一、corePoolSize 线程池核心线程大小

线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。

二、maximumPoolSize 线程池最大线程数量

当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列(后面会介绍)中。如果队列也已满,则会去创建一个新线程来出来这个处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

三、keepAliveTime 空闲线程存活时间

一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定

四、unit 空闲线程存活时间单位

keepAliveTime的计量单位

五、workQueue 工作队列

新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

  1. ArrayBlockingQueue

基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

  1. LinkedBlockingQuene

基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而基本不会去创建新线程直到maxPoolSize(很难达到Interger.MAX这个数),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

  1. SynchronousQuene

一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

  1. PriorityBlockingQueue

具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

六、threadFactory 线程工厂

创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

七、handler 拒绝策略

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:

  1. ①CallerRunsPolicy
  • 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
  1. ②AbortPolicy
  • 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
  1. DiscardPolicy
  • 该策略下,直接丢弃任务,什么都不做。
  1. DiscardOldestPolicy
  • 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列

Executors的弊端

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

导致线程一直存在致使OOM

所以使用ThreadPoolExecutor


@Slf4j
@Configuration
public class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int i = Runtime.getRuntime().availableProcessors();
        //核心线程数目
        executor.setCorePoolSize(i * 2);
        //指定最大线程数
        executor.setMaxPoolSize(i * 2);
        //队列中最大的数目
        executor.setQueueCapacity(i * 2 * 10);
        //线程名称前缀
        executor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
        //rejection-policy:当pool已经达到max size的时候,如何处理新任务
        //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        //对拒绝task的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //当调度器shutdown被调用时等待当前被调度的任务完成
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //线程空闲后的最大存活时间
        executor.setKeepAliveSeconds(60);
        //加载
        executor.initialize();
        log.info("初始化线程池成功");
        return executor;
    }

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        //获取cpu核心数
        int i = Runtime.getRuntime().availableProcessors();
        //核心线程数
        int corePoolSize = i * 2;
        //最大线程数
        int maximumPoolSize = i * 2;
        //线程无引用存活时间
        long keepAliveTime = 60;
        //时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //任务队列,接收一个整型的参数,这个整型参数指的是队列的长度,
        //ArrayBlockingQueue(int,boolean),boolean类型的参数是作为可重入锁的参数进行初始化,默认false,另外初始化了notEmpty、notFull两个信号量。
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
        //1. 同步阻塞队列 (put,take),直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。
        // 这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue
        //2. 有界队列。当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,
        // 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
        // 但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,
        // CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
        //3. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
        // 这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,
        // 适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

        //线程工厂
        //defaultThreadFactory()
        //返回用于创建新线程的默认线程工厂。
        //privilegedThreadFactory()
        //返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
        ThreadFactory threadFactory =Executors.defaultThreadFactory();
        //拒绝执行处理器
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        //创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        return threadPoolExecutor;
    }
}

刚学java菜鸡,永劫无间蚀月,王者荣耀王者,金铲铲小铂金,第五人格菜鸡,原神开服玩家,星穹铁道菜鸡,崩坏的菜鸡,闪耀暖暖,和平精英,LOL,CSGO,以及三A大作收集者等等。。。