Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1841] Support custom implementation of EventExecutorChooser to avoid deadlock when calling await in EventLoop thread #3071

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ public TransportClientFactory(
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(), conf.getModuleName() + "-client");
NettyUtils.createEventLoop(
ioMode,
conf.clientThreads(),
conf.conflictAvoidChooserEnable(),
conf.getModuleName() + "-client");
// Always disable thread-local cache when creating pooled ByteBuf allocator for TransportClients
// because the ByteBufs are allocated by the event loop thread, but released by the executor
// thread rather than the event loop thread. Those thread-local caches actually delay the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.apache.celeborn.common.network.util;

import java.util.concurrent.atomic.AtomicLong;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory;

public final class ConflictAvoidEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final ConflictAvoidEventExecutorChooserFactory INSTANCE =
new ConflictAvoidEventExecutorChooserFactory();

private ConflictAvoidEventExecutorChooserFactory() {}

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
return new ConflictAvoidEventExecutorChooser(executors);
}

private static final class ConflictAvoidEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;

ConflictAvoidEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
EventExecutor executor = executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
if (executor.inEventLoop()) {
executor = executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
return executor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.common.network.util;

import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -27,6 +28,7 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
Expand All @@ -36,6 +38,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.PlatformDependent;

import org.apache.celeborn.common.CelebornConf;
Expand All @@ -52,13 +55,25 @@ public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
}

/** Creates a Netty EventLoopGroup based on the IOMode. */
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
return createEventLoop(mode, numThreads, false, threadPrefix);
}

/** Creates a Netty EventLoopGroup based on the IOMode. */
public static EventLoopGroup createEventLoop(
IOMode mode, int numThreads, boolean conflictAvoidChooserEnable, String threadPrefix) {
ThreadFactory threadFactory = createThreadFactory(threadPrefix);

switch (mode) {
case NIO:
return new NioEventLoopGroup(numThreads, threadFactory);
return conflictAvoidChooserEnable
? new NioEventLoopGroup(
numThreads,
new ThreadPerTaskExecutor(threadFactory),
ConflictAvoidEventExecutorChooserFactory.INSTANCE,
SelectorProvider.provider(),
DefaultSelectStrategyFactory.INSTANCE)
: new NioEventLoopGroup(numThreads, threadFactory);
case EPOLL:
return new EpollEventLoopGroup(numThreads, threadFactory);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public int clientThreads() {
return celebornConf.networkIoClientThreads(module);
}

/** * Whether to use conflict avoid EventExecutorChooser while creating transport client */
public boolean conflictAvoidChooserEnable() {
return celebornConf.networkIoConflictAvoidChooserEnable(module);
}

/**
* Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer
* should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
getTransportConfInt(module, NETWORK_IO_CLIENT_THREADS)
}

def networkIoConflictAvoidChooserEnable(module: String): Boolean = {
val key = NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE.key.replace("<module>", module)
getBoolean(key, NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE.defaultValue.get)
}

def networkIoReceiveBuf(module: String): Int = {
getTransportConfSizeAsBytes(module, NETWORK_IO_RECEIVE_BUFFER).toInt
}
Expand Down Expand Up @@ -2058,6 +2063,21 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)

val NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.conflictAvoidChooser.enable")
.categories("network")
.doc("Whether to use conflict avoid event executor chooser in the client thread pool. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.booleanConf
.createWithDefault(false)

Copy link
Contributor

@zaynt4606 zaynt4606 Jan 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to run the following command to refresh docs if there are configuration changes.

UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite

val NETWORK_IO_RECEIVE_BUFFER: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.receiveBuffer")
.categories("network")
Expand Down
Loading