From aac129df06effc20781a9b5ccb0da0abe8716858 Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Mon, 13 Nov 2023 12:34:36 +0800 Subject: [PATCH] bump guava from 14.0.1 to 27.0-jre --- .../buffer/FileSegmentManagedBuffer.java | 12 +-- .../network/buffer/NettyManagedBuffer.java | 7 +- .../network/buffer/NioManagedBuffer.java | 7 +- .../network/client/TransportClient.java | 11 +-- .../network/protocol/ChunkFetchFailure.java | 12 +-- .../network/protocol/ChunkFetchRequest.java | 7 +- .../network/protocol/ChunkFetchSuccess.java | 12 +-- .../common/network/protocol/Message.java | 4 +- .../network/protocol/OneWayMessage.java | 10 ++- .../common/network/protocol/OpenStream.java | 16 ++-- .../common/network/protocol/PushData.java | 18 +++-- .../network/protocol/PushDataHandShake.java | 20 ++--- .../network/protocol/PushMergedData.java | 20 ++--- .../common/network/protocol/RegionFinish.java | 16 ++-- .../common/network/protocol/RegionStart.java | 20 ++--- .../common/network/protocol/RpcFailure.java | 12 +-- .../common/network/protocol/RpcRequest.java | 11 ++- .../common/network/protocol/RpcResponse.java | 11 ++- .../network/protocol/StreamChunkSlice.java | 16 ++-- .../common/network/protocol/StreamHandle.java | 12 +-- .../celeborn/common/util/ThreadUtils.scala | 79 ++++++++++++++++++- project/CelebornBuild.scala | 2 +- 22 files changed, 228 insertions(+), 107 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/network/buffer/FileSegmentManagedBuffer.java b/common/src/main/java/org/apache/celeborn/common/network/buffer/FileSegmentManagedBuffer.java index dbd9d71ad82..1d5463b4dc0 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/buffer/FileSegmentManagedBuffer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/buffer/FileSegmentManagedBuffer.java @@ -21,8 +21,8 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; - -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; @@ -145,10 +145,10 @@ public long getLength() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("file", file) - .add("offset", offset) - .add("length", length) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("file", file) + .append("offset", offset) + .append("length", length) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/buffer/NettyManagedBuffer.java b/common/src/main/java/org/apache/celeborn/common/network/buffer/NettyManagedBuffer.java index d3be0a6d960..dd293ed07df 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/buffer/NettyManagedBuffer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/buffer/NettyManagedBuffer.java @@ -18,10 +18,11 @@ package org.apache.celeborn.common.network.buffer; import java.io.IOException; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import java.io.InputStream; import java.nio.ByteBuffer; -import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; @@ -77,6 +78,8 @@ public Object convertToNetty() throws IOException { @Override public String toString() { - return Objects.toStringHelper(this).add("buf", buf).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("buf", buf) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/buffer/NioManagedBuffer.java b/common/src/main/java/org/apache/celeborn/common/network/buffer/NioManagedBuffer.java index 70a355a2efd..d11c0e55b6b 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/buffer/NioManagedBuffer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/buffer/NioManagedBuffer.java @@ -18,10 +18,11 @@ package org.apache.celeborn.common.network.buffer; import java.io.IOException; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import java.io.InputStream; import java.nio.ByteBuffer; -import com.google.common.base.Objects; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; @@ -65,6 +66,8 @@ public Object convertToNetty() throws IOException { @Override public String toString() { - return Objects.toStringHelper(this).add("buf", buf).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("buf", buf) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java index 7865732c37d..340289b424a 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java @@ -18,6 +18,8 @@ package org.apache.celeborn.common.network.client; import java.io.Closeable; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -25,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; @@ -314,10 +315,10 @@ public void close() { @Override public String toString() { - return Objects.toStringHelper(this) - .add("remoteAddress", channel.remoteAddress()) - .add("isActive", isActive()) - .toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("remoteAdress", channel.remoteAddress()) + .append("isActive", isActive()) + .toString(); } private static final AtomicLong counter = new AtomicLong(); diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchFailure.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchFailure.java index 532d0bab049..51260456468 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchFailure.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchFailure.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.protocol.PbChunkFetchRequest; @@ -56,7 +58,7 @@ public static ChunkFetchFailure decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(streamChunkSlice, errorString); + return Objects.hash(streamChunkSlice, errorString); } @Override @@ -70,9 +72,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamChunkId", streamChunkSlice) - .add("errorString", errorString) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkSlice) + .append("errorString", errorString) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchRequest.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchRequest.java index 28672eac1d8..c200aeb904c 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchRequest.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchRequest.java @@ -17,7 +17,8 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import io.netty.buffer.ByteBuf; /** @@ -67,6 +68,8 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this).add("streamChunkId", streamChunkSlice).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkSlice) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java index 7d59920035d..fc3a5f0e2b0 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/ChunkFetchSuccess.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -76,7 +78,7 @@ public static ChunkFetchSuccess decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(streamChunkSlice, body()); + return Objects.hash(streamChunkSlice, body()); } @Override @@ -90,9 +92,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamChunkId", streamChunkSlice) - .add("buffer", body()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamChunkId", streamChunkSlice) + .append("buffer", body()) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java index b2aaf735417..ee1c6ab6566 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java @@ -19,7 +19,7 @@ import java.nio.ByteBuffer; -import com.google.common.base.Objects; +import java.util.Objects; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -56,7 +56,7 @@ public boolean needCopyOut() { } protected boolean equals(Message other) { - return Objects.equal(body, other.body); + return Objects.equals(body, other.body); } public ByteBuffer toByteBuffer() { diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/OneWayMessage.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/OneWayMessage.java index 68c5a4263b8..5957f67b411 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/OneWayMessage.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/OneWayMessage.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -66,7 +68,7 @@ public static OneWayMessage decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(body()); + return Objects.hash(body()); } @Override @@ -80,6 +82,8 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this).add("body", body()).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("body", body()) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/OpenStream.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/OpenStream.java index 7919900e764..3aec17f1445 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/OpenStream.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/OpenStream.java @@ -20,7 +20,9 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; /** @@ -81,7 +83,7 @@ public static OpenStream decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode( + return Objects.hash( Arrays.hashCode(shuffleKey), Arrays.hashCode(fileName), startMapIndex, endMapIndex); } @@ -99,11 +101,11 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("shuffleKey", new String(shuffleKey, StandardCharsets.UTF_8)) - .add("fileName", new String(fileName, StandardCharsets.UTF_8)) - .add("startMapIndex", startMapIndex) - .add("endMapIndex", endMapIndex) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("shuffleKey", new String(shuffleKey, StandardCharsets.UTF_8)) + .append("fileName", new String(fileName, StandardCharsets.UTF_8)) + .append("startMapIndex", startMapIndex) + .append("endMapIndex", endMapIndex) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java index a3e5da3d2e3..ab1be1ccbe4 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushData.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import java.util.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -86,7 +88,7 @@ public static PushData decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(requestId, mode, shuffleKey, partitionUniqueId, body()); + return Objects.hash(requestId, mode, shuffleKey, partitionUniqueId, body()); } @Override @@ -104,12 +106,12 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("mode", mode) - .add("shuffleKey", shuffleKey) - .add("partitionUniqueId", partitionUniqueId) - .add("body size", body().size()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("mode", mode) + .append("shuffleKey", shuffleKey) + .append("partitionUniqueId", partitionUniqueId) + .append("body size", body().size()) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java index dc8c0481680..632df682eae 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; @Deprecated @@ -83,7 +85,7 @@ public static PushDataHandShake decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode( + return Objects.hash( mode, shuffleKey, partitionUniqueId, attemptId, numPartitions, bufferSize); } @@ -104,13 +106,13 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("mode", mode) - .add("shuffleKey", shuffleKey) - .add("partitionUniqueId", partitionUniqueId) - .add("attemptId", attemptId) - .add("numSubPartitions", numPartitions) - .add("bufferSize", bufferSize) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("mode", mode) + .append("shuffleKey", shuffleKey) + .append("partitionUniqueId", partitionUniqueId) + .append("attemptId", attemptId) + .append("numSubPartitions", numPartitions) + .append("bufferSize", bufferSize) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushMergedData.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushMergedData.java index 332e66f6f1e..1e30ba87675 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushMergedData.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushMergedData.java @@ -19,7 +19,9 @@ import java.util.Arrays; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -99,7 +101,7 @@ public static PushMergedData decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(requestId, mode, shuffleKey); + return Objects.hash(requestId, mode, shuffleKey); } @Override @@ -118,13 +120,13 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("mode", mode) - .add("shuffleKey", shuffleKey) - .add("partitionIds", Arrays.toString(partitionUniqueIds)) - .add("batchOffsets", Arrays.toString(batchOffsets)) - .add("body size", body().size()) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("mode", mode) + .append("shuffleKey", shuffleKey) + .append("partitionIds", Arrays.toString(partitionUniqueIds)) + .append("batchOffsets", Arrays.toString(batchOffsets)) + .append("body size", body().size()) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java index c7f804d84c7..44d300540d7 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; @Deprecated @@ -68,7 +70,7 @@ public static RegionFinish decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(mode, shuffleKey, partitionUniqueId, attemptId); + return Objects.hash(mode, shuffleKey, partitionUniqueId, attemptId); } @Override @@ -86,11 +88,11 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("mode", mode) - .add("shuffleKey", shuffleKey) - .add("partitionUniqueId", partitionUniqueId) - .add("attemptId", attemptId) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("mode", mode) + .append("shuffleKey", shuffleKey) + .append("partitionUniqueId", partitionUniqueId) + .append("attemptId", attemptId) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java index 2c327c880b7..68d73ffb631 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; @Deprecated @@ -85,7 +87,7 @@ public static RegionStart decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode( + return Objects.hash( mode, shuffleKey, partitionUniqueId, attemptId, currentRegionIndex, isBroadcast); } @@ -106,13 +108,13 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("mode", mode) - .add("shuffleKey", shuffleKey) - .add("partitionUniqueId", partitionUniqueId) - .add("attemptId", attemptId) - .add("currentRegionIndex", currentRegionIndex) - .add("isBroadcast", isBroadcast) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("mode", mode) + .append("shuffleKey", shuffleKey) + .append("partitionUniqueId", partitionUniqueId) + .append("attemptId", attemptId) + .append("currentRegionIndex", currentRegionIndex) + .append("isBroadcast", isBroadcast) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcFailure.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcFailure.java index f1041959544..86564e3d207 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcFailure.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcFailure.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; /** Response to {@link RpcRequest} for a failed RPC. */ @@ -54,7 +56,7 @@ public static RpcFailure decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(requestId, errorString); + return Objects.hash(requestId, errorString); } @Override @@ -68,9 +70,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("requestId", requestId) - .add("errorString", errorString) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("errorString", errorString) .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcRequest.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcRequest.java index 93066f2245c..dbe22ea2c25 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcRequest.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcRequest.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -74,7 +76,7 @@ public static RpcRequest decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(requestId, body()); + return Objects.hash(requestId, body()); } @Override @@ -88,6 +90,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this).add("requestId", requestId).add("body", body()).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("body", body()) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcResponse.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcResponse.java index ea37627944f..9be6cbc0780 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcResponse.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RpcResponse.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.network.buffer.ManagedBuffer; @@ -74,7 +76,7 @@ public static RpcResponse decode(ByteBuf buf, boolean decodeBody) { @Override public int hashCode() { - return Objects.hashCode(requestId, body()); + return Objects.hash(requestId, body()); } @Override @@ -88,6 +90,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this).add("requestId", requestId).add("body", body()).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("requestId", requestId) + .append("body", body()) + .toString(); } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java index 4771faf8726..0ec3ad6b971 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamChunkSlice.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; import org.apache.celeborn.common.protocol.PbStreamChunkSlice; @@ -68,7 +70,7 @@ public static StreamChunkSlice decode(ByteBuf buffer) { @Override public int hashCode() { - return Objects.hashCode(streamId, chunkIndex, offset, len); + return Objects.hash(streamId, chunkIndex, offset, len); } @Override @@ -85,11 +87,11 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("chunkIndex", chunkIndex) - .add("offset", offset) - .add("len", len) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("chunkIndex", chunkIndex) + .append("offset", offset) + .append("len", len) .toString(); } diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamHandle.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamHandle.java index c2428728ef7..e97f06e6c11 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamHandle.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/StreamHandle.java @@ -17,7 +17,9 @@ package org.apache.celeborn.common.network.protocol; -import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import java.util.Objects; import io.netty.buffer.ByteBuf; /** @@ -56,7 +58,7 @@ public static StreamHandle decode(ByteBuf buf) { @Override public int hashCode() { - return Objects.hashCode(streamId, numChunks); + return Objects.hash(streamId, numChunks); } @Override @@ -70,9 +72,9 @@ public boolean equals(Object other) { @Override public String toString() { - return Objects.toStringHelper(this) - .add("streamId", streamId) - .add("numChunks", numChunks) + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("streamId", streamId) + .append("numChunks", numChunks) .toString(); } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala index 822ee032776..704b2b693bf 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala @@ -19,6 +19,7 @@ package org.apache.celeborn.common.util import java.util.concurrent._ import java.util.concurrent.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} +import java.util.concurrent.locks.ReentrantLock import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} @@ -26,14 +27,88 @@ import scala.language.higherKinds import scala.util.control.NonFatal import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} - import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.internal.Logging object ThreadUtils { private val sameThreadExecutionContext = - ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor()) + ExecutionContext.fromExecutorService(sameThreadExecutorService()) + + // Inspired by Guava MoreExecutors.sameThreadExecutor; inlined and converted + // to Scala here to avoid Guava version issues + def sameThreadExecutorService(): ExecutorService = new AbstractExecutorService { + private val lock = new ReentrantLock() + private val termination = lock.newCondition() + private var runningTasks = 0 + private var serviceIsShutdown = false + + override def shutdown(): Unit = { + lock.lock() + try { + serviceIsShutdown = true + } finally { + lock.unlock() + } + } + + override def shutdownNow(): java.util.List[Runnable] = { + shutdown() + java.util.Collections.emptyList() + } + + override def isShutdown: Boolean = { + lock.lock() + try { + serviceIsShutdown + } finally { + lock.unlock() + } + } + + override def isTerminated: Boolean = synchronized { + lock.lock() + try { + serviceIsShutdown && runningTasks == 0 + } finally { + lock.unlock() + } + } + + override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = { + var nanos = unit.toNanos(timeout) + lock.lock() + try { + while (nanos > 0 && !isTerminated()) { + nanos = termination.awaitNanos(nanos) + } + isTerminated() + } finally { + lock.unlock() + } + } + + override def execute(command: Runnable): Unit = { + lock.lock() + try { + if (isShutdown()) throw new RejectedExecutionException("Executor already shutdown") + runningTasks += 1 + } finally { + lock.unlock() + } + try { + command.run() + } finally { + lock.lock() + try { + runningTasks -= 1 + if (isTerminated()) termination.signalAll() + } finally { + lock.unlock() + } + } + } + } /** * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`. diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index 168a6f3bd5f..5108268ecda 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -42,7 +42,7 @@ object Dependencies { val commonsLoggingVersion = "1.1.3" val commonsLang3Version = "3.12.0" val findbugsVersion = "1.3.9" - val guavaVersion = "14.0.1" + val guavaVersion = "27.0-jre" val hadoopVersion = "3.2.4" val javaxServletVersion = "3.1.0" val junitInterfaceVersion = "0.13.3"