Skip to content

Commit

Permalink
Merge branch 'main' into CELEBORN-1122
Browse files Browse the repository at this point in the history
  • Loading branch information
RuiQin7 authored Dec 1, 2023
2 parents 381b8ba + 1c7cd1b commit 8e3e32a
Show file tree
Hide file tree
Showing 181 changed files with 6,191 additions and 853 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,34 @@ jobs:
name: flink-${{ matrix.flink }}-unit-test-log
path: |
**/target/unit-tests.log
mr:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java:
- 8
- 11
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: zulu
java-version: ${{ matrix.java }}
cache: maven
check-latest: false
- name: Test with Maven
run: |
PROFILES="-Pgoogle-mirror,mr"
TEST_MODULES="client-mr/mr,client-mr/mr-shaded,tests/mr-it"
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
build/mvn $PROFILES -pl $TEST_MODULES test
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
name: mr-unit-test-log
path: |
**/target/unit-tests.log
27 changes: 27 additions & 0 deletions .github/workflows/sbt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,30 @@ jobs:
name: flink-${{ matrix.flink }}-unit-test-log
path: |
**/target/test-reports/**
mr:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java:
- 8
- 11
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: zulu
java-version: ${{ matrix.java }}
check-latest: false
- name: Test with SBT
run: |
build/sbt -Pmr "clean; celeborn-mr-group/test"
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
name: mr-unit-test-log
path: |
**/target/test-reports/**
2 changes: 2 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ javax.servlet:javax.servlet-api

MIT License
------------
See license/LICENSE-jdktools.txt for details.
com.github.olivergondza:maven-jdk-tools-wrapper
See license/LICENSE-slf4j.txt for details.
org.slf4j:jul-to-slf4j
org.slf4j:slf4j-api
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ Copy $CELEBORN_HOME/flink/*.jar to $FLINK_HOME/lib/
To use Celeborn, the following flink configurations should be added.
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097

celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
Expand All @@ -306,6 +307,7 @@ taskmanager.network.memory.floating-buffers-per-gate: 4096
taskmanager.network.memory.buffers-per-channel: 0
taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should be set to `ALL_EXCHANGES_BLOCKING`.

### Deploy mapreduce client
Add $CELEBORN_HOME/mr/*.jar to `mapreduce.application.classpath` and `yarn.application.classpath`.
Expand Down Expand Up @@ -333,7 +335,10 @@ for more details.

For Spark versions < 3.5.0, we provide a patch to enable users to use Spark with DRA and Celeborn.
- For Spark 2.x check [Spark2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch).
- For Spark 3.0-3.3 check [Spark3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3.patch).
- For Spark 3.0 check [Spark3.0 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch).
- For Spark 3.1 check [Spark3.1 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch).
- For Spark 3.2 check [Spark3.2 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_2.patch).
- For Spark 3.3 check [Spark3.3 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_3.patch).
- For Spark 3.4 check [Spark3.4 Patch](assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch).

### Metrics
Expand Down
4 changes: 2 additions & 2 deletions assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1184,12 +1184,12 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_SlotsAllocated_Count",
"expr": "increase(metrics_SlotsAllocated_Count[1h])",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
"title": "metrics_SlotsAllocated_Count",
"title": "metrics_SlotsAllocated_increase_1h",
"type": "timeseries"
},
{
Expand Down
87 changes: 87 additions & 0 deletions assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_0.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
@@ -198,7 +198,7 @@
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
- } else if (!testing) {
+ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
@@ -1851,7 +1851,8 @@
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
- val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
+ (workerLost || !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
@@ -944,7 +944,7 @@
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
- && !isZombie) {
+ && !isZombie && !Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as successful,
Index: core/src/main/scala/org/apache/spark/util/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision 65ac1e75dc468f53fc778cd2ce1ba3f21067aab8)
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (revision 1f5cb2ec9e9652d03c9775954b69a708b5d05ab3)
@@ -2934,6 +2934,9 @@
props.forEach((k, v) => resultProps.put(k, v))
resultProps
}
+
+ def isCelebornEnabled(conf: SparkConf): Boolean =
+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
}

private[util] object CallerContext extends Logging {
91 changes: 91 additions & 0 deletions assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_1.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Subject: [PATCH] [CORE][SHUFFLE] Support enabling DRA with Apache Celeborn
---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
@@ -210,7 +210,7 @@
(decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
- } else if (!testing) {
+ } else if (!testing && !Utils.isCelebornEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
@@ -2080,7 +2080,8 @@
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
- val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
+ val fileLost = !Utils.isCelebornEnabled(sc.getConf) &&
+ (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
@@ -973,7 +973,8 @@
// and we are not using an external shuffle server which could serve the shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie &&
+ !Utils.isCelebornEnabled(conf)) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as successful,
Index: core/src/main/scala/org/apache/spark/util/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision d1f8a503a26bcfb4e466d9accc5fa241a7933667)
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (revision c8609fa08534e1cc6fd377b3573e5e65ddc79f15)
@@ -3057,7 +3057,12 @@
0
}
}
+
+ def isCelebornEnabled(conf: SparkConf): Boolean =
+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
}
+
+

private[util] object CallerContext extends Logging {
val callerContextSupported: Boolean = {
Loading

0 comments on commit 8e3e32a

Please sign in to comment.