
urrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。
ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,前三个构造器都是调用了最后一个构造器,第四个构造器如下:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 表示线程没有任务执行时最多保持多久时间会终止TimeUnit unit, // keepAliveTime的时间单位BlockingQueue<Runnable> workQueue, // 阻塞队列ThreadFactory threadFactory, // 用来创建线程的线程工厂RejectedExecutionHandler handler // 拒绝策略
);
构造器中各参数含义
corePoolSize:核心线程数。创建线程池后,默认情况下线程池中没有任何线程,而是等到有任务到来才创建线程执行任务。除非调用下面两个方法:
maximumPoolSize:线程池的最大线程数。表示在线程池中最多创建几个线程。
keepAliveTime:线程没有任务执行时最多保持多久会终止。
unit:keepAliveTime的单位。七种。
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 小时
TimeUnit.MINUTES; // 分钟
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微秒。即千分之一毫秒
TimeUnit.NANOSECONDS; // 纳秒。即千分之一微秒
workQueue:阻塞队列BlockingQueue。用来存储等待执行的任务。一般有以下几种选择:
ArrayBlockingQueue:使用较少。是一个基于数组结构的有界阻塞队列。
LinkedBlockingQueue:使用较多。一个基于链表结构的阻塞队列。吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 wFixedThreadPool()使用了这个队列。
PriorityBlockingQueue:使用较少。一个具有优先级的无限阻塞队列。
SynchronousQueue:使用较多。一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 wCachedThreadPool 使用了这个队列。
线程池的排队策略(拒绝策略)与BlockingQueue有关。
threadFactory:线程工厂。用来创建线程。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
handler:拒绝策略。表示拒绝处理任务时的策略。四种取值:
java线程池继承体系
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的。
ExecutorService接口声明了一些方法:submit()、invokeAll()、invokeAny()、shutDown()等。
抽象类AbstractExecutorService基本实现了ExecutorService中声明的所有方法。
ThreadPoolExecutor中有几个非常重要的方法:execute()、submit()、shutdown()、shutdownNow()。
execute()是Executor接口中声明的方法,在ThreadPoolExecutor中进行了实现。这个方法是ThreadPoolExecutor的核心方法,通过该方法可以向线程池提交一个任务,交给线程池去处理。
submit()是ExecutorService中声明的方法,在AbstractExecutorService中就已经有了具体的实现,在ThreadPoolExecutor中并没有对其重写(直接通过父类继承过来)。这个方法也是提交任务交给线程池处理,区别就是参数传递的是Callable,能有返回值;而execute()的参数是Runnable,不能有返回值。submit()方法底层实际上还是调用了execute()方法,只不过它利用了 Future 来获取任务执行结果。
shutdown()和shutdownNow()是用来关闭线程池的。
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。shutdown 或 shutdownNow 方法来关闭线程池,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。ThreadPoolExecutor中还有一些方法:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,在此不重点讨论。
另外还有两个不常用的方法,作用是动态调整线程池容量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
RUNNING -> SHUTDOWN:如果调用了 shutdown()方法,则线程池处于 SHUTDOWN 状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕。(RUNNING or SHUTDOWN) -> STOP:如果调用了 shutdownNow()方法,则线程池处于 STOP 状态,此时线程池不能接受新的任务,并且会去尝试立即终止正在执行的任务。SHUTDOWN -> TIDYING:当线程池和队列都为空时,则线程池处于 TIDYING 状态。STOP -> TIDYING:当线程池为空时,则线程池处于 TIDYING 状态。TIDYING -> TERMINATED:当 terminated() 回调方法完成时,线程池处于 TERMINATED 状态。在ThreadPoolExecutor类中,最核心的方法是execute()方法(向线程池提交任务),虽然通过submit()也可以提交任务,但是实际上submit()方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可。
我们可以使用 execute 提交任务,但是 execute 方法没有返回值,所以无法判断任务是否被线程池执行成功。所以一个方法假设操作10次数据库,那么我们必须将最重要的、必须更新成功的几次数据库操作搞成同步;而不需要返回值、相对来说不是特别重要、不一定要必须更新成功的数据库操作可以搞成异步。异步有两种,MQ和多线程,本文专指多线程。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
我们也可以使用 submit 方法来提交任务,它会返回一个 Future ,那么我们可以通过这个 Future 来判断任务是否执行成功。
通过 Future 的 get 方法来获取返回值,get 方法会阻塞住主线程直到任务完成。而使用 get(long timeout, TimeUnit unit) 方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {Object s = ();
} catch (InterruptedException e) {// 处理中断异常
} catch (ExecutionException e) {// 处理无法执行任务异常
} finally {// 关闭线程池executor.shutdown();
}
线程池任务执行原理。(网图)
可以直接使用new ThreadPoolExecutor创建线程池,可以使用Executors类中提供的几个静态方法来创建线程池(不推荐):
wCachedThreadPool(); //创建一个缓冲池,缓冲池容量(工作线程大小)大小为Integer.MAX_VALUE
wSingleThreadExecutor(); //创建容量为1的缓冲池
wFixedThreadPool(int); //创建固定容量大小的缓冲池
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
详细描述如下。主要看下面的详细描述即可。
newCachedThreadPool
概述:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
特点:
注意:在使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
实例
public class CachedThreadPoolDemo {public static void main(String[] args) {ExecutorService executorService = wCachedThreadPool();for (int i = 0; i < 10; i++) {final int index = i;try {Thread.sleep(index * 1000);} catch (InterruptedException e) {e.printStackTrace();}ute(() -> System.out.println(Thread.currentThread().getName() + " 执行,i = " + index));}}
}
newFixedThreadPool
概述:创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
特点:FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
实例
public class FixedThreadPoolDemo {public static void main(String[] args) {ExecutorService executorService = wFixedThreadPool(3);for (int i = 0; i < 10; i++) {final int index = ute(() -> {try {System.out.println(Thread.currentThread().getName() + " 执行,i = " + index);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}});}}
}
newSingleThreadExecutor
概述:创建一个单线程化的 Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。
特点:单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
实例:
public class SingleThreadExecutorDemo {public static void main(String[] args) {ExecutorService executorService = wSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = ute(() -> {try {System.out.println(Thread.currentThread().getName() + " 执行,i = " + index);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}});}}
}
newScheduleThreadPool
概述:创建一个线程池,可以安排任务在给定延迟后运行,或定期执行。
实例
public class ScheduledThreadPoolDemo {private static void delay() {ScheduledExecutorService scheduledThreadPool = wScheduledThreadPool(5);scheduledThreadPool.schedule(() -> System.out.println(Thread.currentThread().getName() + " 延迟 3 秒"), 3,TimeUnit.SECONDS);}private static void cycle() {ScheduledExecutorService scheduledThreadPool = wScheduledThreadPool(5);scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName() + " 延迟 1 秒,每 3 秒执行一次"), 1, 3,TimeUnit.SECONDS);}public static void main(String[] args) {delay();cycle();}
}
ThreadPoolTaskExecutor:Spring的线程池。
我们一般是不允许使用Executors创建线程池,而是通过自定义ThreadPoolExecutor的方式。因为FixedThreadPool和SingleThreadExecutor的队列长度是Integer.MAX_VALUE,可能会堆积大量请求造成OOM。而CachedThreadPool和ScheduledThreadPool线程池的大小是Integer.MAX_VALUE,可能会创建大量的线程造成OOM。使用线程池,一般都会使用自定义参数的ThreadPoolExecutor。
CPU数量*目标CPU使用率*(1+等待时间与计算时间的比值)。 slf4j.Slf4j;
import org.urrent.ThreadPoolTaskExecutor;import urrent.ThreadPoolExecutor;/*** 如果队列满了, 就在原始线程执行, 不允许丢弃*/
@Slf4j
public class CommonAsyncExecutor {private static ThreadPoolTaskExecutor executor;static {executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(32);executor.setKeepAliveSeconds(128);executor.setQueueCapacity(128);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();}public static void execute(Runnable runnable) {log.info(ute {}", runnable);executor.submit(runnable);}}
@Test
public void test1000() throws Exception{ute(new Runnable() {@Overridepublic void run() {method01();method02();method03();}});
}
线程池中线程数目PoolSize();
队列中等待执行的任务数目Queue().size();
已执行完的任务数目CompletedTaskCount();
@Configuration
public class TaskExecutorConfig implements AsyncConfigurer {// CPU核数 final static int nThreads = Runtime().availableProcessors();/*** Set the ThreadPoolExecutor's core pool size.*/private static final int CORE_POOL_SIZE = nThreads;/*** Set the ThreadPoolExecutor's maximum pool size.*/private static final int MAX_POOL_SIZE = nThreads;/*** Set the capacity for the ThreadPoolExecutor's BlockingQueue.*/private static final int QUEUE_CAPACITY = 1000;/*** 通过重写getAsyncExecutor方法,制定默认的任务执行由该方法产生* <p>* 配置类实现AsyncConfigurer接口并重写getAsyncExcutor方法,并返回一个ThreadPoolTaskExevutor* 这样我们就获得了一个基于线程池的TaskExecutor*/@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(CORE_POOL_SIZE);taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);taskExecutor.setQueueCapacity(QUEUE_CAPACITY);taskExecutor.initialize();taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());return taskExecutor;}
}
思路:为什么使用线程池 -> 线程池分类 -> 自定义线程池 -> 线程池各参数 -> 线程池执行原理。
本文发布于:2024-03-23 18:59:58,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/1711191598154893.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
| 留言与评论(共有 0 条评论) |