利用Java的Executor框架运行多线程
对于原生的线程处理,通常面对一些复杂应用时,我们很容易陷入到线程的一些繁琐的细节当中,代码出问题的风险特别大。好在从JDK1.5版本开始,Java直接为我们提供了一个专门用于并发线程的包java.util.concurrent,帮助很多对多线程理解得不透彻的程序员更加轻松地使用多线程。
实验简介
对于原生的线程处理,通常面对一些复杂应用时,我们很容易陷入到线程的一些繁琐的细节当中,代码出问题的风险特别大。好在从JDK1.5版本开始,Java直接为我们提供了一个专门用于并发线程的包java.util.concurrent,帮助很多对多线程理解得不透彻的程序员更加轻松地使用多线程。
其中Eexecutor接口作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,并用Runnable来表示任务,同时还提供了对线程生命周期的支持,线程池的自动管理,以及统计信息收集,应用程序管理机制和性能监视等机制。
本节实验主要为大家介绍Executor框架中多线程的用法,为我们后续的性能测试脚本开发和场景设计中对于多线程这门关键技术提供更稳定的解决方案。
实验目的
1.理解Executor框架的核心体系结构及关键对象。
2.熟练运用Executor框架管理线程池。
3.熟练运用Executor框架处理多线程及并发策略。
实验流程
1.线程池。
对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络I/O的一些操作。因此人们就想到通过一个连接池来管理与数据库的连接。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人继续使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的。
线程池的作用就是用来限制系统中使用线程的数量以及更好的使用线程。根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。这种线程池管理的方式可以更好地复用已经创建的线程,避免频繁的线程资源回收和新建,其执行效率也将更高。
2.Executor关键对象。
Executor | 一个接口,其定义了一个接收Runnable对象的方法executor。 |
ExecutorService | 线程池接口, 是一个比Executor使用更广泛的子类接口,并且提供了生命周期管理的方法。 |
ThreadPoolExecutor | ExecutorService的默认实现。 |
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。 |
3.创建单线程的线程池。
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
package com.woniuxy.thread;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class ExecutorDemo {
public static void main(String[] args) { ExecutorDemo exeDemo = new ExecutorDemo(); exeDemo.singlePool(); }
// 单线程线程池 public void singlePool() { // 创建一个单线程的线程池对象esSingle ExecutorService esSingle = Executors.newSingleThreadExecutor(); for (int i=0; i<10; i++) { // 调用execute方法执行线程。Runnable虽然是一个接口,无法直接实例化, // 但在内部匿名实例中实例化的同时给出了其run()方法的实现,这样是允许的。 esSingle.execute(new Runnable() { public void run() { System.out.println("当前执行线程为:" + Thread.currentThread().getName()); } }); } } } |
上述代码执行后,进程并不会结束,因为线程池并没有被回收,线程仍处理待命的状态。除非我们调用该线程池的shutdown()方法强制关闭当前线程池。
4.创建固定大小的线程池。
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public void fixedPool() { ExecutorService esFixed = Executors.newFixedThreadPool(5); for (int i=0; i<10; i++) { esFixed.execute(new Runnable() { public void run() { System.out.println("当前执行线程为:" + Thread.currentThread().getName()); } }); } esFixed.shutdown(); // 强制关闭当前线程池 } |
5.创建可缓存的线程池。
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。
public void cachedPool() { ExecutorService esCached = Executors.newCachedThreadPool(); for (int i=0; i<10; i++) { esCached.execute(new Runnable() { public void run() { System.out.println("当前执行线程为:" + Thread.currentThread().getName()); } }); } // esCached.shutdown(); } |
6.创建定时任务的线程池。
该线程池支持定时以及周期性执行任务的需求,在性能测试中非常有用,演示代码如下:
public void scheduledPool() { System.out.println("线程开始运行时间为:" + new Date().toLocaleString()); ScheduledExecutorService esScheduled = Executors.newScheduledThreadPool(5); // 暂停3秒钟后运行一次线程 esScheduled.schedule(new Runnable() { public void run() { System.out.println("暂停3秒后输出一次:" + new Date().toLocaleString()); } }, 3, TimeUnit.SECONDS); // 暂停5秒后,以后每3秒钟运行一次线程,重复直到关闭 esScheduled.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("延迟5秒后每3秒输出:" + new Date().toLocaleString()); } }, 5, 3, TimeUnit.SECONDS); } |
7.使用ThreadPoolExecutor创建线程池。
事实上,上述四种线程池,其最终都是通过实例化ThreadPoolExecutor并提供不同的参数来完成线程池的创建的。所以说,ThreadPoolExecutor是所有线程池构建方法中的核心类,我们也可以直接实例化ThreadPoolExecutor类来完成线程池的创建:
public void threadPool() { ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 100, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue(10)); for (int i=0; i<20; i++) { tpe.execute(new Runnable() { public void run() { System.out.println("当前执行线程为:" + Thread.currentThread().getName()); } }); } } |
一方面,我们可以通过查看ThreadPoolExecutor的其中一个构造方法的参数说明来进一步了解线程池创建的方式。其构造方法的定义为:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } |
(1)corePoolSize:默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁。
(2)maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
(3)keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut()方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
(4)unit:参数keepAliveTime的时间单位。可以设置天,时,分,秒,毫秒,微秒,纳秒。
(5)workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下三种选择:ArrayBlockingQueue表示有大小限制的队列,LinkedBlockingQueue表示无大小限制的队列,SynchronousQueue表示有等待线程时才会加入到队列中。
另外一方面,我们不妨通过查看源代码的方式,来看看Executors类的几个创建线程池的方法,最终调用的其实都是ThreadPoolExecutor来完成线程池的创建。比如创建可缓存的线程池,我们可以看到其线程池构造过程中指定的最小线程数为0,最大线程数为整数的最大值,60秒的状态保持时间,创建是一个可同步的队列。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } |
再如创建固定大小的线程池,其最小线程数和最大线程数均为指定的线程数量,这便可以实现固定的线程数大小。其实现代码为:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } |
虽然我们知道ThreadPoolExecutor的实现原理,但是仍然建议大家使用Executors对象中预先已经定义好的几个方法来完成线程池的创建,因为绝大部分情况下,Executors对象的四个方法已经足够我们使用。
思考练习
1.尝试使用原生的线程执行方式实现一个固定周期的线程执行。