Java程序中,常见有4种方式实现多线程
①继承Thread类
②实现Runnable接口
③实现Callable接口
④使用Executor框架
在JDK5之前,创建线程有2种方式,一种是继承Thread类,另外一种是实现Runnable接口。这2种方式在执行完任务之后都无法获取执行结果,如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。自Java 5起,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
方式①举例:继承Thread类,实现run()方法,调用start()方法启动线程
public class ThreadSample extends Thread {
@Override
public void run() {
System.out.println(this.getName() + " do some work...");
}
public static void main(String[] args) {
ThreadSample threadSample = new ThreadSample();
threadSample.setName("thread-a");
threadSample.start();
}
}
start()方法调用后并不是立即执行多线程代码,而是使得该线程变为Ready状态,等待CPU分配执行时间。
方式②举例:实现Runnable接口,实现run()方法,将实例对象传入Thread构造方法
public class ThreadSample implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " do some work...");
}
public static void main(String[] args) {
Thread threadSample = new Thread(new ThreadSample(), "thread-b");
threadSample.start();
}
}
方式③举例:实现Callable接口和FutureTask对象组合
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、实例化Callable对象
ThreadSample callableSample = new ThreadSample();
//2、创建装载线程的FutureTask对象
FutureTask<Integer> ft = new FutureTask<Integer>(callableSample);
//3、启动线程
Thread thread = new Thread(ft, "thread-callable");
thread.start();
//4、获取返回结果
Integer result = ft.get();
System.out.println("result = " + result);
}
}
与使用Runnable相比,Callable功能更强大
方式④举例:线程池实现
public class ThreadSample implements Callable<Integer> {
@Override
public Integer call() {
int result = 0;
for (int i = 0; i <= 10; i++) {
result++;
}
return result;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService services = Executors.newSingleThreadExecutor();
Future<Integer> future = services.submit(new ThreadSample());
System.out.println("result = " + future.get());
services.shutdown();
}
}
以上继承Thread类、实现Runnable接口、实现Callable接口三种方式中,理论上优先选用Runnable接口和Callable接口,如果有需要返回值则选用Callable接口实现方式。此外,无论何时,当看到这种形式的代码:
new Thread(runnable).start()
并且最终希望有一个更加灵活的执行策略时,都可以认真考虑使用Executor代替Thread。
在线程池中执行任务线程,比起每个任务创建一个线程,有很多优势。
Java线程池的核心实现类是ThreadPoolExecutor类,任务提交到线程池时,具体处理由ThreadPoolExecutor类的execute()方法执行。当一个新任务提交到线程池时,线程池的处理流程如下:
①判断核心线程池里的线程是否都在执行任务,如果不是,创建一个新的工作线程来执行任务。如果是,则进行下一步流程。
②判断阻塞队列是否已满,如果没满,则将新提交的任务存储在阻塞队列中。如果满,则进行下一步流程。
③判断线程池中的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果线程池中所有的线程都处于工作状态,则交给饱和策略来处理这个任务。
ThreadPoolExecutor通用的构造函数为:
参数说明如下
corePoolSize
maximuPoolSize
keepAliveTime
unit
workQueue
threadFactory
handler
饱和策略,也称拒绝策略。当有限队列满且线程池满的情况下,新的任务到达后,饱和策略将进行处理。有以下几种:
抛出RejectedExecutionException异常。默认策略。调用者可以捕获抛出的异常,进行相应的处理。
不会丢弃任务,也不会抛出异常,由向线程池提交任务的线程来执行该任务。
丢弃当前的任务
丢弃最旧的任务(最先提交而没有得到执行的任务),并执行当前任务。
ThreadPoolExecutor执行流程如下
①当新任务提交时,如果当前线程池中的线程数小于corePoolSize,则创建新的线程处理。
②如果线程池中的线程大于或等于corePoolSize,且BlockingQueue未满,则将新任务加入BlockingQueue。
③如果BlockingQueue已满,且线程池中的线程数小于maximumPoolSize,则创建新的工作线程来执行任务。
④如果当前运行的线程大于或等于maximumPoolSize,将执行饱和策略。即调用
RejectedExecutionHandler.rejectExecution()方法。
Executors提供了一些静态工厂方法创建的常见线程池。
创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池最大长度,这时线程池长度不再变化。如果一个线程异常退出,线程池会补充一个新的线程。
创建一个可缓存的线程池,不会对池的长度做限制。如果线程池长度超过需求,它可以灵活地回收空闲的线程;当需求增加时,它可以灵活地添加新的线程。
创建一个单线程的executor,只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有一个新的取代它。
创建一个定长的线程池,支持定时执行任务。
这几种线程池中,newFixedThreadPool和newSingleThreadExecutor默认使用无线队列LinkedBlockingQueue。newCachedThreadPool使用了同步移交队列SynchronousQueue。newScheduledThreadPool使用了DelayedWorkQueue阻塞队列。
newCachedThreadPool的corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,其他几种线程池corePoolSize与maximumPoolSize一样大。
线程池有5种状态,在ThreadPoolExecutor 源码中有定义。
调用线程池的shutdown()或者shutdownNow()方法可以关闭线程池,遍历线程池中工作线程,逐个调用interrupt方法来中断线程。
Shutdown()方法与shutdownNow()的特点:
Shutdown()方法将线程池的状态设置为SHUTDOWN状态,只会中断空闲的工作线程。
shutdownNow()方法将线程池的状态设置为STOP状态,会中断所有工作线程,不管工作线程是否空闲。
调用两者中任何一种方法,都会使isShutdown()方法的返回值为true;线程池中所有的任务都关闭后,isTerminated()方法的返回值为true。
通常使用shutdown()方法关闭线程池,如果不要求任务一定要执行完,则可以调用shutdownNow()方法。
线程池合理的长度取决于所要执行的任务特征以及程序所部署的系统环境,一般根据这二者因素使用配置文件提供或者通过CPU核数N:
N = Runtime.getRuntime().availableProcessors();
动态计算而不是硬编码在代码中。主要是避免线程池过大或过小这两种极端情况。如果线程池过大,会导致CPU和内存资源竞争,频繁的上下文切换,任务延迟,甚至资源耗尽。如果线程池过小,会造成CPU和内存资源未充分利用,任务处理的吞吐量减小。
对于任务特征来说,需要分清楚是计算密集型任务,还是IO密集型任务或是混合型任务。
计算密集型也称为CPU密集型,意思就是该任务需要大量运算,而没有阻塞,CPU一直全速运行。CPU密集型任务只有在多核CPU上才可能得到加速,即scale up,通过多线程程序享受到增加CPU核数带来的好处。
IO密集型,即该任务需要大量的IO操作,例如网络连接、数据库连接等等,执行任务过程中会有大量的阻塞。在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。
对于计算密集型任务,原则是配置尽可能少的线程数,通常建议以下计算方式设置线程池大小来获得最优利用率:
N(线程数) = N(CPU核数)+ 1
对于IO密集型任务,考虑的因素会多一些,原则是因为较多的时间处于IO阻塞,不能处理新的任务,所有线程数尽可能大一些,通常建议是:
N(线程数) = 2 x N(CPU核数) + 1
或者更精确的:
N(线程数) = N(CPU核数) x U x (1 + W/C)
其中U表示CPU使用率,W/C表示IO等待时间与计算时间的比率,这个不需要太精确,只需要一个估算值。例如,4核CPU,CPU使用率80%,IO等待时间1秒,计算时间0.1秒,那么线程数为:4.8 x 11≈53。一些文章中还提到这种计算方式:
N(线程数) = N(CPU核数) x U / (1 - f)
其中U表示CPU使用率,f表示阻塞系数,即IO等待时间与任务执行总时间的比率:W/(W + C)。根据上面的例子计算出线程数为:4.8/0.09≈53。两种计算方式的结果是很相近的。
以上的计算方式和建议尽可以作为理论参考值,实际业务中可能并不完全按照这个计算值来设置。可以根据对线程池各项参数的监控,来确定一个合理的值。ThreadPoolExecutor提供的一些可用于获取监控的参数方法如下:
此外,可以通过继承ThreadPoolExecutor并重写它的 beforeExecute(),afterExecute() 和 terminated()方法,我们可以在任务执行前,执行后和线程池关闭前做一些统计、日志输出等等操作,以帮助我们更好地监控到线程池的运行状态。