这里我们通过CompletableFuture来实现一个多线程处理异步任务的例子。
这里我们创建10个任务提交到我们指定的线程池中执行,并等待这10个任务全部执行完毕。
每个任务的执行流程为第一次先执行加法,第二次执行乘法,如果发生异常则返回默认值,当10个任务执行完成后依次打印每个任务的结果。
public void demo() throws InterruptedException, ExecutionException, TimeoutException {
// 1、自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(5, 10,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
// 2、集合保存future对象
List<CompletableFuture<Integer>futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integerfuture = CompletableFuture
// 提交任务到指定线程池
.supplyAsync(() -this.addValue(finalI), executorService)
// 第一个任务执行结果在此处进行处理
.thenApplyAsync(k -this.plusValue(finalI, k), executorService)
// 任务执行异常时处理异常并返回默认值
.exceptionally(e -this.defaultValue(finalI, e));
// future对象添加到集合中
futures.add(future);
}
// 3、等待所有任务执行完成,此处最好加超时时间
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
for (CompletableFuture<Integerfuture : futures) {
Integer num = future.get();
System.out.println("任务执行结果为:" + num);
}
System.out.println("任务全部执行完成!");
}
private Integer addValue(Integer index) {
System.out.println("第" + index + "个任务第一次执行");
if (index == 3) {
int value = index / 0;
}
return index + 3;
}
private Integer plusValue(Integer index, Integer num) {
System.out.println("第" + index + "个任务第二次执行,上次执行结果:" + num);
return num * 10;
}
private Integer defaultValue(Integer index, Throwable e) {
System.out.println("第" + index + "个任务执行异常!" + e.getMessage());
e.printStackTrace();
return 10;
}