首页 前端知识 异步线程池、异步执行器

异步线程池、异步执行器

2024-02-13 10:02:00 前端知识 前端哥 850 445 我要收藏

 

package org.example.thread;

public class Test {

    /**
     * Demo entry point: queues one task on an executor obtained from
     * {@code AsyncProcessExecutorFactory.createNormalExecutor()}, triggers it,
     * and then prints a marker line from the calling thread.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
        normalExecutor.put(() -> {
            System.out.println("123");
            test();
        });
        normalExecutor.execute();
        System.out.println("00000000000000000000000000000");
    }

    /**
     * Sleeps for one second and then prints a marker line.
     * <p>
     * If the sleep is interrupted, the stack trace is printed and the interrupt
     * flag is restored so that the thread running this task can still observe it.
     */
    private static void test() {
        try {
            Thread.sleep(1000);
            System.out.println("test...............");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep clears the interrupt flag when it throws; set it again
            // so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }
}

线程池代码

package org.example.thread;

/**
 * Asynchronous process executor.
 * <p>
 * Tasks are handed over with {@link #put(Runnable)} and started with {@link #execute()}.
 *
 * @date 2021-07-15 13:43:37
 */
public interface AsyncProcessExecutor {

    /**
     * Stores a task.
     *
     * @param task the task to store
     * @return an {@code AsyncProcessExecutor} to continue working with
     */
    AsyncProcessExecutor put(Runnable task);

    /**
     * Executes the stored tasks.
     *
     * @return the boolean outcome reported by the implementation
     */
    boolean execute();

}
package org.example.thread;

import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Executor that hands its queued tasks to a shared {@link AsyncProcessor} and
 * returns without waiting for them to finish.
 * <p>
 * One {@link AsyncProcessor} is kept per key in a static map, so every instance
 * created with the same key submits to the same processor. The task list itself
 * is a plain {@link ArrayList}; nothing in this class synchronizes access to it.
 *
 * @date 2020-12-10 10:36
 */
@Slf4j
public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor {

    /**
     * Processors indexed by key.
     */
    private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();

    /**
     * Key identifying the processor used by this instance.
     */
    private final String key;

    /**
     * Tasks queued by {@link #put(Runnable)} and not yet submitted.
     */
    private final List<Runnable> taskList;

    /**
     * Processor the tasks are submitted to.
     */
    private final AsyncProcessor processor;

    /**
     * Creates an executor bound to the default key {@code "def"}.
     */
    public AsyncProcessExecutorByNormal() {
        this("def");
    }

    /**
     * Creates an executor bound to the processor registered under {@code key}.
     *
     * @param key unique key of the processor
     */
    public AsyncProcessExecutorByNormal(String key) {
        this.key = key;
        taskList = new ArrayList<>();
        processor = AsyncProcessExecutorByNormal.getProcessor(this.key);
    }

    /**
     * Returns the processor registered under {@code key}, creating, initialising
     * and registering it on first use.
     *
     * @param key processor key
     * @return the processor for that key
     */
    private synchronized static AsyncProcessor getProcessor(String key) {
        AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
        if (null == asyncProcessor) {
            asyncProcessor = new AsyncProcessor();
            asyncProcessor.init(key);
            EXECUTOR_MAP.put(key, asyncProcessor);
        }
        return asyncProcessor;
    }

    /**
     * Queues a task; nothing is submitted until {@link #execute()} is called.
     *
     * @param task the task to queue
     * @return this executor, for chaining
     */
    @Override
    public AsyncProcessExecutorByNormal put(final Runnable task) {
        taskList.add(task);
        return this;
    }

    // ====================================

    /**
     * Submits every queued task to the processor and returns immediately.
     * <p>
     * The queue is emptied afterwards, so a later call does not submit the same
     * tasks a second time.
     *
     * @return {@code true} if the queue was empty or every task was accepted;
     *         {@code false} if the processor reported at least one task as not accepted
     */
    @Override
    public boolean execute() {
        if (CollUtil.isEmpty(this.taskList)) {
            return true;
        }

        boolean allAccepted = true;
        try {
            for (Runnable task : this.taskList) {
                // Keep submitting the remaining tasks even when one is refused.
                if (!this.execute(task)) {
                    allAccepted = false;
                }
            }
        } finally {
            // Always drop the submitted tasks, even if a submission threw.
            this.taskList.clear();
        }

        return allAccepted;
    }

    /**
     * Submits a single task to the processor.
     *
     * @param task the task to submit
     * @return the value returned by {@link AsyncProcessor#executeTask(Runnable)}
     */
    private boolean execute(final Runnable task) {
        return processor.executeTask(task);
    }

}
package org.example.thread;

import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程锁执行器
 * 用于当前方法中复杂业务多线程处理,等待线程执行完毕后 获得统一结果
 * 2021年11月2日14:07:54 重构 多线程异步等待执行器
 *
 * @author 周鹏程
 * @date 2020-12-10 10:36
 */
@Slf4j
public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {

    /**
     * 线程池字典
     */
    private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();

    /**
     * 线程Key
     */
    private final String key;
    /**
     * 任务队列
     */
    private final List<Callable<Object>> taskList;
    /**
     * 执行器
     */
    private final AsyncProcessor processor;
    /**
     * 任务执行计数器
     */
    private AtomicInteger count;


    /**
     * 构造函数
     */
    public AsyncProcessExecutorByWait() {
        this.key = "def";
        taskList = new ArrayList<>();
        processor = getProcessor(this.key);
    }

    /**
     * 构造函数
     *
     * @param key 线程池唯一Key
     */
    public AsyncProcessExecutorByWait(String key) {
        this.key = key;
        taskList = new ArrayList<>();
        processor = getProcessor(this.key);
    }

    /**
     * 获得执行器
     *
     * @param key Key
     * @return AsyncProcessor
     */
    private synchronized static AsyncProcessor getProcessor(String key) {
        AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
        if (null == asyncProcessor) {
            asyncProcessor = new AsyncProcessor();
            asyncProcessor.init(key);
            EXECUTOR_MAP.put(key, asyncProcessor);
        }
        return asyncProcessor;
    }

    /**
     * 放入执行任务
     * 特殊处理 Runnable 转换为 Callable
     *
     * @param task 任务
     */
    @Override
    public AsyncProcessExecutor put(final Runnable task) {
        taskList.add(Executors.callable(task));
        return this;
    }

    /**
     * 执行 线程锁 等待查询结果 结果完成后继续执行
     */
    @Override
    public boolean execute() {
        if (CollUtil.isEmpty(this.taskList)) {
            return true;
        }

        // 初始化锁参数
        count = new AtomicInteger(this.taskList.size());
        // 门闩 线程锁
        CountDownLatch latch = new CountDownLatch(this.taskList.size());

        for (Callable<Object> task : this.taskList) {
            // 回调减 门闩
            processor.executeTaskAndCallback(task, (result) -> {
                if (result.getSuccess()) {
                    count.decrementAndGet();
                }
                latch.countDown();
                return null;
            });
        }

        // 线程锁 等待查询结果 结果完成后继续执行
        try {
            latch.await();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            this.taskList.clear();
        }

        // 返回执行结果
        return count.get() == 0;
    }

}
package org.example.thread;

/**
 * Static factory for {@link AsyncProcessExecutor} implementations.
 *
 * @date 2021-07-15 13:43:37
 */
public final class AsyncProcessExecutorFactory {

    /**
     * Utility class; not meant to be instantiated.
     */
    private AsyncProcessExecutorFactory() {
    }

    /**
     * Builds an {@link AsyncProcessExecutorByWait} with its no-argument constructor.
     *
     * @return the new waiting executor
     */
    public static AsyncProcessExecutor createWaitExecutor() {
        final AsyncProcessExecutor waiting = new AsyncProcessExecutorByWait();
        return waiting;
    }

    /**
     * Builds an {@link AsyncProcessExecutorByWait} for the given key.
     *
     * @param key key passed to the executor's constructor
     * @return the new waiting executor
     */
    public static AsyncProcessExecutor createWaitExecutor(String key) {
        final AsyncProcessExecutor waiting = new AsyncProcessExecutorByWait(key);
        return waiting;
    }

    /**
     * Builds an {@link AsyncProcessExecutorByNormal} with its no-argument constructor.
     *
     * @return the new normal executor
     */
    public static AsyncProcessExecutor createNormalExecutor() {
        final AsyncProcessExecutor normal = new AsyncProcessExecutorByNormal();
        return normal;
    }

    // =====================

    /**
     * Builds an {@link AsyncProcessExecutorByNormal} for the given key.
     *
     * @param key key passed to the executor's constructor
     * @return the new normal executor
     */
    public static AsyncProcessExecutor createNormalExecutor(String key) {
        final AsyncProcessExecutor normal = new AsyncProcessExecutorByNormal(key);
        return normal;
    }

}
package org.example.thread;

import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * 自定义线程执行器 - 等待线程执行完毕不拒绝
 *
 * @author 周鹏程
 * @date 2020-10-08 10:24
 */
@Slf4j
public class AsyncProcessor {

    /**
     * 线程池名称格式
     */
    private static final String THREAD_POOL_NAME = "AsyncProcessorWaitPool-{}-%d";

    /**
     * 默认线程池关闭等待时间 秒
     */
    private static final int DEFAULT_WAIT_TIME = 10;

    /**
     * 线程池监听执行器
     */
    private ListeningExecutorService execute;

    /**
     * 初始化
     *
     * @param key 线程池标识
     */
    public void init(String key) {
        if (StringUtils.isBlank(key)) {
            return;
        }

        // 线程工厂名称
        String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key);

        // 创建 Executor
        // 此处默认最大值改为处理器数量的 4 倍
        try {
            // 监听执行器
            execute = MoreExecutors.listeningDecorator(
                    ThreadPoolFactory.createDefThreadPool(formatThreadPoolName));

            // 这里不会自动关闭线程, 当线程超过阈值时 抛异常
            // 关闭事件的挂钩
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                log.info("ProcessorWait 异步处理器关闭");

                execute.shutdown();

                try {
                    // 等待1秒执行关闭
                    if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
                        log.error("ProcessorWait 由于等待超时,异步处理器立即关闭");
                        execute.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    log.error("ProcessorWait 异步处理器关闭中断");
                    execute.shutdownNow();
                }

                log.info("ProcessorWait 异步处理器关闭完成");
            }));
        } catch (Exception e) {
            log.error("ProcessorWait 异步处理器初始化错误", e);
            throw new ExceptionInInitializerError(e);
        }
    }


    /**
     * 执行任务,不管是否成功<br>
     * 其实也就是包装以后的 {@link } 方法
     *
     * @param task 任务
     * @return boolean
     */
    public boolean executeTask(Runnable task) {
        try {
            execute.execute(task);
        } catch (RejectedExecutionException e) {
            log.error("AsyncProcessorWait 执行任务被拒绝", e);
            return false;
        }
        return true;
    }

    /**
     * 提交任务,并可以在稍后获取其执行情况<br>
     * 当提交失败时,会抛出 {@link }
     *
     * @param task 任务
     */
    public <T> void executeTaskAndCallback(Callable<T> task, Function<CallbackResult<T>, Void> callback) {
        ListenableFuture<T> future = execute.submit(task);
        Futures.addCallback(future, new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                CallbackResult<T> callbackResult = new CallbackResult<>();
                callbackResult.setSuccess(true);
                callbackResult.setResult(result);
                // 线程池失败后 返回该 Runnable
                callback.apply(callbackResult);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("线程名称:{} - 执行异常信息:{}", Thread.currentThread().getName(), t.getMessage());
                CallbackResult<T> callbackResult = new CallbackResult<>();
                callbackResult.setSuccess(false);
                callback.apply(callbackResult);
            }
        }, execute);
    }

    // =================

    /**
     * 回调结果
     *
     * @param <T>
     */
    @Data
    public static class CallbackResult<T> {

        /**
         * 状态
         */
        private Boolean success;

        /**
         * 结果
         */
        private T result;

    }
}
package org.example.thread;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * 单线程池
 *
 * @author 周鹏程
 * @date 2021/8/27 17:00
 */
@Slf4j
public final class SyncProcessSingleExecutor {

    private static final Map<String, ExecutorService> EXECUTOR_MAP = Maps.newConcurrentMap();

    private static final String KEY = "def";

    private SyncProcessSingleExecutor() {
    }

    /**
     * 执行器
     *
     * @param r 任务
     */
    public static synchronized void execute(Runnable r) {
        execute(KEY, r);
    }

    /**
     * 执行器
     *
     * @param key 唯一Key
     * @param r   任务
     */
    public static synchronized void execute(String key, Runnable r) {
        if (null == r) {
            return;
        }

        ExecutorService executorService = EXECUTOR_MAP.get(key);
        if (null == executorService) {
            executorService = ThreadUtil.newSingleExecutor();
            EXECUTOR_MAP.put(key, executorService);
        }

        executorService.execute(new TaskWrapper(r));
    }

    /**
     * Task 包装类<br>
     * 此类型的意义是记录可能会被 Executor 吃掉的异常<br>
     */
    private static class TaskWrapper implements Runnable {

        private final Runnable gift;

        public TaskWrapper(final Runnable target) {
            this.gift = target;
        }

        @Override
        public void run() {
            // 捕获异常,避免在 Executor 里面被吞掉了
            if (gift != null) {
                try {
                    gift.run();
                } catch (Exception e) {
                    String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
                    log.error(errMsg, e);
                }
            }
        }
    }
}
package org.example.thread;


import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Factory for {@link ThreadPoolExecutor} instances with bounded queues and named threads.
 *
 * @date 2021/11/2 10:48
 */
public final class ThreadPoolFactory {

    /**
     * Default concurrency: twice the number of available processors.<br>
     * The default pools use this value as the core size and four times it as the maximum size.
     */
    private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * Default keep-alive time for idle threads, in seconds.
     */
    private static final long DEFAULT_KEEP_ALIVE = 60L;

    /**
     * Default capacity of the work queue.
     */
    private static final int DEFAULT_SIZE = 1024;

    /**
     * Default thread name pattern; {@code {}} stands for the pool key and
     * {@code %d} for the thread number.
     */
    private static final String DEFAULT_THREAD_POOL_NAME = "ProcessPool-{}-%d";

    /**
     * Key substituted into {@link #DEFAULT_THREAD_POOL_NAME} when no pool name is given.
     */
    private static final String DEFAULT_POOL_KEY = "def";


    /**
     * Utility class; not meant to be instantiated.
     */
    private ThreadPoolFactory() {
    }

    /**
     * Creates a pool with the default sizes and the default thread name pattern.
     * <p>
     * The {@code {}} placeholder of the default pattern is filled with {@code "def"}
     * so that thread names do not contain a literal {@code {}}.
     *
     * @return the new pool
     */
    public static ThreadPoolExecutor createDefThreadPool() {
        return createDefThreadPool(DEFAULT_THREAD_POOL_NAME.replace("{}", DEFAULT_POOL_KEY));
    }

    /**
     * Creates a pool with the default sizes and the given thread name format.
     *
     * @param poolName thread name format handed to the thread factory
     * @return the new pool
     */
    public static ThreadPoolExecutor createDefThreadPool(String poolName) {
        return createDefThreadPool(DEFAULT_MAX_CONCURRENT, poolName);
    }

    /**
     * Creates a pool sized from {@code maxConcurrent}, with the default keep-alive,
     * queue capacity and {@link ThreadPoolExecutor.CallerRunsPolicy}.
     *
     * @param maxConcurrent used as the core pool size; the maximum pool size is four times this value
     * @param poolName      thread name format handed to the thread factory
     * @return the new pool
     */
    public static ThreadPoolExecutor createDefThreadPool(int maxConcurrent, String poolName) {
        return createInitThreadPool(maxConcurrent, maxConcurrent * 4, DEFAULT_KEEP_ALIVE,
                TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates a pool from explicit settings. The work queue is a
     * {@link LinkedBlockingDeque} bounded to {@code queueSize}.
     *
     * @param coreConcurrent core pool size
     * @param maxConcurrent  maximum pool size
     * @param keepAlive      keep-alive time for idle threads
     * @param timeUnit       unit of {@code keepAlive}
     * @param queueSize      capacity of the work queue
     * @param poolName       thread name format handed to the thread factory
     * @param handler        policy applied when a task cannot be accepted
     * @return the new pool
     */
    public static ThreadPoolExecutor createInitThreadPool(final int coreConcurrent,
                                                          final int maxConcurrent,
                                                          final long keepAlive,
                                                          final TimeUnit timeUnit,
                                                          final int queueSize,
                                                          final String poolName,
                                                          final RejectedExecutionHandler handler
    ) {
        return new ThreadPoolExecutor(coreConcurrent, maxConcurrent, keepAlive, timeUnit,
                new LinkedBlockingDeque<>(queueSize),
                new ThreadFactoryBuilder().setNameFormat(poolName).build(),
                handler
        );
    }

}

 

转载请注明出处或者链接地址:https://www.qianduange.cn//article/1947.html
标签
评论
发布的文章

jQuery 下载与安装教程

2024-02-28 11:02:44

若依中jQuery相关问题

2024-02-28 11:02:41

【JavaWeb】1.JQuery

2024-02-28 11:02:21

jQuery日历签到插件下载

2024-02-28 11:02:20

大家推荐的文章
会员中心 联系我 留言建议 回顶部
复制成功!