From d14f9bda963361ef046e7a98cb6ccbb3aa5fe2d9 Mon Sep 17 00:00:00 2001 From: xinyuwang1 Date: Sun, 19 Jan 2025 11:05:19 +0800 Subject: [PATCH] [CELEBORN-1841] Support custom implementation of EventExecutorChooser to avoid deadlock when calling await in EventLoop thread --- .../client/TransportClientFactory.java | 6 ++- ...flictAvoidEventExecutorChooserFactory.java | 53 +++++++++++++++++++ .../common/network/util/NettyUtils.java | 19 ++++++- .../common/network/util/TransportConf.java | 5 ++ .../apache/celeborn/common/CelebornConf.scala | 20 +++++++ docs/configuration/network.md | 1 + 6 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 common/src/main/java/org/apache/celeborn/common/network/util/ConflictAvoidEventExecutorChooserFactory.java diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java index fac64490b08..b86e528d534 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java @@ -114,7 +114,11 @@ public TransportClientFactory( this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, conf.clientThreads()); this.workerGroup = - NettyUtils.createEventLoop(ioMode, conf.clientThreads(), conf.getModuleName() + "-client"); + NettyUtils.createEventLoop( + ioMode, + conf.clientThreads(), + conf.conflictAvoidChooserEnable(), + conf.getModuleName() + "-client"); // Always disable thread-local cache when creating pooled ByteBuf allocator for TransportClients // because the ByteBufs are allocated by the event loop thread, but released by the executor // thread rather than the event loop thread. 
Those thread-local caches actually delay the diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/ConflictAvoidEventExecutorChooserFactory.java b/common/src/main/java/org/apache/celeborn/common/network/util/ConflictAvoidEventExecutorChooserFactory.java new file mode 100644 index 00000000000..a16522598bc --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/network/util/ConflictAvoidEventExecutorChooserFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.common.network.util; + +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorChooserFactory; + +public final class ConflictAvoidEventExecutorChooserFactory implements EventExecutorChooserFactory { + public static final ConflictAvoidEventExecutorChooserFactory INSTANCE = + new ConflictAvoidEventExecutorChooserFactory(); + + private ConflictAvoidEventExecutorChooserFactory() {} + + @Override + public EventExecutorChooser newChooser(EventExecutor[] executors) { + return new ConflictAvoidEventExecutorChooser(executors); + } + + private static final class ConflictAvoidEventExecutorChooser implements EventExecutorChooser { + private final AtomicLong idx = new AtomicLong(); + private final EventExecutor[] executors; + + ConflictAvoidEventExecutorChooser(EventExecutor[] executors) { + this.executors = executors; + } + + @Override + public EventExecutor next() { + EventExecutor executor = executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; + if (executor.inEventLoop()) { + executor = executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; + } + return executor; + } + } +} diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java index 55a01891492..2f04179d7a4 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java @@ -17,6 +17,7 @@ package org.apache.celeborn.common.network.util; +import java.nio.channels.spi.SelectorProvider; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; +import 
io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -38,6 +40,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.ThreadPerTaskExecutor; import io.netty.util.internal.PlatformDependent; import org.apache.celeborn.common.CelebornConf; @@ -56,13 +59,25 @@ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { return new DefaultThreadFactory(threadPoolPrefix, true); } - /** Creates a Netty EventLoopGroup based on the IOMode. */ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) { + return createEventLoop(mode, numThreads, false, threadPrefix); + } + + /** Creates a Netty EventLoopGroup based on the IOMode. */ + public static EventLoopGroup createEventLoop( + IOMode mode, int numThreads, boolean conflictAvoidChooserEnable, String threadPrefix) { ThreadFactory threadFactory = createThreadFactory(threadPrefix); switch (mode) { case NIO: - return new NioEventLoopGroup(numThreads, threadFactory); + return conflictAvoidChooserEnable + ? 
new NioEventLoopGroup( + numThreads, + new ThreadPerTaskExecutor(threadFactory), + ConflictAvoidEventExecutorChooserFactory.INSTANCE, + SelectorProvider.provider(), + DefaultSelectStrategyFactory.INSTANCE) + : new NioEventLoopGroup(numThreads, threadFactory); case EPOLL: return new EpollEventLoopGroup(numThreads, threadFactory); default: diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java index b607eaa8df9..36fcf363aa2 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java @@ -84,6 +84,11 @@ public int clientThreads() { return celebornConf.networkIoClientThreads(module); } + /** * Whether to use conflict avoid EventExecutorChooser while creating transport client */ + public boolean conflictAvoidChooserEnable() { + return celebornConf.networkIoConflictAvoidChooserEnable(module); + } + /** * Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer * should be latency * network_bandwidth. 
Assuming latency = 1ms, network_bandwidth = 10Gbps diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index f18cc272fb3..9ae4491c38d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -572,6 +572,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se getTransportConfInt(module, NETWORK_IO_CLIENT_THREADS) } + def networkIoConflictAvoidChooserEnable(module: String): Boolean = { + val key = NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE.key.replace("<module>", module) + getBoolean(key, NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE.defaultValue.get) + } + def networkIoReceiveBuf(module: String): Int = { getTransportConfSizeAsBytes(module, NETWORK_IO_RECEIVE_BUFFER).toInt } @@ -2061,6 +2066,21 @@ object CelebornConf extends Logging { .intConf .createWithDefault(0) + val NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE: ConfigEntry[Boolean] = + buildConf("celeborn.<module>.io.conflictAvoidChooser.enable") + .categories("network") + .doc("Whether to use conflict avoid event executor chooser in the client thread pool. " + + s"If setting to `${TransportModuleConstants.RPC_APP_MODULE}`, " + + s"works for shuffle client. " + + s"If setting to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " + + s"works for master or worker. " + + s"If setting to `${TransportModuleConstants.DATA_MODULE}`, " + + s"it works for shuffle client push and fetch data. 
" + + s"If setting to `${TransportModuleConstants.REPLICATE_MODULE}`, " + + s"it works for replicate client of worker replicating data to peer worker.") + .booleanConf + .createWithDefault(false) + val NETWORK_IO_RECEIVE_BUFFER: ConfigEntry[Long] = buildConf("celeborn.<module>.io.receiveBuffer") .categories("network") diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 4a5f8c9434e..3f9c0bc9a4f 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -24,6 +24,7 @@ license: | | celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat interval between worker and client. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. If you are using the "celeborn.client.heartbeat.interval", please use the new configs for each module according to your needs or replace it with "celeborn.rpc.heartbeat.interval", "celeborn.data.heartbeat.interval" and "celeborn.replicate.heartbeat.interval". | 0.3.0 | celeborn.client.heartbeat.interval | | celeborn.<module>.io.backLog | 0 | false | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.clientThreads | 0 | false | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. 
If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | +| celeborn.<module>.io.conflictAvoidChooser.enable | false | false | Whether to use conflict avoid event executor chooser in the client thread pool. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.connectTimeout | <value of celeborn.network.connect.timeout> | false | Socket connect timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `replicate`, it works for the replicate client of worker replicating data to peer worker. | | | | celeborn.<module>.io.connectionTimeout | <value of celeborn.network.timeout> | false | Connection active timeout. If setting to `rpc_app`, works for shuffle client. If setting to `rpc_service`, works for master or worker. If setting to `data`, it works for shuffle client push and fetch data. If setting to `push`, it works for worker receiving push data. If setting to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting to `fetch`, it works for worker fetch server. | | | | celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |