Administrator
发布于 2025-03-12 / 9 阅读
0
0

封装CompletableFuture并行处理工具类

封装CompletableFuture并行处理工具类

为了专注业务实现,我们采用 枚举 + CompletableFuture + 自定义线程池,封装了一套并行处理业务的工具类,便于在大数据和批处理场景中高效利用线程池。枚举天然具备单例特性,确保线程池的唯一性;同时,不同业务场景可以创建专属单例线程池,以实现任务隔离,防止资源相互干扰。

代码:

package com.pilot.meterage.web.utils;

import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ObjectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * 使用 CompletableFuture 实现简单的线程池
 *
 * @author yangp
 */
public enum CompletableFutureSimpleThreadPool {
    /**
     * 单例对象
     */
    INSTANCE;

    /**
     * 自定义线程池,用于执行任务
     */
    private final ThreadPoolExecutor singleThreadPool;

    /**
     * 初始化线程池,并设置线程池参数
     */
    CompletableFutureSimpleThreadPool() {
        // 初始化线程池,核心线程数为15,最大线程数为20,线程空闲超时时间为30秒
        singleThreadPool = new ThreadPoolExecutor(
                15, 20,
                30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                r -> new Thread(r, "CompletableFutureUtils - " + r.hashCode())
        );
    }

    private static final Logger log = LoggerFactory.getLogger(CompletableFutureSimpleThreadPool.class);

    // JVM 关闭时自动关闭线程池
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("[线程池] JVM 关闭前自动关闭线程池...");
            INSTANCE.shutdown();
        }));
    }

    /**
     * 执行多个任务(无返回值)
     *
     * @param tasks 任务数组
     */
    public static void executeTasks(@Nonnull Runnable... tasks) {
        if (ObjectUtil.isEmpty(tasks)) {
            logCheckTaskIsEmpty();
            return;
        }
        long start = logTaskStart();

        // 使用线程池并行执行多个任务
        CompletableFuture<?>[] futures = Arrays.stream(tasks)
                .map(task -> CompletableFuture.runAsync(() -> {
                    try {
                        task.run();
                    } catch (Exception e) {
                        log.error("任务执行失败", e);
                    }
                }, INSTANCE.singleThreadPool))
                .toArray(CompletableFuture[]::new);

        // 打印线程池状态
        logThreadPoolStatus();

        // 等待所有任务执行完成
        CompletableFuture.allOf(futures).join();

        logTaskEnd(start);

    }

    /**
     * 执行多个任务(有返回值,使用 CompletableFuture 作为任务)
     *
     * @param tasks 任务数组
     * @return 任务执行结果的列表
     */
    @SafeVarargs
    public static <T> List<T> executeCompletableFutures(@Nonnull CompletableFuture<T>... tasks) {
        List<T> results;
        if (ObjectUtil.isEmpty(tasks)) {
            logCheckTaskIsEmpty();
            return Collections.emptyList();
        }
        long start = logTaskStart();

        // 使用 allOf 等待所有任务完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(tasks);
        allOf.join();

        // 收集任务结果
        results = Arrays.stream(tasks)
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        logTaskEnd(start);
        return results;
    }

    /**
     * 执行多个任务(有返回值,使用 Supplier 作为任务)
     *
     * @param tasks 任务数组
     * @return 任务执行结果的列表
     */
    @SafeVarargs
    public static <T> List<T> executeSuppliers(@Nonnull Supplier<T>... tasks) {
        List<T> results;
        if (ObjectUtil.isEmpty(tasks)) {
            logCheckTaskIsEmpty();
            return Collections.emptyList();
        }
        long start = logTaskStart();

        // 执行多个任务并收集结果
        List<CompletableFuture<T>> futures = Arrays.stream(tasks)
                .map(task -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.get();
                    } catch (Exception e) {
                        log.error("任务执行失败", e);
                        return null; // 或者可以选择抛出 RuntimeException
                    }
                }, INSTANCE.singleThreadPool))
                .collect(Collectors.toList());

        // 打印线程池状态
        logThreadPoolStatus();

        // 获取所有任务的执行结果
        results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        logTaskEnd(start);

        return results;
    }

    /**
     * 记录线程池当前的状态,包括当前线程池大小和任务数量
     */
    private static void logThreadPoolStatus() {
        log.info("[线程池状态] 池大小:{},活跃线程数:{},排队任务数:{},总任务数:{},完成任务数:{}",
                INSTANCE.singleThreadPool.getPoolSize(),
                INSTANCE.singleThreadPool.getActiveCount(),
                INSTANCE.singleThreadPool.getQueue().size(),
                INSTANCE.singleThreadPool.getTaskCount(),
                INSTANCE.singleThreadPool.getCompletedTaskCount());
    }

    /**
     * 记录任务为空的情况
     */
    private static void logCheckTaskIsEmpty() {
        log.error("[线程池][执行任务] 任务为空");
    }

    /**
     * 记录任务开始执行
     */
    private static long logTaskStart() {
        long startTime = System.currentTimeMillis();
        log.info("[线程池][执行任务] 开始时间:{},开始执行任务", LocalDateTimeUtil.now());
        return startTime;
    }

    /**
     * 记录任务执行结束
     */
    private static void logTaskEnd(long start) {
        log.info("[线程池][执行任务] 结束时间:{},执行任务结束,耗时:{}ms", LocalDateTimeUtil.now(), System.currentTimeMillis() - start);
    }

    /**
     * 关闭线程池(单例线程池应该在 JVM 关闭时才被终止)
     */
    public void shutdown() {
        log.info("[线程池] 正在关闭线程池...");
        singleThreadPool.shutdown();
        try {
            if (!singleThreadPool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("[线程池] 线程池未在规定时间内关闭,强制关闭");
                singleThreadPool.shutdownNow();
            }
            log.info("[线程池] 线程池已关闭");
        } catch (InterruptedException e) {
            log.error("[线程池] 线程池关闭时被中断", e);
            singleThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 线程池关闭超时时间(单位:秒)
     */
    public static final long SHUTDOWN_TIMEOUT_SECONDS = 60;
}

测试

package com.pilot.meterage;

import com.pilot.meterage.web.MainApplication;
import com.pilot.meterage.web.utils.CompletableFutureSimpleThreadPool;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * @Author: yangp
 * @Date: 2024/11/12 14:41
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class CompletableFutureTest {
    @Test
    public void test02() {
        Runnable run1 = () -> {
            // 模拟耗时
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("run1");
        };
        Runnable run2 = () -> {
            // 模拟耗时
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("run2");
        };
        Runnable run3 = () -> {
            // 模拟耗时
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("run3");
        };
        Runnable run4 = () -> {
            // 模拟耗时
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("run4");
        };

        CompletableFutureSimpleThreadPool.executeTasks(run1, run2, run3, run4);
        System.out.println("test02 执行完毕");
    }

    @Test
    public void test03() {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(CompletableFutureTest::getData1);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(CompletableFutureTest::getData2);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(CompletableFutureTest::getData3);
        List<String> res = CompletableFutureSimpleThreadPool.executeCompletableFutures(future1, future2, future3);
        System.out.println(res);
        System.out.println("test03 执行完毕");
    }

    @Test
    public void test04() {
        List<String> res = CompletableFutureSimpleThreadPool.executeSuppliers(CompletableFutureTest::getData1, CompletableFutureTest::getData2, CompletableFutureTest::getData3);
        System.out.println(res);
        System.out.println("test04 执行完毕");
    }



    public static String getData1() {
        // 模拟耗时
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("data1执行完毕");
        return "data1";
    }

    public static String getData2() {
        // 模拟耗时
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("data2执行完毕");
        return "data2";
    }

    public static String getData3() {
        // 模拟耗时
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("data3执行完毕");
        return "data3";
    }
}


评论