Skip to content

Commit

Permalink
optimize dubbo threadpool, add eager mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Oct 10, 2023
1 parent 0b11401 commit 40d3738
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor;
import org.apache.dubbo.config.spring.context.event.ServiceBeanExportedEvent;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
Expand All @@ -38,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -54,7 +56,7 @@ public class ApacheDubboDtpAdapter extends AbstractDtpAdapter {
private static final String TP_PREFIX = "dubboTp";

private static final String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();

@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ServiceBeanExportedEvent) {
Expand All @@ -67,7 +69,7 @@ public void onApplicationEvent(ApplicationEvent event) {
}
}
}

@Override
public void refresh(DtpProperties dtpProperties) {
refresh(dtpProperties.getDubboTp(), dtpProperties.getPlatforms());
Expand All @@ -91,7 +93,7 @@ protected void initialize() {
Map<String, Object> executorMap = dataStore.get(EXECUTOR_SERVICE_COMPONENT_KEY);
if (MapUtils.isNotEmpty(executorMap)) {
executorMap.forEach((k, v) -> {
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v);
ThreadPoolExecutor proxy = getProxy((ThreadPoolExecutor) v);
executorMap.replace(k, proxy);
String tpName = genTpName(k);
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
Expand All @@ -118,14 +120,24 @@ protected void initialize() {
Map<Integer, ExecutorService> executorMap = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
if (MapUtils.isNotEmpty(executorMap)) {
executorMap.forEach((k, v) -> {
ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) v);
ThreadPoolExecutor proxy = getProxy(v);
executorMap.replace(k, proxy);
String tpName = genTpName(k.toString());
executors.put(tpName, new ExecutorWrapper(tpName, proxy));
});
}
}

private ThreadPoolExecutor getProxy(Executor executor) {
ThreadPoolExecutor proxy;
if (executor instanceof EagerThreadPoolExecutor) {
proxy = new EagerThreadPoolExecutorProxy((EagerThreadPoolExecutor) executor);
} else {
proxy = new ThreadPoolExecutorProxy((ThreadPoolExecutor) executor);
}
return proxy;
}

private String genTpName(String port) {
return TP_PREFIX + "#" + port;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.dromara.dynamictp.adapter.dubbo.apache;

import org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor;
import org.apache.dubbo.common.threadpool.support.eager.TaskQueue;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* EagerThreadPoolExecutorProxy related
*
* @author yanhom
* @since 1.1.5
*/
public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implements TaskEnhanceAware, RejectHandlerAware {

/**
* Task wrappers, do sth enhanced.
*/
private List<TaskWrapper> taskWrappers;

private final String rejectHandlerType;

public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
executor.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS, (TaskQueue<Runnable>) executor.getQueue(),
executor.getThreadFactory(), executor.getRejectedExecutionHandler());
this.rejectHandlerType = getRejectedExecutionHandler().getClass().getSimpleName();
setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler()));
((TaskQueue<Runnable>) getQueue()).setExecutor(this);
executor.shutdownNow();
}

@Override
public void execute(Runnable command) {
command = getEnhancedTask(command, taskWrappers);
AwareManager.execute(this, command);
super.execute(command);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
}

@Override
public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
this.taskWrappers = taskWrappers;
}

@Override
public String getRejectHandlerType() {
return rejectHandlerType;
}
}

0 comments on commit 40d3738

Please sign in to comment.