在Java中,创建线程有三种方式,一是继承Thread类,二是实现Runnable接口,三是使用Callable及Future接口。前两种方式的缺点是在线程任务执行结束后,无法获取执行结果。一般只能采用共享变量或共享存储区以及线程通信的方式实现获得任务结果的目的(不推荐)。第3种方式是Java1.5开始提供的,Callable和Future结合使用使线程具有返回值的功能,其中Callable用来执行任务,产生结果,而Future用来获得结果。
1、Callable和Future的介绍
1.1 Callable
Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。和Runnable的主要区别如下:
- Callable接口定义的方法是call(),而Runnable接口定义的方法是run()。
- 接口Callable的call()方法可以有返回值,但Runnable接口的run()方法没有返回值。
- Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。
- 运行Callable任务后可拿到一个Future对象,Future表示异步计算的结果(返回值就是通过Future接口获得的)。
Callable接口与Runnable接口是否相似,我们可以查看源码如下:
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看到,与Runnable接口不同之处在于,call方法带有泛型返回值V。
1.2 Future
Future提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑。对于多线程,如果线程A要等待线程B的结果,那么线程A没必要去等待B的执行结果,而是可以先拿到一个未来的Future,等B有结果时再取真实的结果。
- 通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
- Future取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的。
- Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。
Future常用方法如下:
V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。
boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。
boolean isCanceller() :如果任务完成前被取消,则返回true。
boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(...)方法将返回false;如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成,执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。
通过方法分析可以看出来Future提供了3种功能:
- 能够中断执行中的任务。
- 判断任务是否执行完成。
- 获取任务执行完成后额结果。
2、无返回值线程的实现方式
2.1 继承Thread类
Thread类是在java.lang包中定义的, 继承Thread类必须重写run()方法。创建好线程类之后就可以创建线程对象了,然后通过start()方法去启动线程。Runnable的中文意思是“任务”,顾名思义,通过实现Runnable接口,我们定义了一个子任务,然后将子任务交由Thread去执行。
- 注意,不是调用run()方法启动线程,run方法中只是定义需要执行的任务,如果调用run方法,即相当于在主线程中执行run方法,跟普通的方法调用没有任何区别,此时并不会创建一个新的线程来执行定义的任务。
- 查看Thread类的源代码会发现Thread类其实是实现了Runnable接口的。
public class Test {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
class MyThread extends Thread{
private static int num = 0;
public MyThread(){
num++;
}
@Override
public void run() {
System.out.println("主动创建的第"+num+"个线程");
}
}
在上面代码中,通过调用start()方法,就会创建一个新的线程了。为了分清start()方法调用和run()方法调用的区别,请看下面一个例子:
public class Test {
public static void main(String[] args) {
System.out.println("主线程ID:"+Thread.currentThread().getId());
MyThread thread1 = new MyThread("thread1");
thread1.start();
MyThread thread2 = new MyThread("thread2");
thread2.run();
}
}
class MyThread extends Thread{
private String name;
public MyThread(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("name:"+name+" 子线程ID:"+Thread.currentThread().getId());
}
}
运行结果如下:
从输出结果可以得出以下结论:
- thread1和thread2的线程ID不同,thread2和主线程ID相同,说明通过run方法调用并不会创建新的线程,而是在主线程中直接运行run方法,跟普通的方法调用没有任何区别;
- 虽然thread1的start方法调用在thread2的run方法前面调用,但是先输出的是thread2的run方法调用的相关信息,说明新线程创建的过程不会阻塞主线程的后续执行。
Thread类相关方法
//当前线程可转让cpu控制权,让别的就绪状态线程运行(切换)
public static Thread.yield()
//暂停一段时间
public static Thread.sleep()
//在一个线程中调用other.join(),将等待other执行完后才继续本线程。
public join()
//后两个函数皆可以被打断
public interrupte()
2.2 实现Runnable接口
在Java中创建线程除了继承Thread类之外,还可以通过实现Runnable接口来实现类似的功能,实现Runnable接口必须重写其run方法。
- 注意,这种方式必须将Runnable作为Thread类的参数,然后通过Thread的start方法来创建一个新线程来执行该子任务。
- 如果调用Runnable的run方法的话,是不会创建新线程的,这和普通的方法调用没有任何区别。
public class Test {
public static void main(String[] args) {
System.out.println("主线程ID:"+Thread.currentThread().getId());
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start();
}
}
class MyRunnable implements Runnable{
public MyRunnable() {
}
@Override
public void run() {
System.out.println("子线程ID:"+Thread.currentThread().getId());
}
}
在Java中,以上2种方式都可以用来创建线程去执行子任务,具体选择哪一种方式要看自己的需求。直接继承Thread类的话,可能比实现Runnable接口看起来更加简洁,但是由于Java只允许单继承,所以如果自定义类需要继承其他类,则只能选择实现Runnable接口
3、有返回值线程的实现方式
在java中可以使用ExecutorService、Callable、Future实现有返回结果的多线程。ExecutorService、Callable、Future都是属于Executor框架中的功能类。返回结果的线程是在JDK1.5中引入的新特征,确实很实用,有了这种特征我就不需要再为了得到返回值而大费周折了,而且即便实现了也可能漏洞百出。
- 想要详细了解Executor框架的可以访问 Java并发编程之异步执行器Executor框架概述,这里面对该框架做了很详细的解释。
可返回值的任务必须实现Callable接口,执行Callable任务后,可以获取到一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了,再结合线程池接口ExecutorService就可以实现有返回结果的多线程了。
3.1 submit(Callable<T>)
的使用
/**
* 有返回值的线程
*/
@SuppressWarnings("unchecked")
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("----程序开始运行----");
Date date1 = new Date();
int taskSize = 5;
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable c = new MyCallable(i + " ");
// 执行任务并获取Future对象
Future f = pool.submit(c);
// System.out.println(">>>" + f.get().toString());
list.add(f);
}
// 关闭线程池
pool.shutdown();
// 获取所有并发任务的运行结果
for (Future f : list) {
// 从Future对象上获取任务的返回值,并输出到控制台
System.out.println(">>>" + f.get().toString());
}
Date date2 = new Date();
System.out.println("----程序结束运行----,程序运行时间【" + (date2.getTime() - date1.getTime()) + "毫秒】");
}
}
class MyCallable implements Callable<Object> {
private String taskNum;
MyCallable(String taskNum) {
this.taskNum = taskNum;
}
public Object call() throws Exception {
System.out.println(">>>" + taskNum + "任务启动");
Date dateTmp1 = new Date();
Thread.sleep(1000);
Date dateTmp2 = new Date();
long time = dateTmp2.getTime() - dateTmp1.getTime();
System.out.println(">>>" + taskNum + "任务终止");
return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】";
}
}
代码说明:
- 上述代码中Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
- ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
- future模式是并发模式的一种,可以有两种形式,即无阻塞和阻塞,分别是isDone和get。其中Future对象用来存放该线程的返回值以及状态。
ExecutorService e = Executors.newFixedThreadPool(3);
//submit方法有多重参数版本,及支持callable也能够支持runnable接口类型.
Future future = e.submit(new myCallable());
future.isDone() //return true,false 无阻塞
future.get() // return 返回值,阻塞直到该线程运行结束
3.2 submit(Runnable)和isDone()的使用
方法submit()不仅可以传入Callable对象,也可以传入Runnable对象,说明submit方法支持有返回值和无返回值的功能。如果submit方法传入Callable接口则可以有返回值,如果传入Runnable则无返回值,打印的结果就是null。方法get()具有阻塞特性,而isDone()方法具有无阻塞的特性。boolean isDone()是判断任务是否执行完。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 如果submit方法传入Callable接口则可以有返回值,如果传入Runnable则无返回值,打印的结果就是null。方法get()具有阻塞特性,而isDone()方法无阻塞特性
*/
public class FutureDemo {
//Callable对象
public static class MyCallable implements Callable<String>{
@Override
public String call() throws Exception {
//模拟任务执行时间
Thread.sleep(1000);
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//Future是submit()方法的返回值
Future future = executorService.submit(new MyCallable());
//是否执行完毕 MyCallable任务模拟执行任务时间1s
System.out.println("任务是否执行完毕:" + future.isDone());
//从Future中取得返回值
System.out.println("任务的返回值:"+future.get());
//是否执行完毕
System.out.println("任务是否执行完毕:" + future.isDone());
}
}
执行结果如下:
任务是否执行完毕:false
任务的返回值:pool-1-thread-1执行完毕
任务是否执行完毕:true
3.3 submit(Runnable,T result)的使用
方法submit(Runnable,T result)的第2个参数result可以作为执行结果的返回值,而不需要使用get()方法来进行获得。
- Runnable对象没有返回值,submit(Runnable, T result)的result对象可以作为返回值,通过get()获取。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class FutureDemo {
//user对象,引用类型
public static class User {
private String name;
private String age;
public User(String name, String age) {
this.name = name;
this.age = age;
}
// getter和setter方法略
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age='" + age + '\'' +
'}';
}
}
public static class MyRunnable implements Runnable{
String result = "结果2";
public MyRunnable(User u) {
super();
u.setName("更改后姓名");
u.setAge("更改后年龄");
}
@Override
public void run() {
try {
//模拟任务执行时间
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
User user = new User("原姓名","原年龄");
//Future是submit()方法的返回值
Future<User> future = executorService.submit(new MyRunnable(user),user);
//从Future中取得返回值
System.out.println("任务的返回值:"+future.get().toString());
}
}
submit(Runnable, T result)对象将预先传给submit()方法的参数作为返回值返回了。本例中,我们用了User这个对象,实现了赋值再传给Runnable对象,当任务执行时,对User对象的值进行了修改,此时作为submit()参数的User对象值就改变了,从而实现了Runnable可以返回值的效果。执行结果如下:
pool-1-thread-1执行完毕
任务的返回值:User{name='更改后姓名', age='更改后年龄'}
3.4 方法cancel和isCancelled的使用
- 方法
cancel(boolean mayInterruptIfRunning)
的参数 mayInterruptIfRunning 的作用是:如果线程正在运行则是否中断正在运行的线程,在代码中需要使用if(Thread.currentThread().isInterrupted())
进行配合。 - 方法
cancel()
的返回值代表发送取消任务的命令是否成功完成。 cancel(boolean )
方法取消不了已经执行完的任务,cancel(true)
可以取消正在执行的任务,cancel(false)
取消不了正在执行的任务。cancel(boolean )
方法的取消只是将线程的isInterrupted进行了更改,并不是正在意义的中断。
public class FutureDemo {
//Callable对象
public static class MyCallable implements Callable<String>{
String name;
public MyCallable(String name) {
super();
this.name = name;
}
@Override
public String call() {
try {
//模拟任务执行时间
Thread.sleep(1000);
System.out.println(name+"执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,3,10,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//Future是submit()方法的返回值
Future future1 = threadPoolExecutor.submit(new MyCallable("future1"));
Thread.sleep(1100);
//此时future1对应任务已经执行完,无法取消
System.out.println("future1取消方法是否执行成功:" + future1.cancel(true)+";是否已经取消:" + future1.isCancelled());
Future future2 = threadPoolExecutor.submit(new MyCallable("future2"));
Thread.sleep(500);
//此时future2对应任务正在执行,cancel(true)表示可以取消正在执行任务
System.out.println("future2取消方法是否执行成功:" + future2.cancel(true)+";是否已经取消:" + future2.isCancelled());
Future future3 = threadPoolExecutor.submit(new MyCallable("future3"));
Thread.sleep(1500);
//此时future3对应任务正在执行,cancel(false)表示无法取消正在执行任务
System.out.println("future3取消方法是否执行成功:" + future3.cancel(false)+";是否已经取消:" + future3.isCancelled());
}
}
执行结果如下:
future1执行完毕
future1取消方法是否执行成功:false;是否已经取消:false
future2取消方法是否执行成功:true;是否已经取消:true
//MyCallable代码有sleep,所以在正确取消任务,线程isInterrupted为true之后,会抛异常
java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at com.example.demo.FutureDemo$MyCallable.call(FutureDemo.java:18)
at com.example.demo.FutureDemo$MyCallable.call(FutureDemo.java:7)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
future3执行完毕
future3取消方法是否执行成功:false;是否已经取消:false
3.5 方法get(long timeout,TimeUnit unit)的使用
方法 get(long timeout,TimeUnit unit)
的作用是在指定的最大时间内等待获得返回值。
public class FutureDemo {
//Callable对象
public static class MyCallable implements Callable<String>{
String name;
public MyCallable(String name) {
super();
this.name = name;
}
@Override
public String call() {
try {
//模拟任务执行时间
Thread.sleep(10000);
System.out.println(name+"执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = null;
try {
threadPoolExecutor = new ThreadPoolExecutor(1,3,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//Future是submit()方法的返回值
Future future1 = threadPoolExecutor.submit(new MyCallable("future1"));
System.out.println(future1.get(5L,TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
}
本例中线程任务执行时间是10s中,而get方法只等待5s中,到时间后,没有等待返回值抛异常:TimeoutException。执行结果如下:
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
at com.example.demo.FutureDemo.main(FutureDemo.java:35)
future1执行完毕
4、Callable异常处理
接口Callable 任务在执行时有可能会出现异常,在异常出现后Callable如何处理要区分具体情况:
- Callable 任务抛异常,主线程没有调用get()方法时,异常不会被主线程捕获。
- Callable 任务抛异常,主线程调用get()方法时,异常可以被主线程捕获。
4.1 主线程未调用get()方法
Callable 任务抛异常,主线程没有调用get()方法时,异常不会被主线程捕获。
public class FutureDemo {
//Callable对象
public static class MyCallable implements Callable<String>{
String name;
public MyCallable(String name) {
super();
this.name = name;
}
@Override
public String call() throws Exception{
//模拟任务执行时间
Thread.sleep(1000);
//模拟异常,call()方法直接将异常抛出
Integer.parseInt("a");
System.out.println(name+"执行完毕");
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = null;
try {
threadPoolExecutor = new ThreadPoolExecutor(1,3,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//Future是submit()方法的返回值
Future future1 = threadPoolExecutor.submit(new MyCallable("future1"));
//System.out.println(future1.get(5L,TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:控制台为空!
4.2 主线程调用get()方法
Callable 任务抛异常,主线程调用get()方法时,异常可以被主线程捕获。
public class FutureDemo {
//Callable对象
public static class MyCallable implements Callable<String>{
String name;
public MyCallable(String name) {
super();
this.name = name;
}
@Override
public String call() throws Exception{
//模拟任务执行时间
Thread.sleep(1000);
//模拟异常,call()方法直接将异常抛出
Integer.parseInt("a");
System.out.println(name+"执行完毕");
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = null;
try {
threadPoolExecutor = new ThreadPoolExecutor(1,3,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//Future是submit()方法的返回值
Future future1 = threadPoolExecutor.submit(new MyCallable("future1"));
System.out.println(future1.get(5L,TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果如下:
java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "a"
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at com.example.demo.FutureDemo.main(FutureDemo.java:32)
Caused by: java.lang.NumberFormatException: For input string: "a"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at com.example.demo.FutureDemo$MyCallable.call(FutureDemo.java:19)
at com.example.demo.FutureDemo$MyCallable.call(FutureDemo.java:7)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
5、自定义拒绝策略
自定义拒绝策略 RejectedExecutionHandler 接口的主要作用是当线程池关闭后依然有任务要执行时,可以实现一些处理。
public class FutureDemo {
public static class MyRejectedExecutionHandler implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
System.out.println(((FutureTask)runnable).toString() + "被拒绝");
}
}
//Callable对象
public static class MyCallable implements Callable<String>{
String name;
public MyCallable(String name) {
super();
this.name = name;
}
@Override
public String call() throws Exception{
//模拟任务执行时间
Thread.sleep(1000);
System.out.println(name+"执行完毕");
return Thread.currentThread().getName()+"执行完毕";
}
}
//main方法
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = null;
try {
threadPoolExecutor = new ThreadPoolExecutor(1,3,10, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
//使用自定义的拒绝策略
threadPoolExecutor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
//Future是submit()方法的返回值
Future future1 = threadPoolExecutor.submit(new MyCallable("future1"));
Future future2 = threadPoolExecutor.submit(new MyCallable("future2"));
Future future3 = threadPoolExecutor.submit(new MyCallable("future3"));
threadPoolExecutor.shutdown();
Future future4 = threadPoolExecutor.submit(new MyCallable("future4"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果如下:
java.util.concurrent.FutureTask@5010be6[Not completed, task = com.example.demo.FutureDemo$MyCallable@2e0fa5d3]被拒绝
future1执行完毕
future2执行完毕
future3执行完毕
6、总结
- 接口 future 实现类是futureTask.java ,而且在使用线程池时,默认的情况下也是使用 futureTask.java 类作为接口 Future 实现类,也就是说,如果在使用 future和Callable情况下,使用 Future 接口也就是在使用 futureTask.java类。
- Callable 接口与 Runnable 接口在对比时主要的优点是,Callable 接口可以通过 Future取得返回值,但是需要注意的是,Future 接口调用 get()方法取得处理的结果值时是阻塞性的,也就是如果调用Future 对象的 get()方法时,任务尚未执行完成,则调用 get()方法时一直堵塞到此任务完成时为止,如果是这样的效果,则前面先执行的任务一旦耗时很多,则后面的任务调用 get()方法就呈阻塞状态,也就是排队进行等待,大大影响运行效率,也就是主线程并不能保证首先获得的是最先完成任务的返回值,这就future 的缺点,影响效率。
- 根据这个特性, JDKl.5 提供了CompletionService 接口可以解决这个问题。
评论区