package org.example.thread;
/**
 * Small demo of the "normal" executor: queues one task, submits it, then prints
 * a marker line from the calling thread.
 */
public class Test {

    /**
     * Queues a single task on an executor obtained from
     * {@link AsyncProcessExecutorFactory#createNormalExecutor()}, submits it and
     * prints a marker line once {@code execute()} has returned.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
        normalExecutor.put(() -> {
            System.out.println("123");
            test();
        });
        normalExecutor.execute();
        System.out.println("00000000000000000000000000000");
    }

    /**
     * Sleeps for one second and then prints a marker line.
     * If the sleep is interrupted the stack trace is printed and the thread's
     * interrupt flag is restored so the owning executor can still observe it.
     */
    private static void test() {
        try {
            Thread.sleep(1000);
            System.out.println("test...............");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }
}
// Thread pool code (线程池代码)
package org.example.thread;
/**
* 异步进程 执行器
*
* @date 2021年7月15日13:43:37
*/
public interface AsyncProcessExecutor {
/**
* 存放任务
*
* @param task 任务
* @return AsyncProcessExecutor
*/
AsyncProcessExecutor put(final Runnable task);
/**
* 执行
*
* @return boolean
*/
boolean execute();
}
package org.example.thread;
import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * "Normal" (non-waiting) executor: queued tasks are handed to a shared
 * {@link AsyncProcessor} and {@link #execute()} returns as soon as they have
 * been submitted, without waiting for them to finish.
 *
 * <p>One {@link AsyncProcessor} is kept per key in a static map, so all
 * instances created with the same key share the same processor. The task queue
 * itself is a plain {@link ArrayList} and is not synchronized.
 *
 * @date 2020-12-10 10:36
 */
@Slf4j
public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor {

    /** Key used by the no-argument constructor. */
    private static final String DEFAULT_KEY = "def";

    /** Processors shared by all instances, indexed by key. */
    private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();

    /** Key identifying the processor this instance uses. */
    private final String key;

    /** Tasks collected by {@link #put(Runnable)} and not yet submitted. */
    private final List<Runnable> taskList;

    /** Processor that runs the tasks. */
    private final AsyncProcessor processor;

    /**
     * Creates an executor bound to the default key {@code "def"}.
     */
    public AsyncProcessExecutorByNormal() {
        this(DEFAULT_KEY);
    }

    /**
     * Creates an executor bound to the processor registered under {@code key},
     * creating and initialising that processor on first use.
     *
     * @param key unique key of the processor / thread pool
     */
    public AsyncProcessExecutorByNormal(String key) {
        this.key = key;
        this.taskList = new ArrayList<>();
        this.processor = getProcessor(this.key);
    }

    /**
     * Returns the processor registered under {@code key}, creating and
     * initialising it if none exists yet.
     *
     * @param key processor key
     * @return the shared processor for that key
     */
    private static synchronized AsyncProcessor getProcessor(String key) {
        return EXECUTOR_MAP.computeIfAbsent(key, poolKey -> {
            AsyncProcessor created = new AsyncProcessor();
            created.init(poolKey);
            return created;
        });
    }

    /**
     * Queues a task for the next {@link #execute()} call.
     *
     * @param task task to queue
     * @return this executor, for chaining
     */
    @Override
    public AsyncProcessExecutorByNormal put(final Runnable task) {
        taskList.add(task);
        return this;
    }

    /**
     * Submits every queued task to the processor and empties the queue, so a
     * later call does not submit the same tasks again. Does not wait for the
     * tasks to complete.
     *
     * @return {@code true} if the queue was empty or every task was accepted;
     *         {@code false} if the processor reported a rejection for at least one
     */
    @Override
    public boolean execute() {
        if (this.taskList.isEmpty()) {
            return true;
        }
        boolean allAccepted = true;
        try {
            for (Runnable task : this.taskList) {
                // Non-short-circuit '&=': keep submitting after a rejection.
                allAccepted &= this.execute(task);
            }
        } finally {
            this.taskList.clear();
        }
        return allAccepted;
    }

    /**
     * Submits a single task to the processor.
     *
     * @param task task to submit
     * @return the value returned by {@link AsyncProcessor#executeTask(Runnable)}
     */
    private boolean execute(final Runnable task) {
        return processor.executeTask(task);
    }
}
package org.example.thread;
import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Waiting executor: submits all queued tasks to a shared {@link AsyncProcessor}
 * and blocks the caller until every task has reported back, then returns one
 * combined result.
 *
 * <p>One {@link AsyncProcessor} is kept per key in a static map, so all
 * instances created with the same key share the same processor. The task queue
 * itself is a plain {@link ArrayList} and is not synchronized.
 *
 * <p>Refactored on 2021-11-02 into the asynchronous waiting executor.
 *
 * @author 周鹏程
 * @date 2020-12-10 10:36
 */
@Slf4j
public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {

    /** Key used by the no-argument constructor. */
    private static final String DEFAULT_KEY = "def";

    /** Processors shared by all instances, indexed by key. */
    private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();

    /** Key identifying the processor this instance uses. */
    private final String key;

    /** Tasks collected by {@link #put(Runnable)} and not yet submitted. */
    private final List<Callable<Object>> taskList;

    /** Processor that runs the tasks. */
    private final AsyncProcessor processor;

    /**
     * Creates an executor bound to the default key {@code "def"}.
     */
    public AsyncProcessExecutorByWait() {
        this(DEFAULT_KEY);
    }

    /**
     * Creates an executor bound to the processor registered under {@code key},
     * creating and initialising that processor on first use.
     *
     * @param key unique key of the processor / thread pool
     */
    public AsyncProcessExecutorByWait(String key) {
        this.key = key;
        this.taskList = new ArrayList<>();
        this.processor = getProcessor(this.key);
    }

    /**
     * Returns the processor registered under {@code key}, creating and
     * initialising it if none exists yet.
     *
     * @param key processor key
     * @return the shared processor for that key
     */
    private static synchronized AsyncProcessor getProcessor(String key) {
        return EXECUTOR_MAP.computeIfAbsent(key, poolKey -> {
            AsyncProcessor created = new AsyncProcessor();
            created.init(poolKey);
            return created;
        });
    }

    /**
     * Queues a task for the next {@link #execute()} call. The {@link Runnable}
     * is adapted to a {@link Callable} via {@link Executors#callable(Runnable)}.
     *
     * @param task task to queue
     * @return this executor, for chaining
     */
    @Override
    public AsyncProcessExecutor put(final Runnable task) {
        taskList.add(Executors.callable(task));
        return this;
    }

    /**
     * Submits every queued task, waits until each one has invoked its
     * completion callback, and empties the queue.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored
     * and the result reflects whatever had completed by then.
     *
     * @return {@code true} if the queue was empty or every task reported
     *         success; {@code false} otherwise
     */
    @Override
    public boolean execute() {
        if (this.taskList.isEmpty()) {
            return true;
        }
        final int total = this.taskList.size();
        // Kept local so each call has its own counter; a shared field could be
        // overwritten by another execute() while callbacks are still pending.
        final AtomicInteger notSucceeded = new AtomicInteger(total);
        final CountDownLatch latch = new CountDownLatch(total);
        try {
            for (Callable<Object> task : this.taskList) {
                processor.executeTaskAndCallback(task, result -> {
                    // Null-safe: a missing status counts as "not successful".
                    if (Boolean.TRUE.equals(result.getSuccess())) {
                        notSucceeded.decrementAndGet();
                    }
                    latch.countDown();
                    return null;
                });
            }
            // Block until every task has reported success or failure.
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can see the interruption.
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        } finally {
            // Also runs when submission throws, so tasks are never resubmitted.
            this.taskList.clear();
        }
        return notSucceeded.get() == 0;
    }
}
package org.example.thread;
/**
 * Factory for {@link AsyncProcessExecutor} instances.
 * Not instantiable; all members are static.
 *
 * @date 2021年7月15日13:43:37
 */
public final class AsyncProcessExecutorFactory {

    private AsyncProcessExecutorFactory() {
        // static factory only
    }

    // ---------- waiting executors ----------

    /**
     * Builds a waiting executor using its no-argument constructor.
     *
     * @return a new {@link AsyncProcessExecutorByWait}
     */
    public static AsyncProcessExecutor createWaitExecutor() {
        final AsyncProcessExecutor waitExecutor = new AsyncProcessExecutorByWait();
        return waitExecutor;
    }

    /**
     * Builds a waiting executor for the given key.
     *
     * @param key key passed to the executor's constructor
     * @return a new {@link AsyncProcessExecutorByWait}
     */
    public static AsyncProcessExecutor createWaitExecutor(String key) {
        final AsyncProcessExecutor waitExecutor = new AsyncProcessExecutorByWait(key);
        return waitExecutor;
    }

    // ---------- normal executors ----------

    /**
     * Builds a normal executor using its no-argument constructor.
     *
     * @return a new {@link AsyncProcessExecutorByNormal}
     */
    public static AsyncProcessExecutor createNormalExecutor() {
        final AsyncProcessExecutor normalExecutor = new AsyncProcessExecutorByNormal();
        return normalExecutor;
    }

    /**
     * Builds a normal executor for the given key.
     *
     * @param key key passed to the executor's constructor
     * @return a new {@link AsyncProcessExecutorByNormal}
     */
    public static AsyncProcessExecutor createNormalExecutor(String key) {
        final AsyncProcessExecutor normalExecutor = new AsyncProcessExecutorByNormal(key);
        return normalExecutor;
    }
}
package org.example.thread;
import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* 自定义线程执行器 - 等待线程执行完毕不拒绝
*
* @author 周鹏程
* @date 2020-10-08 10:24
*/
@Slf4j
public class AsyncProcessor {
/**
* 线程池名称格式
*/
private static final String THREAD_POOL_NAME = "AsyncProcessorWaitPool-{}-%d";
/**
* 默认线程池关闭等待时间 秒
*/
private static final int DEFAULT_WAIT_TIME = 10;
/**
* 线程池监听执行器
*/
private ListeningExecutorService execute;
/**
* 初始化
*
* @param key 线程池标识
*/
public void init(String key) {
if (StringUtils.isBlank(key)) {
return;
}
// 线程工厂名称
String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key);
// 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍
try {
// 监听执行器
execute = MoreExecutors.listeningDecorator(
ThreadPoolFactory.createDefThreadPool(formatThreadPoolName));
// 这里不会自动关闭线程, 当线程超过阈值时 抛异常
// 关闭事件的挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("ProcessorWait 异步处理器关闭");
execute.shutdown();
try {
// 等待1秒执行关闭
if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
log.error("ProcessorWait 由于等待超时,异步处理器立即关闭");
execute.shutdownNow();
}
} catch (InterruptedException e) {
log.error("ProcessorWait 异步处理器关闭中断");
execute.shutdownNow();
}
log.info("ProcessorWait 异步处理器关闭完成");
}));
} catch (Exception e) {
log.error("ProcessorWait 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e);
}
}
/**
* 执行任务,不管是否成功<br>
* 其实也就是包装以后的 {@link } 方法
*
* @param task 任务
* @return boolean
*/
public boolean executeTask(Runnable task) {
try {
execute.execute(task);
} catch (RejectedExecutionException e) {
log.error("AsyncProcessorWait 执行任务被拒绝", e);
return false;
}
return true;
}
/**
* 提交任务,并可以在稍后获取其执行情况<br>
* 当提交失败时,会抛出 {@link }
*
* @param task 任务
*/
public <T> void executeTaskAndCallback(Callable<T> task, Function<CallbackResult<T>, Void> callback) {
ListenableFuture<T> future = execute.submit(task);
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
CallbackResult<T> callbackResult = new CallbackResult<>();
callbackResult.setSuccess(true);
callbackResult.setResult(result);
// 线程池失败后 返回该 Runnable
callback.apply(callbackResult);
}
@Override
public void onFailure(Throwable t) {
log.error("线程名称:{} - 执行异常信息:{}", Thread.currentThread().getName(), t.getMessage());
CallbackResult<T> callbackResult = new CallbackResult<>();
callbackResult.setSuccess(false);
callback.apply(callbackResult);
}
}, execute);
}
// =================
/**
* 回调结果
*
* @param <T>
*/
@Data
public static class CallbackResult<T> {
/**
* 状态
*/
private Boolean success;
/**
* 结果
*/
private T result;
}
}
package org.example.thread;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
 * Single-thread executors, one per key.
 * Each key gets its own executor from {@code ThreadUtil.newSingleExecutor()},
 * created on first use and cached for later calls.
 *
 * @author 周鹏程
 * @date 2021/8/27 17:00
 */
@Slf4j
public final class SyncProcessSingleExecutor {
    private static final Map<String, ExecutorService> EXECUTOR_MAP = Maps.newConcurrentMap();
    private static final String KEY = "def";

    private SyncProcessSingleExecutor() {
    }

    /**
     * Runs a task on the executor registered under the default key.
     *
     * @param r task to run; {@code null} is ignored
     */
    public static synchronized void execute(Runnable r) {
        execute(KEY, r);
    }

    /**
     * Runs a task on the executor registered under {@code key}, creating that
     * executor if it does not exist yet.
     *
     * @param key unique executor key
     * @param r   task to run; {@code null} is ignored
     */
    public static synchronized void execute(String key, Runnable r) {
        if (r == null) {
            return;
        }
        ExecutorService single =
                EXECUTOR_MAP.computeIfAbsent(key, unused -> ThreadUtil.newSingleExecutor());
        single.execute(new TaskWrapper(r));
    }

    /**
     * Wraps a task so that an {@link Exception} it throws is logged instead of
     * disappearing inside the executor.
     */
    private static class TaskWrapper implements Runnable {
        private final Runnable gift;

        public TaskWrapper(final Runnable target) {
            this.gift = target;
        }

        @Override
        public void run() {
            if (gift == null) {
                return;
            }
            try {
                gift.run();
            } catch (Exception e) {
                // Log here; otherwise the failure would go unnoticed.
                String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
                log.error(errMsg, e);
            }
        }
    }
}
package org.example.thread;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Factory for {@link ThreadPoolExecutor} instances with shared defaults.
 * Every pool uses a bounded {@link LinkedBlockingDeque} as its work queue and
 * names its threads from a format string.
 *
 * @date 2021/11/2 10:48
 */
public final class ThreadPoolFactory {
    /**
     * Default core pool size: twice the number of available processors.
     * The default maximum pool size is four times this value.
     */
    private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
    /**
     * Default keep-alive time for idle threads, in seconds.
     */
    private static final long DEFAULT_KEEP_ALIVE = 60L;
    /**
     * Default capacity of the work queue.
     */
    private static final int DEFAULT_SIZE = 1024;
    /**
     * Default thread name pattern; {@code {}} stands for the pool key and
     * {@code %d} for the per-thread counter.
     */
    private static final String DEFAULT_THREAD_POOL_NAME = "ProcessPool-{}-%d";
    /**
     * Key inserted into {@link #DEFAULT_THREAD_POOL_NAME} when no name is given.
     */
    private static final String DEFAULT_POOL_KEY = "def";

    private ThreadPoolFactory() {
    }

    /**
     * Creates a pool with default sizing and the default thread name pattern.
     * The {@code {}} placeholder is filled with {@code "def"} so it does not
     * appear literally in thread names.
     *
     * @return a new ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createDefThreadPool() {
        return createDefThreadPool(DEFAULT_THREAD_POOL_NAME.replace("{}", DEFAULT_POOL_KEY));
    }

    /**
     * Creates a pool with default sizing and the given thread name format.
     *
     * @param poolName thread name format; may contain {@code %d} for the thread counter
     * @return a new ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createDefThreadPool(String poolName) {
        return createDefThreadPool(DEFAULT_MAX_CONCURRENT, poolName);
    }

    /**
     * Creates a pool with default keep-alive, queue size and
     * {@link ThreadPoolExecutor.CallerRunsPolicy} rejection handling.
     *
     * @param maxConcurrent used as the core pool size; the maximum pool size is
     *                      four times this value
     * @param poolName      thread name format; may contain {@code %d} for the thread counter
     * @return a new ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createDefThreadPool(int maxConcurrent, String poolName) {
        return createInitThreadPool(maxConcurrent, maxConcurrent * 4, DEFAULT_KEEP_ALIVE,
                TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Creates a fully specified pool.
     *
     * @param coreConcurrent core pool size
     * @param maxConcurrent  maximum pool size
     * @param keepAlive      keep-alive time for idle threads
     * @param timeUnit       unit of {@code keepAlive}
     * @param queueSize      capacity of the work queue
     * @param poolName       thread name format; may contain {@code %d} for the thread counter
     * @param handler        handler for rejected tasks
     * @return a new ThreadPoolExecutor
     */
    public static ThreadPoolExecutor createInitThreadPool(final int coreConcurrent,
                                                          final int maxConcurrent,
                                                          final long keepAlive,
                                                          final TimeUnit timeUnit,
                                                          final int queueSize,
                                                          final String poolName,
                                                          final RejectedExecutionHandler handler
    ) {
        return new ThreadPoolExecutor(coreConcurrent, maxConcurrent, keepAlive, timeUnit,
                new LinkedBlockingDeque<>(queueSize),
                new ThreadFactoryBuilder().setNameFormat(poolName).build(),
                handler
        );
    }
}