Skip to content

Commit

Permalink
Merge pull request #344 from dromara/dev
Browse files Browse the repository at this point in the history
Optimized shutdown original thread pool after with proxy enhancement
  • Loading branch information
yanhom1314 authored Oct 22, 2023
2 parents 29a71fb + b558421 commit fe1b94f
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static java.util.stream.Collectors.toList;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PROPERTIES_CHANGE_SHOW_STYLE;
import static org.dromara.dynamictp.core.notifier.manager.NotifyHelper.updateNotifyInfo;
import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* AbstractDtpAdapter related
Expand Down Expand Up @@ -162,12 +165,21 @@ protected void enhanceOriginExecutor(String tpName, ThreadPoolExecutor executor,
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor);
try {
ReflectionUtil.setFieldValue(fieldName, targetObj, proxy);
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
putAndFinalize(tpName, executor, proxy);
} catch (IllegalAccessException e) {
log.error("DynamicTp adapter, enhance {} failed.", tpName, e);
}
}

protected void putAndFinalize(String tpName, ExecutorService origin, Executor targetForWrapper) {
executors.put(tpName, new ExecutorWrapper(tpName, targetForWrapper));
shutdownOriginalExecutor(origin);
}

protected void shutdownOriginalExecutor(ExecutorService executor) {
shutdownGracefulAsync(executor, getTpPrefix(), 5);
}

protected void doRefresh(ExecutorWrapper executorWrapper,
List<NotifyPlatform> platforms,
TpExecutorProps props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.springframework.beans.factory.InitializingBean;

Expand Down Expand Up @@ -58,7 +57,7 @@ public void afterPropertiesSet() throws Exception {
while (!registered.get()) {
try {
Thread.sleep(1000);
final DtpProperties dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class);
DtpProperties dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class);
this.initialize();
this.refresh(dtpProperties);
} catch (Throwable e) { }
Expand All @@ -79,11 +78,9 @@ protected void initialize() {
Map<String, Object> executorMap = dataStore.get(EXECUTOR_SERVICE_COMPONENT_KEY);
if (MapUtils.isNotEmpty(executorMap) && registered.compareAndSet(false, true)) {
executorMap.forEach((k, v) -> {
val name = genTpName(k);
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v);
val proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v);
executorMap.replace(k, proxy);
val executorWrapper = new ExecutorWrapper(name, proxy);
executors.put(name, executorWrapper);
putAndFinalize(genTpName(k), (ExecutorService) v, proxy);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.springframework.context.ApplicationEvent;

Expand Down Expand Up @@ -95,8 +94,7 @@ protected void initialize() {
executorMap.forEach((k, v) -> {
ThreadPoolExecutor proxy = getProxy((ThreadPoolExecutor) v);
executorMap.replace(k, proxy);
String tpName = genTpName(k);
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
putAndFinalize(genTpName(k), (ExecutorService) v, proxy);
});
}
return;
Expand All @@ -122,8 +120,7 @@ protected void initialize() {
executorMap.forEach((k, v) -> {
ThreadPoolExecutor proxy = getProxy(v);
executorMap.replace(k, proxy);
String tpName = genTpName(k.toString());
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
putAndFinalize(genTpName(k.toString()), (ExecutorService) v, proxy);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* EagerThreadPoolExecutorProxy related
*
Expand All @@ -56,7 +54,6 @@ public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) {
this.rejectHandlerType = getRejectedExecutionHandler().getClass().getSimpleName();
setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler()));
((TaskQueue<Runnable>) getQueue()).setExecutor(this);
shutdownGracefulAsync(executor, "dubbo", 5);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ExecutorWrapper;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -92,7 +91,7 @@ protected void initialize() {
String tpName = TP_PREFIX + "#" + nettyServer.getUrl().getPort();
try {
ReflectionUtil.setFieldValue(EXECUTOR_FIELD, nettyServer, proxy);
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
putAndFinalize(tpName, executor, proxy);
} catch (IllegalAccessException ex) {
log.error("DynamicTp adapter, enhance {} failed.", tpName, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* @author hanli
* @since 1.1.4
Expand Down Expand Up @@ -61,7 +59,6 @@ public StandardThreadExecutorProxy(StandardThreadExecutor executor) {
throw new RuntimeException(e);
}
}
shutdownGracefulAsync(executor, "motan", 5);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public void adaptProducerExecutors() {
if (Objects.nonNull(executor)) {
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor);
producer.setAsyncSenderExecutor(proxy);
shutdownOriginalExecutor(executor);

String proKey = TP_PREFIX + "#producer#" + defaultMQProducer.getProducerGroup();
executors.put(proKey, new ExecutorWrapper(proKey, proxy));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.dromara.dynamictp.core.DtpRegistry;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.plugin.DtpInterceptorRegistry;
import org.dromara.dynamictp.core.support.DynamicTp;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.executor.eager.TaskQueue;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand All @@ -50,6 +50,8 @@
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* BeanPostProcessor that handles all related beans managed by Spring.
*
Expand Down Expand Up @@ -121,14 +123,14 @@ private Object registerAndReturnCommon(Object bean, String beanName) {

private Object doRegisterAndReturnCommon(Object bean, String poolName) {
if (bean instanceof ThreadPoolTaskExecutor) {
val proxy = new ThreadPoolExecutorProxy(((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor());
val proxy = newProxy(poolName, ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor());
try {
ReflectionUtil.setFieldValue("threadPoolExecutor", bean, proxy);
} catch (IllegalAccessException ignored) { }
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return bean;
}
val proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) bean);
val proxy = newProxy(poolName, (ThreadPoolExecutor) bean);
DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE);
return proxy;
}
Expand All @@ -142,4 +144,10 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}

private ThreadPoolExecutorProxy newProxy(String name, ThreadPoolExecutor originExecutor) {
val proxy = new ThreadPoolExecutorProxy(originExecutor);
shutdownGracefulAsync(originExecutor, name, 0);
return proxy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* ThreadPoolExecutor Proxy
*
Expand All @@ -56,7 +54,6 @@ public ThreadPoolExecutorProxy(ThreadPoolExecutor executor) {
executor.getThreadFactory(), executor.getRejectedExecutionHandler());
this.rejectHandlerType = getRejectedExecutionHandler().getClass().getSimpleName();
setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler()));
shutdownGracefulAsync(executor, "", 5);
}

@Override
Expand All @@ -72,7 +69,6 @@ protected void beforeExecute(Thread t, Runnable r) {
AwareManager.beforeExecute(this, t, r);
}


@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
import org.dromara.dynamictp.core.converter.ExecutorConverter;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.springframework.boot.web.context.WebServerApplicationContext;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.boot.web.server.WebServer;
Expand Down Expand Up @@ -63,10 +62,9 @@ protected void initialize() {
if (executors.get(getTpName()) == null) {
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
WebServer webServer = ((WebServerApplicationContext) applicationContext).getWebServer();
ExecutorWrapper wrapper = enhanceAndGetExecutorWrapper(webServer);
executors.put(getTpName(), wrapper);
doEnhance(webServer);
log.info("DynamicTp adapter, web server {} executor init end, executor: {}",
getTpName(), ExecutorConverter.toMainFields(wrapper));
getTpName(), ExecutorConverter.toMainFields(executors.get(getTpName())));
}
}

Expand All @@ -75,10 +73,9 @@ protected String getTpName() {
}

/**
* Enhance and get thread pool executor wrapper.
* Do enhance.
*
* @param webServer webServer
* @return the Executor instance
*/
protected abstract ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer);
protected abstract void doEnhance(WebServer webServer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public class JettyDtpAdapter extends AbstractWebServerDtpAdapter<ThreadPool.Size
private static final String PRODUCER_FIELD = "_producer";

@Override
public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) {
public void doEnhance(WebServer webServer) {
JettyWebServer jettyWebServer = (JettyWebServer) webServer;
ThreadPool threadPool = jettyWebServer.getServer().getThreadPool();
final JettyExecutorAdapter adapter = new JettyExecutorAdapter(
(ThreadPool.SizedThreadPool) threadPool);
JettyExecutorAdapter adapter = new JettyExecutorAdapter((ThreadPool.SizedThreadPool) threadPool);
enhanceOriginTask(jettyWebServer, threadPool);
return new ExecutorWrapper(TP_PREFIX, adapter);
String tpName = getTpName();
executors.put(tpName, new ExecutorWrapper(tpName, adapter));
}

private void enhanceOriginTask(JettyWebServer jettyWebServer, ThreadPool threadPool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.support.ExecutorAdapter;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.starter.adapter.webserver.AbstractWebServerDtpAdapter;
import org.springframework.boot.web.embedded.tomcat.TomcatWebServer;
import org.springframework.boot.web.server.WebServer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -44,12 +44,12 @@ public class TomcatDtpAdapter extends AbstractWebServerDtpAdapter<Executor> {
private static final String TP_PREFIX = "tomcatTp";

@Override
public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) {
public void doEnhance(WebServer webServer) {
TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
Executor originExecutor = tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor();
TomcatExecutorProxy proxy = new TomcatExecutorProxy((ThreadPoolExecutor) originExecutor);
tomcatWebServer.getTomcat().getConnector().getProtocolHandler().setExecutor(proxy);
return new ExecutorWrapper(getTpName(), new TomcatExecutorAdapter(proxy));
putAndFinalize(getTpName(), (ExecutorService) originExecutor, new TomcatExecutorAdapter(proxy));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* Tomcat ThreadPool Proxy
*
Expand Down Expand Up @@ -77,7 +75,6 @@ public TomcatExecutorProxy(ThreadPoolExecutor executor) {
if (executor.getQueue() instanceof TaskQueue) {
((TaskQueue) executor.getQueue()).setParent(this);
}
shutdownGracefulAsync(executor, "tomcat", 5);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync;

/**
* UndertowDtpAdapter related
*
Expand All @@ -56,40 +54,36 @@ public UndertowDtpAdapter() {
}

@Override
public ExecutorWrapper enhanceAndGetExecutorWrapper(WebServer webServer) {
public void doEnhance(WebServer webServer) {
val undertowServletWebServer = (UndertowServletWebServer) webServer;
val undertow = (Undertow) ReflectionUtil.getFieldValue(UndertowServletWebServer.class,
"undertow", undertowServletWebServer);
if (Objects.isNull(undertow)) {
return null;
return;
}
XnioWorker xnioWorker = undertow.getWorker();
Object taskPool = ReflectionUtil.getFieldValue(XnioWorker.class, "taskPool", xnioWorker);
if (Objects.isNull(taskPool)) {
return null;
return;
}
val handler = TaskPoolHandlerFactory.getTaskPoolHandler(taskPool.getClass().getSimpleName());
String internalExecutor = handler.taskPoolType().getInternalExecutor();
Object executor = ReflectionUtil.getFieldValue(taskPool.getClass(), internalExecutor, taskPool);
String tpName = getTpName();
if (executor instanceof ThreadPoolExecutor) {
enhanceOriginExecutor(tpName, (ThreadPoolExecutor) executor, internalExecutor, taskPool);
return executors.get(getTpName());
} else if (executor instanceof EnhancedQueueExecutor) {
try {
val proxy = new EnhancedQueueExecutorProxy((EnhancedQueueExecutor) executor);
ReflectionUtil.setFieldValue(internalExecutor, taskPool, proxy);
val executorWrapper = new ExecutorWrapper(tpName, new EnhancedQueueExecutorAdapter(proxy));
executors.put(tpName, executorWrapper);
shutdownGracefulAsync((ExecutorService) executor, "undertow", 5);
return executorWrapper;
putAndFinalize(tpName, (ExecutorService) executor, new EnhancedQueueExecutorAdapter(proxy));
} catch (Throwable t) {
log.error("DynamicTp adapter, enhance {} failed, please adjust the order of the two dependencies" +
"(starter-undertow and starter-adapter-webserver) and try again.", tpName, t);
return new ExecutorWrapper(tpName, handler.adapt(executor));
executors.put(tpName, new ExecutorWrapper(tpName, handler.adapt(executor)));
}
} else {
return new ExecutorWrapper(tpName, handler.adapt(executor));
executors.put(tpName, new ExecutorWrapper(tpName, handler.adapt(executor)));
}
}

Expand Down

0 comments on commit fe1b94f

Please sign in to comment.