

| package org.example.thread; |
| |
| public class Test { |
| |
| public static void main(String[] args) { |
| AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor(); |
| normalExecutor.put(() -> { |
| System.out.println("123"); |
| test(); |
| }); |
| normalExecutor.execute(); |
| System.out.println("00000000000000000000000000000"); |
| } |
| |
| private static void test(){ |
| try { |
| Thread.sleep(1000); |
| System.out.println("test..............."); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
复制
线程池代码
| package org.example.thread; |
| |
| |
| |
| |
| |
| |
| public interface AsyncProcessExecutor { |
| |
| |
| |
| |
| |
| |
| |
| |
| AsyncProcessExecutor put(final Runnable task); |
| |
| |
| |
| |
| |
| |
| boolean execute(); |
| |
| } |
复制
| package org.example.thread; |
| |
| import cn.hutool.core.collection.CollUtil; |
| import com.google.common.collect.Maps; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| |
| |
| |
| |
| |
| @Slf4j |
| public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor { |
| |
| |
| |
| |
| private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap(); |
| |
| |
| |
| |
| private final String key; |
| |
| |
| |
| |
| private final List<Runnable> taskList; |
| |
| |
| |
| |
| private final AsyncProcessor processor; |
| |
| |
| |
| |
| public AsyncProcessExecutorByNormal() { |
| this.key = "def"; |
| taskList = new ArrayList<>(); |
| processor = AsyncProcessExecutorByNormal.getProcessor(this.key); |
| } |
| |
| |
| |
| |
| |
| |
| public AsyncProcessExecutorByNormal(String key) { |
| this.key = key; |
| taskList = new ArrayList<>(); |
| processor = AsyncProcessExecutorByNormal.getProcessor(this.key); |
| } |
| |
| |
| |
| |
| |
| |
| |
| private synchronized static AsyncProcessor getProcessor(String key) { |
| AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); |
| if (null == asyncProcessor) { |
| asyncProcessor = new AsyncProcessor(); |
| asyncProcessor.init(key); |
| EXECUTOR_MAP.put(key, asyncProcessor); |
| } |
| return asyncProcessor; |
| } |
| |
| |
| |
| |
| |
| |
| @Override |
| public AsyncProcessExecutorByNormal put(final Runnable task) { |
| taskList.add(task); |
| return this; |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @Override |
| public boolean execute() { |
| if (CollUtil.isEmpty(this.taskList)) { |
| return true; |
| } |
| |
| for (Runnable task : this.taskList) { |
| |
| this.execute(task); |
| } |
| |
| |
| return true; |
| } |
| |
| |
| |
| |
| |
| |
| |
| private boolean execute(final Runnable task) { |
| return processor.executeTask(task); |
| } |
| |
| } |
复制
| package org.example.thread; |
| |
| import cn.hutool.core.collection.CollUtil; |
| import com.google.common.collect.Maps; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| @Slf4j |
| public class AsyncProcessExecutorByWait implements AsyncProcessExecutor { |
| |
| |
| |
| |
| private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap(); |
| |
| |
| |
| |
| private final String key; |
| |
| |
| |
| private final List<Callable<Object>> taskList; |
| |
| |
| |
| private final AsyncProcessor processor; |
| |
| |
| |
| private AtomicInteger count; |
| |
| |
| |
| |
| |
| public AsyncProcessExecutorByWait() { |
| this.key = "def"; |
| taskList = new ArrayList<>(); |
| processor = getProcessor(this.key); |
| } |
| |
| |
| |
| |
| |
| |
| public AsyncProcessExecutorByWait(String key) { |
| this.key = key; |
| taskList = new ArrayList<>(); |
| processor = getProcessor(this.key); |
| } |
| |
| |
| |
| |
| |
| |
| |
| private synchronized static AsyncProcessor getProcessor(String key) { |
| AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); |
| if (null == asyncProcessor) { |
| asyncProcessor = new AsyncProcessor(); |
| asyncProcessor.init(key); |
| EXECUTOR_MAP.put(key, asyncProcessor); |
| } |
| return asyncProcessor; |
| } |
| |
| |
| |
| |
| |
| |
| |
| @Override |
| public AsyncProcessExecutor put(final Runnable task) { |
| taskList.add(Executors.callable(task)); |
| return this; |
| } |
| |
| |
| |
| |
| @Override |
| public boolean execute() { |
| if (CollUtil.isEmpty(this.taskList)) { |
| return true; |
| } |
| |
| |
| count = new AtomicInteger(this.taskList.size()); |
| |
| CountDownLatch latch = new CountDownLatch(this.taskList.size()); |
| |
| for (Callable<Object> task : this.taskList) { |
| |
| processor.executeTaskAndCallback(task, (result) -> { |
| if (result.getSuccess()) { |
| count.decrementAndGet(); |
| } |
| latch.countDown(); |
| return null; |
| }); |
| } |
| |
| |
| try { |
| latch.await(); |
| } catch (Exception e) { |
| log.error(e.getMessage(), e); |
| } finally { |
| this.taskList.clear(); |
| } |
| |
| |
| return count.get() == 0; |
| } |
| |
| } |
复制
| package org.example.thread; |
| |
| |
| |
| |
| |
| |
| public final class AsyncProcessExecutorFactory { |
| |
| private AsyncProcessExecutorFactory() { |
| } |
| |
| |
| |
| |
| |
| |
| public static AsyncProcessExecutor createWaitExecutor() { |
| return new AsyncProcessExecutorByWait(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| public static AsyncProcessExecutor createWaitExecutor(String key) { |
| return new AsyncProcessExecutorByWait(key); |
| } |
| |
| |
| |
| |
| |
| |
| public static AsyncProcessExecutor createNormalExecutor() { |
| return new AsyncProcessExecutorByNormal(); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| public static AsyncProcessExecutor createNormalExecutor(String key) { |
| return new AsyncProcessExecutorByNormal(key); |
| } |
| |
| } |
复制
| package org.example.thread; |
| |
| import cn.hutool.core.util.StrUtil; |
| import com.google.common.util.concurrent.*; |
| import lombok.Data; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| |
| |
| |
| |
| |
| |
| |
| @Slf4j |
| public class AsyncProcessor { |
| |
| |
| |
| |
| private static final String THREAD_POOL_NAME = "AsyncProcessorWaitPool-{}-%d"; |
| |
| |
| |
| |
| private static final int DEFAULT_WAIT_TIME = 10; |
| |
| |
| |
| |
| private ListeningExecutorService execute; |
| |
| |
| |
| |
| |
| |
| public void init(String key) { |
| if (StringUtils.isBlank(key)) { |
| return; |
| } |
| |
| |
| String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key); |
| |
| |
| |
| try { |
| |
| execute = MoreExecutors.listeningDecorator( |
| ThreadPoolFactory.createDefThreadPool(formatThreadPoolName)); |
| |
| |
| |
| Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
| log.info("ProcessorWait 异步处理器关闭"); |
| |
| execute.shutdown(); |
| |
| try { |
| |
| if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { |
| log.error("ProcessorWait 由于等待超时,异步处理器立即关闭"); |
| execute.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| log.error("ProcessorWait 异步处理器关闭中断"); |
| execute.shutdownNow(); |
| } |
| |
| log.info("ProcessorWait 异步处理器关闭完成"); |
| })); |
| } catch (Exception e) { |
| log.error("ProcessorWait 异步处理器初始化错误", e); |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| public boolean executeTask(Runnable task) { |
| try { |
| execute.execute(task); |
| } catch (RejectedExecutionException e) { |
| log.error("AsyncProcessorWait 执行任务被拒绝", e); |
| return false; |
| } |
| return true; |
| } |
| |
| |
| |
| |
| |
| |
| |
| public <T> void executeTaskAndCallback(Callable<T> task, Function<CallbackResult<T>, Void> callback) { |
| ListenableFuture<T> future = execute.submit(task); |
| Futures.addCallback(future, new FutureCallback<T>() { |
| @Override |
| public void onSuccess(T result) { |
| CallbackResult<T> callbackResult = new CallbackResult<>(); |
| callbackResult.setSuccess(true); |
| callbackResult.setResult(result); |
| |
| callback.apply(callbackResult); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| log.error("线程名称:{} - 执行异常信息:{}", Thread.currentThread().getName(), t.getMessage()); |
| CallbackResult<T> callbackResult = new CallbackResult<>(); |
| callbackResult.setSuccess(false); |
| callback.apply(callbackResult); |
| } |
| }, execute); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @Data |
| public static class CallbackResult<T> { |
| |
| |
| |
| |
| private Boolean success; |
| |
| |
| |
| |
| private T result; |
| |
| } |
| } |
复制
| package org.example.thread; |
| |
| import cn.hutool.core.thread.ThreadUtil; |
| import cn.hutool.core.util.StrUtil; |
| import com.google.common.collect.Maps; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| |
| |
| |
| |
| |
| |
| |
| @Slf4j |
| public final class SyncProcessSingleExecutor { |
| |
| private static final Map<String, ExecutorService> EXECUTOR_MAP = Maps.newConcurrentMap(); |
| |
| private static final String KEY = "def"; |
| |
| private SyncProcessSingleExecutor() { |
| } |
| |
| |
| |
| |
| |
| |
| public static synchronized void execute(Runnable r) { |
| execute(KEY, r); |
| } |
| |
| |
| |
| |
| |
| |
| |
| public static synchronized void execute(String key, Runnable r) { |
| if (null == r) { |
| return; |
| } |
| |
| ExecutorService executorService = EXECUTOR_MAP.get(key); |
| if (null == executorService) { |
| executorService = ThreadUtil.newSingleExecutor(); |
| EXECUTOR_MAP.put(key, executorService); |
| } |
| |
| executorService.execute(new TaskWrapper(r)); |
| } |
| |
| |
| |
| |
| |
| private static class TaskWrapper implements Runnable { |
| |
| private final Runnable gift; |
| |
| public TaskWrapper(final Runnable target) { |
| this.gift = target; |
| } |
| |
| @Override |
| public void run() { |
| |
| if (gift != null) { |
| try { |
| gift.run(); |
| } catch (Exception e) { |
| String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage()); |
| log.error(errMsg, e); |
| } |
| } |
| } |
| } |
| } |
复制
| package org.example.thread; |
| |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.RejectedExecutionHandler; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| |
| |
| |
| |
| |
| public final class ThreadPoolFactory { |
| |
| |
| |
| |
| private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; |
| |
| |
| |
| |
| private static final long DEFAULT_KEEP_ALIVE = 60L; |
| |
| |
| |
| |
| private static final int DEFAULT_SIZE = 1024; |
| |
| |
| |
| |
| private static final String DEFAULT_THREAD_POOL_NAME = "ProcessPool-{}-%d"; |
| |
| |
| private ThreadPoolFactory() { |
| } |
| |
| |
| |
| |
| |
| |
| public static ThreadPoolExecutor createDefThreadPool() { |
| return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, |
| TimeUnit.SECONDS, DEFAULT_SIZE, DEFAULT_THREAD_POOL_NAME, new ThreadPoolExecutor.CallerRunsPolicy()); |
| } |
| |
| |
| |
| |
| |
| |
| |
| public static ThreadPoolExecutor createDefThreadPool(String poolName) { |
| return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, |
| TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy()); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| public static ThreadPoolExecutor createDefThreadPool(int maxConcurrent, String poolName) { |
| return createInitThreadPool(maxConcurrent, maxConcurrent * 4, DEFAULT_KEEP_ALIVE, |
| TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy()); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| public static ThreadPoolExecutor createInitThreadPool(final int coreConcurrent, |
| final int maxConcurrent, |
| final long keepAlive, |
| final TimeUnit timeUnit, |
| final int queueSize, |
| final String poolName, |
| final RejectedExecutionHandler handler |
| ) { |
| return new ThreadPoolExecutor(coreConcurrent, maxConcurrent, keepAlive, timeUnit, |
| new LinkedBlockingDeque<>(queueSize), |
| new ThreadFactoryBuilder().setNameFormat(poolName).build(), |
| handler |
| ); |
| } |
| |
| } |
复制