diff --git a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java index 75c510d74..021d0b4fc 100644 --- a/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java +++ b/adapter/adapter-common/src/main/java/org/dromara/dynamictp/adapter/common/AbstractDtpAdapter.java @@ -50,11 +50,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import static java.util.stream.Collectors.toList; import static org.dromara.dynamictp.common.constant.DynamicTpConst.PROPERTIES_CHANGE_SHOW_STYLE; import static org.dromara.dynamictp.core.notifier.manager.NotifyHelper.updateNotifyInfo; +import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; /** * AbstractDtpAdapter related @@ -162,12 +165,21 @@ protected void enhanceOriginExecutor(String tpName, ThreadPoolExecutor executor, ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor); try { ReflectionUtil.setFieldValue(fieldName, targetObj, proxy); - executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + putAndFinalize(tpName, executor, proxy); } catch (IllegalAccessException e) { log.error("DynamicTp adapter, enhance {} failed.", tpName, e); } } + protected void putAndFinalize(String tpName, ExecutorService origin, Executor targetForWrapper) { + executors.put(tpName, new ExecutorWrapper(tpName, targetForWrapper)); + shutdownOriginalExecutor(origin); + } + + protected void shutdownOriginalExecutor(ExecutorService executor) { + shutdownGracefulAsync(executor, getTpPrefix(), 5); + } + protected void doRefresh(ExecutorWrapper executorWrapper, List platforms, TpExecutorProps props) { diff --git a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/alibaba/AlibabaDubboDtpAdapter.java b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/alibaba/AlibabaDubboDtpAdapter.java index 02c5dc5af..871a2e49d 100644 --- a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/alibaba/AlibabaDubboDtpAdapter.java +++ b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/alibaba/AlibabaDubboDtpAdapter.java @@ -24,7 +24,6 @@ import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter; import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.spring.ApplicationContextHolder; -import org.dromara.dynamictp.core.support.ExecutorWrapper; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.springframework.beans.factory.InitializingBean; @@ -58,7 +57,7 @@ public void afterPropertiesSet() throws Exception { while (!registered.get()) { try { Thread.sleep(1000); - final DtpProperties dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class); + DtpProperties dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class); this.initialize(); this.refresh(dtpProperties); } catch (Throwable e) { } @@ -79,11 +78,9 @@ protected void initialize() { Map executorMap = dataStore.get(EXECUTOR_SERVICE_COMPONENT_KEY); if (MapUtils.isNotEmpty(executorMap) && registered.compareAndSet(false, true)) { executorMap.forEach((k, v) -> { - val name = genTpName(k); - ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v); + val proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v); executorMap.replace(k, proxy); - val executorWrapper = new ExecutorWrapper(name, proxy); - executors.put(name, executorWrapper); + putAndFinalize(genTpName(k), (ExecutorService) v, proxy); }); } } diff --git a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/ApacheDubboDtpAdapter.java b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/ApacheDubboDtpAdapter.java index 8d2a0affd..1fe01ab8b 100644 --- a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/ApacheDubboDtpAdapter.java +++ b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/ApacheDubboDtpAdapter.java @@ -32,7 +32,6 @@ import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.spring.ApplicationContextHolder; import org.dromara.dynamictp.common.util.ReflectionUtil; -import org.dromara.dynamictp.core.support.ExecutorWrapper; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.springframework.context.ApplicationEvent; @@ -95,8 +94,7 @@ protected void initialize() { executorMap.forEach((k, v) -> { ThreadPoolExecutor proxy = getProxy((ThreadPoolExecutor) v); executorMap.replace(k, proxy); - String tpName = genTpName(k); - executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + putAndFinalize(genTpName(k), (ExecutorService) v, proxy); }); } return; @@ -122,8 +120,7 @@ protected void initialize() { executorMap.forEach((k, v) -> { ThreadPoolExecutor proxy = getProxy(v); executorMap.replace(k, proxy); - String tpName = genTpName(k.toString()); - executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + putAndFinalize(genTpName(k.toString()), (ExecutorService) v, proxy); }); } } diff --git a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java index 3d49fe85c..407d616a3 100644 --- a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java +++ b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; - /** * EagerThreadPoolExecutorProxy related * @@ -56,7 +54,6 @@ public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) { this.rejectHandlerType = getRejectedExecutionHandler().getClass().getSimpleName(); setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler())); ((TaskQueue) getQueue()).setExecutor(this); - shutdownGracefulAsync(executor, "dubbo", 5); } @Override diff --git a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/MotanDtpAdapter.java b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/MotanDtpAdapter.java index eec1ed5c1..2f66ded94 100644 --- a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/MotanDtpAdapter.java +++ b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/MotanDtpAdapter.java @@ -31,7 +31,6 @@ import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.spring.ApplicationContextHolder; import org.dromara.dynamictp.common.util.ReflectionUtil; -import org.dromara.dynamictp.core.support.ExecutorWrapper; import java.util.List; import java.util.Objects; @@ -92,7 +91,7 @@ protected void initialize() { String tpName = TP_PREFIX + "#" + nettyServer.getUrl().getPort(); try { ReflectionUtil.setFieldValue(EXECUTOR_FIELD, nettyServer, proxy); - executors.put(tpName, new ExecutorWrapper(tpName, proxy)); + putAndFinalize(tpName, executor, proxy); } catch (IllegalAccessException ex) { log.error("DynamicTp adapter, enhance {} failed.", tpName, ex); } diff --git a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java index 1fb8cbc99..1f852d057 100644 --- a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java +++ b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java @@ -29,8 +29,6 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.TimeUnit; -import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; - /** * @author hanli * @since 1.1.4 @@ -61,7 +59,6 @@ public StandardThreadExecutorProxy(StandardThreadExecutor executor) { throw new RuntimeException(e); } } - shutdownGracefulAsync(executor, "motan", 5); } @Override diff --git a/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java b/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java index eb5943acc..aa1a6df8a 100644 --- a/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java +++ b/adapter/adapter-rocketmq/src/main/java/org/dromara/dynamictp/adapter/rocketmq/RocketMqDtpAdapter.java @@ -112,6 +112,8 @@ public void adaptProducerExecutors() { if (Objects.nonNull(executor)) { ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor); producer.setAsyncSenderExecutor(proxy); + shutdownOriginalExecutor(executor); + String proKey = TP_PREFIX + "#producer#" + defaultMQProducer.getProducerGroup(); executors.put(proKey, new ExecutorWrapper(proKey, proxy)); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java index 092f4a136..ac2184d98 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java @@ -25,10 +25,10 @@ import org.dromara.dynamictp.core.DtpRegistry; import org.dromara.dynamictp.core.executor.DtpExecutor; import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor; +import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.plugin.DtpInterceptorRegistry; import org.dromara.dynamictp.core.support.DynamicTp; import org.dromara.dynamictp.core.support.ExecutorWrapper; -import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -50,6 +50,8 @@ import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; +import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; + /** * BeanPostProcessor that handles all related beans managed by Spring. * @@ -121,14 +123,14 @@ private Object registerAndReturnCommon(Object bean, String beanName) { private Object doRegisterAndReturnCommon(Object bean, String poolName) { if (bean instanceof ThreadPoolTaskExecutor) { - val proxy = new ThreadPoolExecutorProxy(((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor()); + val proxy = newProxy(poolName, ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor()); try { ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy); } catch (IllegalAccessException ignored) { } DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return bean; } - val proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) bean); + val proxy = newProxy(poolName, (ThreadPoolExecutor) bean); DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return proxy; } @@ -142,4 +144,10 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } + + private ThreadPoolExecutorProxy newProxy(String name, ThreadPoolExecutor originExecutor) { + val proxy = new ThreadPoolExecutorProxy(originExecutor); + shutdownGracefulAsync(originExecutor, name, 0); + return proxy; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java index b098fc2b7..d265e9ecf 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java @@ -28,8 +28,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; - /** * ThreadPoolExecutor Proxy * @@ -56,7 +54,6 @@ public ThreadPoolExecutorProxy(ThreadPoolExecutor executor) { executor.getThreadFactory(), executor.getRejectedExecutionHandler()); this.rejectHandlerType = getRejectedExecutionHandler().getClass().getSimpleName(); setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler())); - shutdownGracefulAsync(executor, "", 5); } @Override @@ -72,7 +69,6 @@ protected void beforeExecute(Thread t, Runnable r) { AwareManager.beforeExecute(this, t, r); } - @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/AbstractWebServerDtpAdapter.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/AbstractWebServerDtpAdapter.java index c9d42fcf9..3623759ef 100644 --- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/AbstractWebServerDtpAdapter.java +++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/AbstractWebServerDtpAdapter.java @@ -22,7 +22,6 @@ import org.dromara.dynamictp.common.properties.DtpProperties; import org.dromara.dynamictp.common.spring.ApplicationContextHolder; import org.dromara.dynamictp.core.converter.ExecutorConverter; -import org.dromara.dynamictp.core.support.ExecutorWrapper; import org.springframework.boot.web.context.WebServerApplicationContext; import org.springframework.boot.web.context.WebServerInitializedEvent; import org.springframework.boot.web.server.WebServer; @@ -63,10 +62,9 @@ protected void initialize() { if (executors.get(getTpName()) == null) { ApplicationContext applicationContext = ApplicationContextHolder.getInstance(); WebServer webServer = ((WebServerApplicationContext) applicationContext).getWebServer(); - ExecutorWrapper wrapper = enhanceAndGetExecutorWrapper(webServer); - executors.put(getTpName(), wrapper); + doEnhance(webServer); log.info("DynamicTp adapter, web server {} executor init end, executor: {}", - getTpName(), ExecutorConverter.toMainFields(wrapper)); + getTpName(), ExecutorConverter.toMainFields(executors.get(getTpName()))); } } @@ -75,10 +73,9 @@ protected String getTpName() { } /** - * Enhance and get thread pool executor wrapper. + * Do enhance. * * @param webServer webServer - * @return the Executor instance */ - protected abstract ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer); + protected abstract void doEnhance(WebServer webServer); } diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/jetty/JettyDtpAdapter.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/jetty/JettyDtpAdapter.java index 9d952ffa7..22f249dbb 100644 --- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/jetty/JettyDtpAdapter.java +++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/jetty/JettyDtpAdapter.java @@ -62,13 +62,13 @@ public class JettyDtpAdapter extends AbstractWebServerDtpAdapter { private static final String TP_PREFIX = "tomcatTp"; @Override - public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) { + public void doEnhance(WebServer webServer) { TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer; Executor originExecutor = tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor(); TomcatExecutorProxy proxy = new TomcatExecutorProxy((ThreadPoolExecutor) originExecutor); tomcatWebServer.getTomcat().getConnector().getProtocolHandler().setExecutor(proxy); - return new ExecutorWrapper(getTpName(), new TomcatExecutorAdapter(proxy)); + putAndFinalize(getTpName(), (ExecutorService) originExecutor, new TomcatExecutorAdapter(proxy)); } @Override diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java index 54802e93a..e3bf26b4d 100644 --- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java +++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java @@ -31,8 +31,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; - /** * Tomcat ThreadPool Proxy * @@ -77,7 +75,6 @@ public TomcatExecutorProxy(ThreadPoolExecutor executor) { if (executor.getQueue() instanceof TaskQueue) { ((TaskQueue) executor.getQueue()).setParent(this); } - shutdownGracefulAsync(executor, "tomcat", 5); } @Override diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/undertow/UndertowDtpAdapter.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/undertow/UndertowDtpAdapter.java index 0bf0daa23..973e1c689 100644 --- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/undertow/UndertowDtpAdapter.java +++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/undertow/UndertowDtpAdapter.java @@ -34,8 +34,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; - /** * UndertowDtpAdapter related * @@ -56,17 +54,17 @@ public UndertowDtpAdapter() { } @Override - public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) { + public void doEnhance(WebServer webServer) { val undertowServletWebServer = (UndertowServletWebServer) webServer; val undertow = (Undertow) ReflectionUtil.getFieldValue(UndertowServletWebServer.class, "undertow", undertowServletWebServer); if (Objects.isNull(undertow)) { - return null; + return; } XnioWorker xnioWorker = undertow.getWorker(); Object taskPool = ReflectionUtil.getFieldValue(XnioWorker.class, "taskPool", xnioWorker); if (Objects.isNull(taskPool)) { - return null; + return; } val handler = TaskPoolHandlerFactory.getTaskPoolHandler(taskPool.getClass().getSimpleName()); String internalExecutor = handler.taskPoolType().getInternalExecutor(); @@ -74,22 +72,18 @@ public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) { String tpName = getTpName(); if (executor instanceof ThreadPoolExecutor) { enhanceOriginExecutor(tpName, (ThreadPoolExecutor) executor, internalExecutor, taskPool); - return executors.get(getTpName()); } else if (executor instanceof EnhancedQueueExecutor) { try { val proxy = new EnhancedQueueExecutorProxy((EnhancedQueueExecutor) executor); ReflectionUtil.setFieldValue(internalExecutor, taskPool, proxy); - val executorWrapper = new ExecutorWrapper(tpName, new EnhancedQueueExecutorAdapter(proxy)); - executors.put(tpName, executorWrapper); - shutdownGracefulAsync((ExecutorService) executor, "undertow", 5); - return executorWrapper; + putAndFinalize(tpName, (ExecutorService) executor, new EnhancedQueueExecutorAdapter(proxy)); } catch (Throwable t) { log.error("DynamicTp adapter, enhance {} failed, please adjust the order of the two dependencies" + "(starter-undertow and starter-adapter-webserver) and try again.", tpName, t); - return new ExecutorWrapper(tpName, handler.adapt(executor)); + executors.put(tpName, new ExecutorWrapper(tpName, handler.adapt(executor))); } } else { - return new ExecutorWrapper(tpName, handler.adapt(executor)); + executors.put(tpName, new ExecutorWrapper(tpName, handler.adapt(executor))); } }