diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/config/DefaultStoreConfiguration.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/config/DefaultStoreConfiguration.java index ee8f6f52..91361b46 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/config/DefaultStoreConfiguration.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/config/DefaultStoreConfiguration.java @@ -53,6 +53,9 @@ public class DefaultStoreConfiguration implements StoreConfiguration { private static final int SCHEDULE_CLEAN_BEFORE_DISPATCH_TIMES_IN_HOUR = 24; private static final int DEFAULT_SEGMENT_SCALE_IN_MIN = 60; + public static final String BROKER_WEIGHT_TASK_TIMER_INTERVAL = "broker.weight.task.timer.interval"; + public static final int DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL = 3 * 1000; + private volatile int segmentScale; private volatile long inAdvanceLoadMillis; private volatile long loadBlockingExitMillis; diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java index 30732c10..46c140ea 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java @@ -20,14 +20,14 @@ import com.google.common.collect.Maps; import qunar.tc.qmq.broker.BrokerClusterInfo; import qunar.tc.qmq.broker.BrokerGroupInfo; -import qunar.tc.qmq.broker.BrokerLoadBalance; import qunar.tc.qmq.broker.BrokerService; -import qunar.tc.qmq.broker.impl.PollBrokerLoadBalance; import qunar.tc.qmq.common.ClientType; import qunar.tc.qmq.common.Disposable; import qunar.tc.qmq.configuration.DynamicConfig; import qunar.tc.qmq.delay.DelayLogFacade; import qunar.tc.qmq.delay.ScheduleIndex; +import qunar.tc.qmq.delay.sender.loadbalance.InSendingNumWeightLoadBalancer; +import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer; import java.util.List; import java.util.Map; 
@@ -42,15 +42,15 @@ class SenderExecutor implements Disposable { private static final int DEFAULT_SEND_THREAD = 1; private final ConcurrentMap groupSenders = new ConcurrentHashMap<>(); - private final BrokerLoadBalance brokerLoadBalance; private final Sender sender; private final DelayLogFacade store; private final int sendThreads; + private final LoadBalancer balancer; SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) { this.sender = sender; this.store = store; - this.brokerLoadBalance = PollBrokerLoadBalance.getInstance(); + this.balancer = new InSendingNumWeightLoadBalancer(sendConfig); this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD); } @@ -62,7 +62,7 @@ void execute(final List indexList, final SenderGroup.ResultHandle } private void doExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) { - group.send(list, sender, handler); + group.send(list, sender, handler, balancer); } private Map> groupByBroker(final List indexList, final BrokerService brokerService) { @@ -100,7 +100,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) { private BrokerGroupInfo loadGroup(String subject, BrokerService brokerService) { BrokerClusterInfo cluster = brokerService.getClusterBySubject(ClientType.PRODUCER, subject); - return brokerLoadBalance.loadBalance(cluster, null); + return balancer.select(cluster); } private Map> groupBySubject(List list) { diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java index 0b66b689..46bf8b14 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java @@ -26,6 +26,8 @@ import qunar.tc.qmq.delay.DelayLogFacade; import qunar.tc.qmq.delay.ScheduleIndex; import qunar.tc.qmq.delay.monitor.QMon; +import 
qunar.tc.qmq.delay.sender.loadbalance.BrokerGroupStats; +import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer; import qunar.tc.qmq.delay.store.model.ScheduleSetRecord; import qunar.tc.qmq.metrics.Metrics; import qunar.tc.qmq.netty.exception.ClientSendException; @@ -62,23 +64,26 @@ public class SenderGroup implements Disposable { this.executorService = new ThreadPoolExecutor(1, sendThreads, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder() .setNameFormat("delay-sender-" + groupInfo.getGroupName() + "-%d").build()); + + Metrics.gauge("sendGroupQueueSize", new String[]{"brokerGroup"}, new String[]{groupInfo.getGroupName()}, () -> (double) executorService.getQueue().size()); } - public void send(final List records, final Sender sender, final ResultHandler handler) { - executorService.execute(() -> doSend(records, sender, handler)); + public void send(final List records, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) { + final BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo.get()); + stats.incrementToSendCount(records.size()); + executorService.execute(() -> doSend(records, sender, handler, balancer)); } - private void doSend(final List batch, final Sender sender, final ResultHandler handler) { + private void doSend(final List batch, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) { BrokerGroupInfo groupInfo = this.groupInfo.get(); - String groupName = groupInfo.getGroupName(); List> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE); for (List partition : partitions) { - send(sender, handler, groupInfo, groupName, partition); + send(sender, handler, groupInfo, partition, balancer); } } - private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list) { + private void send(final Sender sender, final ResultHandler handler, final BrokerGroupInfo groupInfo, final List list, final LoadBalancer balancer) { try { 
long start = System.currentTimeMillis(); List records = store.recoverLogRecord(list); @@ -86,9 +91,10 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf Datagram response = sendMessages(records, sender); release(records); - monitor(list, groupName); + monitor(list, groupInfo.getGroupName()); if (response == null) { - handler.fail(list); + groupInfo.markFailed(); + fail(list, groupInfo.getGroupName(), handler); } else { final int responseCode = response.getHeader().getCode(); final Map resultMap = getSendResult(response); @@ -98,9 +104,7 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf groupInfo.markFailed(); } - monitorSendFail(list, groupInfo.getGroupName()); - - handler.fail(list); + fail(list, groupInfo.getGroupName(), handler); return; } @@ -121,11 +125,19 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf handler.success(records, failedMessageIds); } } catch (Throwable e) { - LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupName, list.size(), e); - handler.fail(list); + LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupInfo.getGroupName(), list.size(), e); + fail(list, groupInfo.getGroupName(), handler); + } finally { + BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo); + stats.decrementToSendCount(list.size()); } } + private void fail(final List list, final String groupName, final ResultHandler handler) { + monitorSendFail(list, groupName); + handler.fail(list); + } + private void release(List records) { for (ScheduleSetRecord record : records) { record.release(); diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/BrokerGroupStats.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/BrokerGroupStats.java new file mode 100644 index 00000000..36fddda4 --- /dev/null +++ 
b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/BrokerGroupStats.java
@@ -0,0 +1,68 @@
+package qunar.tc.qmq.delay.sender.loadbalance;
+
+import qunar.tc.qmq.broker.BrokerGroupInfo;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Send statistics kept for a single broker group.
+ * <p>
+ * Currently only the "to send" count is tracked: it is raised when records are handed over for
+ * sending and lowered when a send attempt finishes. All updates are single atomic operations on an
+ * {@link AtomicLong}, so the counter may be modified from several threads at once.
+ *
+ * @author xufeng.deng dennisdxf@gmail.com
+ * @since 2019-01-08 15:08
+ */
+public class BrokerGroupStats {
+    /** Broker group these statistics belong to. */
+    private final BrokerGroupInfo brokerGroupInfo;
+
+    // send time
+
+    // send failed
+
+    // send success
+
+    /** Number of records queued for this broker group whose send has not finished yet. */
+    private final AtomicLong toSend;
+
+    /**
+     * Creates statistics for {@code brokerGroupInfo} with a "to send" count of zero.
+     *
+     * @param brokerGroupInfo broker group the statistics are collected for
+     */
+    public BrokerGroupStats(final BrokerGroupInfo brokerGroupInfo) {
+        this.brokerGroupInfo = brokerGroupInfo;
+        this.toSend = new AtomicLong(0);
+    }
+
+    /**
+     * Atomically adds {@code toSendNum} to the "to send" count.
+     *
+     * @param toSendNum number of records that were just queued for sending
+     */
+    public void incrementToSendCount(long toSendNum) {
+        toSend.addAndGet(toSendNum);
+    }
+
+    /**
+     * Atomically subtracts {@code sendNum} from the "to send" count.
+     *
+     * @param sendNum number of records whose send attempt has finished
+     */
+    public void decrementToSendCount(long sendNum) {
+        // A get() followed by set() is not atomic: an update made by another thread between the
+        // two calls would be overwritten and the counter would drift. addAndGet() is one atomic step.
+        toSend.addAndGet(-sendNum);
+    }
+
+    /**
+     * Returns the current "to send" count.
+     *
+     * @return number of records queued but not yet finished sending
+     */
+    long getToSendCount() {
+        return toSend.get();
+    }
+}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/InSendingNumWeightLoadBalancer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/InSendingNumWeightLoadBalancer.java
new file mode 100644
index 00000000..bea41b48
--- /dev/null
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/InSendingNumWeightLoadBalancer.java
@@ -0,0 +1,181 @@
+package qunar.tc.qmq.delay.sender.loadbalance;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import qunar.tc.qmq.broker.BrokerClusterInfo;
+import qunar.tc.qmq.broker.BrokerGroupInfo;
+import qunar.tc.qmq.configuration.DynamicConfig;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static qunar.tc.qmq.delay.config.DefaultStoreConfiguration.BROKER_WEIGHT_TASK_TIMER_INTERVAL;
+import static
qunar.tc.qmq.delay.config.DefaultStoreConfiguration.DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL;
+
+/**
+ * Load balancer that favours broker groups with fewer records still waiting to be sent.
+ * <p>
+ * A daemon timer periodically converts the per-group "to send" counts into accumulated weights,
+ * where a group's own weight is the total pending count minus its pending count.
+ * {@link #select(BrokerClusterInfo)} draws a random point inside the accumulated range and takes
+ * the group whose range contains it. While the accumulated weight is below
+ * {@link #MIN_TOTAL_WEIGHT}, or when the weights do not line up with the requested cluster,
+ * the uniform random choice of {@link RandomLoadBalancer} is used instead.
+ *
+ * @author xufeng.deng dennisdxf@gmail.com
+ * @since 2019-01-08 16:03
+ */
+public class InSendingNumWeightLoadBalancer extends RandomLoadBalancer {
+    private static final Logger LOG = LoggerFactory.getLogger(InSendingNumWeightLoadBalancer.class);
+
+    /** Below this accumulated weight the pending counts are ignored and a random group is used. */
+    private static final long MIN_TOTAL_WEIGHT = 1000;
+
+    private final LoadBalanceStats stats;
+
+    /** Accumulated weights published by the timer; entry i belongs to group i of the list it was built from. */
+    private volatile List<Long> accumulatedWeights = Collections.synchronizedList(new ArrayList<>());
+
+    /** Broker groups of the cluster passed to the latest {@link #select(BrokerClusterInfo)} call. */
+    private volatile List<BrokerGroupInfo> brokerGroups = Collections.synchronizedList(new ArrayList<>());
+
+    /** Guards against two weight calculations running at the same time. */
+    private final AtomicBoolean brokerWeightAssignmentInProgress = new AtomicBoolean(false);
+
+    private final Timer brokerWeightTimer;
+
+    private final Random random = new Random();
+
+    /**
+     * Creates the balancer and starts the periodic weight calculation.
+     *
+     * @param config configuration that supplies the timer interval
+     *               ({@code BROKER_WEIGHT_TASK_TIMER_INTERVAL}, falling back to
+     *               {@code DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL})
+     */
+    public InSendingNumWeightLoadBalancer(final DynamicConfig config) {
+        stats = new LoadBalanceStats();
+        brokerWeightTimer = new Timer("brokerWeightTimer", true);
+        scheduleBrokerWeight(config);
+    }
+
+    /**
+     * Schedules the weight task to run immediately and then at the configured interval, and
+     * registers a shutdown hook that cancels the timer.
+     */
+    private void scheduleBrokerWeight(final DynamicConfig config) {
+        brokerWeightTimer.schedule(new DynamicBrokerGroupWeightTask(), 0, config.getInt(BROKER_WEIGHT_TASK_TIMER_INTERVAL, DEFAULT_BROKER_WEIGHT_TASK_TIMER_INTERVAL));
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            LOG.info("Stopping brokerWeightTimer.");
+            brokerWeightTimer.cancel();
+        }));
+    }
+
+    /**
+     * Selects an available broker group of {@code clusterInfo}, weighted by pending send counts.
+     * <p>
+     * At most three attempts per group of the cluster are made.
+     *
+     * @param clusterInfo cluster whose groups are the candidates
+     * @return an available broker group of the cluster, or {@code null} when the cluster has no
+     *         groups or no available group was found within the attempt budget
+     */
+    @Override
+    public BrokerGroupInfo select(BrokerClusterInfo clusterInfo) {
+        List<BrokerGroupInfo> arrivalGroups = clusterInfo.getGroups();
+        if (arrivalGroups == null || arrivalGroups.isEmpty()) return null;
+
+        // Both snapshots are taken before the refresh so that they describe the same earlier state.
+        List<Long> currentWeights = getAccumulatedWeights();
+        List<BrokerGroupInfo> stayGroupInfos = getBrokerGroups();
+
+        refreshBrokerGroups(arrivalGroups, stayGroupInfos);
+
+        long maxTotalWeight = currentWeights.isEmpty() ? 0 : currentWeights.get(currentWeights.size() - 1);
+        // A weight index is only meaningful when there is one weight per known group; otherwise
+        // stayGroupInfos.get(index) could run past the end of the list.
+        boolean weighted = maxTotalWeight >= MIN_TOTAL_WEIGHT
+                && currentWeights.size() == stayGroupInfos.size();
+
+        int maxAttempts = arrivalGroups.size() * 3;
+        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
+            BrokerGroupInfo candidate = weighted
+                    ? stayGroupInfos.get(weightedIndex(currentWeights, maxTotalWeight))
+                    : null;
+
+            if (candidate == null || !arrivalGroups.contains(candidate)) {
+                // No usable weights, or the weighted pick belongs to a different cluster than the
+                // one requested: use the uniform random choice, which only returns available groups.
+                BrokerGroupInfo fallback = super.select(clusterInfo);
+                if (fallback != null) {
+                    return fallback;
+                }
+            } else if (candidate.isAvailable()) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the index of the first accumulated weight that is not below a random point drawn
+     * from {@code [0, maxTotalWeight)}.
+     */
+    private int weightedIndex(final List<Long> weights, final long maxTotalWeight) {
+        // nextDouble() lies in [0, 1), so the point stays inside the accumulated range. nextLong()
+        // spans all 64-bit values: its product overflows or is negative and never spreads the picks.
+        double randomWeight = random.nextDouble() * maxTotalWeight;
+        for (int i = 0; i < weights.size(); ++i) {
+            if (weights.get(i) >= randomWeight) {
+                return i;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Replaces the remembered broker groups when the arriving cluster differs from them: groups
+     * that disappeared are dropped and new ones are appended.
+     */
+    private void refreshBrokerGroups(List<BrokerGroupInfo> arrivalGroups, List<BrokerGroupInfo> stayBrokerGroups) {
+        Set<BrokerGroupInfo> oldSet = Sets.newHashSet(stayBrokerGroups);
+        Set<BrokerGroupInfo> newSet = Sets.newHashSet(arrivalGroups);
+        Set<BrokerGroupInfo> removals = Sets.difference(oldSet, newSet);
+        Set<BrokerGroupInfo> adds = Sets.difference(newSet, oldSet);
+        if (!removals.isEmpty() || !adds.isEmpty()) {
+            List<BrokerGroupInfo> attached = Lists.newArrayList(stayBrokerGroups);
+            attached.removeAll(removals);
+            attached.addAll(adds);
+            setBrokerGroups(attached);
+        }
+    }
+
+    /** Timer task that recalculates the accumulated weights. */
+    class DynamicBrokerGroupWeightTask extends TimerTask {
+
+        @Override
+        public void run() {
+            BrokerWeight brokerWeight = new BrokerWeight();
+            try {
+                brokerWeight.maintainWeights();
+            } catch (Exception e) {
+                LOG.error("Error running DynamicBrokerGroupWeightTask.", e);
+            }
+        }
+    }
+
+    /** Computes the accumulated weights from the current "to send" counts. */
+    class BrokerWeight {
+        /** Runs one weight calculation unless another one is already in progress. */
+        void maintainWeights() {
+            if (!brokerWeightAssignmentInProgress.compareAndSet(false, true)) {
+                return;
+            }
+
+            try {
+                doMaintain();
+            } catch (Exception e) {
+                // Pass the exception along so the cause of a failed calculation is not lost.
+                LOG.error("Error calculating broker weights.", e);
+            } finally {
+                brokerWeightAssignmentInProgress.set(false);
+            }
+        }
+
+        private void doMaintain() {
+            List<BrokerGroupInfo> groups = getBrokerGroups();
+
+            // Read every counter exactly once: the counters keep changing while this runs, and
+            // mixing two reads of the same counter could make a weight negative.
+            long[] pending = new long[groups.size()];
+            long total = 0;
+            for (int i = 0; i < pending.length; ++i) {
+                pending[i] = Math.max(0L, stats.getBrokerGroupStats(groups.get(i)).getToSendCount());
+                total += pending[i];
+            }
+
+            long weightSoFar = 0;
+            List<Long> finalWeights = Lists.newArrayListWithCapacity(pending.length);
+            for (long count : pending) {
+                // The fewer records a group still has to send, the larger its share.
+                weightSoFar += total - count;
+                finalWeights.add(weightSoFar);
+            }
+            setAccumulatedWeights(finalWeights);
+        }
+    }
+
+    /**
+     * Returns the statistics kept for {@code brokerGroupInfo}.
+     *
+     * @param brokerGroupInfo broker group to look up
+     * @return statistics of that broker group
+     */
+    @Override
+    public BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
+        return stats.getBrokerGroupStats(brokerGroupInfo);
+    }
+
+    private void setAccumulatedWeights(final List<Long> weights) {
+        this.accumulatedWeights = weights;
+    }
+
+    private void setBrokerGroups(final List<BrokerGroupInfo> brokerGroups) {
+        this.brokerGroups = brokerGroups;
+    }
+
+    /** Returns an immutable snapshot of the remembered broker groups. */
+    private List<BrokerGroupInfo> getBrokerGroups() {
+        return ImmutableList.copyOf(brokerGroups);
+    }
+
+    /** Returns an immutable snapshot of the accumulated weights. */
+    private List<Long> getAccumulatedWeights() {
+        return ImmutableList.copyOf(accumulatedWeights);
+    }
+
+}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalanceStats.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalanceStats.java
new file mode 100644
index 00000000..165c5786
--- /dev/null
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalanceStats.java
@@ -0,0 +1,36 @@
+package qunar.tc.qmq.delay.sender.loadbalance;
+
+import com.google.common.cache.*;
+import qunar.tc.qmq.broker.BrokerGroupInfo;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author xufeng.deng dennisdxf@gmail.com
+ * @since 2019-01-08 15:12
+ */
+class LoadBalanceStats {
+    private final LoadingCache brokerGroupStatsCache = CacheBuilder.newBuilder()
+            .expireAfterAccess(10, TimeUnit.MINUTES).build(new CacheLoader() {
+                @Override
+                public BrokerGroupStats load(BrokerGroupInfo key) throws Exception {
+                    return createBrokerGroupStats(key);
+                }
+            });
+
+    private BrokerGroupStats createBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
+        return new
BrokerGroupStats(brokerGroupInfo);
+    }
+
+    BrokerGroupStats getBrokerGroupStats(final BrokerGroupInfo brokerGroup) {
+        try {
+            return brokerGroupStatsCache.get(brokerGroup);
+        } catch (ExecutionException e) {
+            BrokerGroupStats stats = createBrokerGroupStats(brokerGroup);
+            brokerGroupStatsCache.asMap().putIfAbsent(brokerGroup, stats);
+            return brokerGroupStatsCache.asMap().get(brokerGroup);
+        }
+    }
+
+}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalancer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalancer.java
new file mode 100644
index 00000000..28028e3b
--- /dev/null
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/LoadBalancer.java
@@ -0,0 +1,28 @@
+package qunar.tc.qmq.delay.sender.loadbalance;
+
+import qunar.tc.qmq.broker.BrokerClusterInfo;
+import qunar.tc.qmq.broker.BrokerGroupInfo;
+
+/**
+ * Chooses the broker group that delay messages are sent to.
+ *
+ * @author xufeng.deng dennisdxf@gmail.com
+ * @since 2019-01-08 14:46
+ */
+public interface LoadBalancer {
+    /**
+     * Picks one broker group out of the given cluster.
+     *
+     * @param clusterInfo cluster whose groups are the candidates
+     * @return the chosen broker group, or {@code null} when none can be chosen
+     */
+    BrokerGroupInfo select(BrokerClusterInfo clusterInfo);
+
+    /**
+     * Returns the send statistics kept for a broker group.
+     *
+     * @param brokerGroupInfo broker group to look up
+     * @return statistics of that broker group
+     */
+    BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo);
+}
diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/RandomLoadBalancer.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/RandomLoadBalancer.java
new file mode 100644
index 00000000..87f82952
--- /dev/null
+++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/loadbalance/RandomLoadBalancer.java
@@ -0,0 +1,59 @@
+package qunar.tc.qmq.delay.sender.loadbalance;
+
+import qunar.tc.qmq.broker.BrokerClusterInfo;
+import qunar.tc.qmq.broker.BrokerGroupInfo;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Load balancer that picks an available broker group uniformly at random.
+ *
+ * @author xufeng.deng dennisdxf@gmail.com
+ * @since 2019-01-08 17:30
+ */
+public class RandomLoadBalancer implements LoadBalancer {
+
+    RandomLoadBalancer() {
+    }
+
+    /**
+     * Tries up to three times the number of groups to draw a random group that is available.
+     *
+     * @param clusterInfo cluster whose groups are the candidates
+     * @return an available broker group, or {@code null} when the cluster has no groups or no
+     *         available group was drawn within the attempt budget
+     */
+    @Override
+    public BrokerGroupInfo select(BrokerClusterInfo clusterInfo) {
+        List<BrokerGroupInfo> groups = clusterInfo.getGroups();
+        // A cluster without a group list is treated like an empty cluster instead of failing with an NPE.
+        if (groups == null || groups.isEmpty()) return null;
+
+        int size = groups.size();
+        for (int i = 0; i < size * 3; ++i) {
+            BrokerGroupInfo brokerGroupInfo = groups.get(randomInt(size));
+            if (brokerGroupInfo.isAvailable()) {
+                return brokerGroupInfo;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Not supported: this balancer keeps no statistics.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
+        // UnsupportedOperationException is a RuntimeException, so existing catch blocks keep working.
+        throw new UnsupportedOperationException("UnsupportedOps");
+    }
+
+    private int randomInt(int count) {
+        return ThreadLocalRandom.current().nextInt(count);
+    }
+
+}