diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 43e602d639d..52f7bb7a285 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -177,3 +177,34 @@ jobs:
name: flink-${{ matrix.flink }}-unit-test-log
path: |
**/target/unit-tests.log
+
+  mr:
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - "8"
+          - "11"
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          cache: maven
+          check-latest: false
+      - name: Test with Maven
+        run: |
+          PROFILES="-Pgoogle-mirror,mr"
+          TEST_MODULES="client-mr/mr,client-mr/mr-shaded,tests/mr-it"
+          build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
+          build/mvn $PROFILES -pl $TEST_MODULES test
+      - name: Upload test log
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: mr-java-${{ matrix.java }}-unit-test-log
+          path: |
+            **/target/unit-tests.log
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index 740e6bdf794..71a58d36f21 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -222,3 +222,30 @@ jobs:
name: flink-${{ matrix.flink }}-unit-test-log
path: |
**/target/test-reports/**
+
+  mr:
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - "8"
+          - "11"
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v4
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          check-latest: false
+      - name: Test with SBT
+        run: |
+          build/sbt -Pmr "clean; celeborn-mr-group/test"
+      - name: Upload test log
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: mr-java-${{ matrix.java }}-unit-test-log
+          path: |
+            **/target/test-reports/**
diff --git a/pom.xml b/pom.xml
index 1927ebffea6..34dfaa680a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1312,6 +1312,7 @@
client-mr/mr
client-mr/mr-shaded
+ tests/mr-it
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 3d7b748cff0..cf6b43e19e3 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -34,7 +34,7 @@ object Dependencies {
val zstdJniVersion = sparkClientProjects.map(_.zstdJniVersion).getOrElse("1.5.2-1")
val lz4JavaVersion = sparkClientProjects.map(_.lz4JavaVersion).getOrElse("1.8.0")
-
+
// Dependent library versions
val commonsCompressVersion = "1.4.1"
val commonsCryptoVersion = "1.0.0"
@@ -933,6 +933,24 @@ object MRClientProjects {
)
}
+ def mrIt: Project = {
+ Project("celeborn-mr-it", file("tests/mr-it"))
+ // ref: https://www.scala-sbt.org/1.x/docs/Multi-Project.html#Classpath+dependencies
+ .dependsOn(CelebornCommon.common % "test->test;compile->compile")
+ .dependsOn(CelebornClient.client % "test->test;compile->compile")
+ .dependsOn(CelebornMaster.master % "test->test;compile->compile")
+ .dependsOn(CelebornWorker.worker % "test->test;compile->compile")
+ .dependsOn(mrClient % "test->test;compile->compile")
+ .settings(
+ commonSettings,
+ copyDepsSettings,
+ libraryDependencies ++= Seq(
+ "org.apache.hadoop" % "hadoop-client-minicluster" % Dependencies.hadoopVersion % "test",
+ "org.apache.hadoop" % "hadoop-mapreduce-examples" % Dependencies.hadoopVersion % "test"
+ ) ++ commonUnitTestDependencies
+ )
+ }
+
def mrClientShade: Project = {
Project("celeborn-client-mr-shaded", file("client-mr/mr-shaded"))
.dependsOn(mrClient)
@@ -996,6 +1014,37 @@ object MRClientProjects {
}
def modules: Seq[Project] = {
- Seq(mrClient, mrClientShade)
+ Seq(mrClient, mrIt, mrGroup, mrClientShade)
+ }
+
+ // for test only, don't use this group for any other projects
+ lazy val mrGroup = (project withId "celeborn-mr-group").aggregate(mrClient, mrIt)
+
+ val copyDeps = TaskKey[Unit]("copyDeps", "Copies needed dependencies to the build directory.")
+ val destPath = (Compile / crossTarget) {
+ _ / "mapreduce_lib"
}
+
+ lazy val copyDepsSettings = Seq(
+ copyDeps := {
+ val dest = destPath.value
+ if (!dest.isDirectory() && !dest.mkdirs()) {
+ throw new java.io.IOException("Failed to create jars directory.")
+ }
+
+ (Compile / dependencyClasspath).value.map(_.data)
+ .filter { jar => jar.isFile() }
+ .foreach { jar =>
+ val destJar = new File(dest, jar.getName())
+ if (destJar.isFile()) {
+ destJar.delete()
+ }
+ Files.copy(jar.toPath(), destJar.toPath())
+ }
+ },
+ (Test / compile) := {
+ copyDeps.value
+ (Test / compile).value
+ }
+ )
}
diff --git a/tests/mr-it/pom.xml b/tests/mr-it/pom.xml
new file mode 100644
index 00000000000..7c3f1c1e43c
--- /dev/null
+++ b/tests/mr-it/pom.xml
@@ -0,0 +1,177 @@
+
+
+
+ 4.0.0
+
+ org.apache.celeborn
+ celeborn-parent_${scala.binary.version}
+ ${project.version}
+ ../../pom.xml
+
+
+ celeborn-mr-it_${scala.binary.version}
+ jar
+ Celeborn MapReduce Integration Test
+
+
+
+ org.apache.celeborn
+ celeborn-common_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
+
+ org.apache.celeborn
+ celeborn-client_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
+
+ org.apache.celeborn
+ celeborn-master_${scala.binary.version}
+ ${project.version}
+ test
+
+
+ org.apache.celeborn
+ celeborn-worker_${scala.binary.version}
+ ${project.version}
+ test
+
+
+ org.apache.celeborn
+ celeborn-worker_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
+
+ org.apache.celeborn
+ celeborn-client-mr-shaded_${scala.binary.version}
+ ${project.version}
+ test
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-common
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-auth
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-web-proxy
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-nodemanager
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+
+
+
+
+ org.apache.hadoop
+ hadoop-client-minicluster
+ ${hadoop.version}
+ test
+
+
+ org.xerial.snappy
+ snappy-java
+
+
+ junit
+ junit
+
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-examples
+ ${hadoop.version}
+ test
+
+
+ org.apache.hadoop
+ hadoop-client-api
+ ${hadoop.version}
+ test
+
+
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+ ${hadoop.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-web-proxy
+ ${hadoop.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+ ${hadoop.version}
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+
+ copy-dependencies
+
+ package
+
+ ${project.build.directory}/mapreduce_lib
+
+
+
+
+
+
+
diff --git a/tests/mr-it/src/test/resources/container-log4j.properties b/tests/mr-it/src/test/resources/container-log4j.properties
new file mode 100644
index 00000000000..c37e1d32f01
--- /dev/null
+++ b/tests/mr-it/src/test/resources/container-log4j.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# STDOUT Appender
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n
+
+log4j.rootLogger=INFO, stdout
diff --git a/tests/mr-it/src/test/resources/log4j2-test.xml b/tests/mr-it/src/test/resources/log4j2-test.xml
new file mode 100644
index 00000000000..9adcdccfd0e
--- /dev/null
+++ b/tests/mr-it/src/test/resources/log4j2-test.xml
@@ -0,0 +1,41 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
new file mode 100644
index 00000000000..0f1bb91aaf2
--- /dev/null
+++ b/tests/mr-it/src/test/scala/org/apache/celeborn/tests/mr/WordCountTest.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.mr
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.examples.WordCount
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.ShuffleHandler
+import org.apache.hadoop.mapreduce.{Job, MRJobConfig}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.service.Service
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+import org.apache.celeborn.service.deploy.worker.Worker
+
+class WordCountTest extends AnyFunSuite with Logging with MiniClusterFeature
+ with BeforeAndAfterAll {
+ var workers: collection.Set[Worker] = null
+
+ var yarnCluster: MiniYARNCluster = null
+ var hadoopConf: Configuration = null
+
+ override def beforeAll(): Unit = {
+ logInfo("test initialized , setup celeborn mini cluster")
+ val masterConf = Map(
+ "celeborn.master.host" -> "localhost",
+ "celeborn.master.port" -> "9097")
+ val workerConf = Map("celeborn.master.endpoints" -> "localhost:9097")
+ workers = setUpMiniCluster(masterConf, workerConf)._2
+
+ hadoopConf = new Configuration()
+ hadoopConf.set("yarn.scheduler.capacity.root.queues", "default,other_queue")
+
+ hadoopConf.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+ hadoopConf.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100)
+ hadoopConf.setInt("yarn.scheduler.capacity.root.other_queue.maximum-capacity", 100)
+
+ hadoopConf.setStrings(
+ YarnConfiguration.NM_AUX_SERVICES,
+ ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)
+ hadoopConf.setClass(
+ String.format(
+ YarnConfiguration.NM_AUX_SERVICE_FMT,
+ ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID),
+ classOf[ShuffleHandler],
+ classOf[Service])
+
+ yarnCluster = new MiniYARNCluster("MiniClusterWordCount", 1, 1, 1)
+ yarnCluster.init(hadoopConf)
+ yarnCluster.start()
+ }
+
+ override def afterAll(): Unit = {
+ logInfo("all test complete , stop celeborn mini cluster")
+ shutdownMiniCluster()
+ if (yarnCluster != null) {
+ yarnCluster.stop()
+ }
+ }
+
+ test("celeborn mr integration test - word count") {
+ val input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input")
+ Files.write(
+ Paths.get(input.getPath, "v1.txt"),
+ "hello world celeborn".getBytes(StandardCharsets.UTF_8))
+ Files.write(
+ Paths.get(input.getPath, "v2.txt"),
+ "hello world mapreduce".getBytes(StandardCharsets.UTF_8))
+
+ val output = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "output")
+ val mrOutputPath = new Path(output.getPath + File.separator + "mr_output")
+
+ val conf = new Configuration(yarnCluster.getConfig)
+ // YARN config
+ conf.set("yarn.app.mapreduce.am.job.recovery.enable", "false")
+ conf.set(
+ "yarn.app.mapreduce.am.command-opts",
+ "org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn")
+
+ // MapReduce config
+ conf.set("mapreduce.framework.name", "yarn")
+ conf.set("mapreduce.job.user.classpath.first", "true")
+
+ conf.set("mapreduce.job.reduce.slowstart.completedmaps", "1")
+ conf.set("mapreduce.celeborn.master.endpoints", "localhost:9097")
+ conf.set(
+ MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+ "org.apache.hadoop.mapred.CelebornMapOutputCollector")
+ conf.set(
+ "mapreduce.job.reduce.shuffle.consumer.plugin.class",
+ "org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer")
+
+ val job = Job.getInstance(conf, "word count")
+ job.setJarByClass(classOf[WordCount])
+ job.setMapperClass(classOf[WordCount.TokenizerMapper])
+ job.setCombinerClass(classOf[WordCount.IntSumReducer])
+ job.setReducerClass(classOf[WordCount.IntSumReducer])
+ job.setOutputKeyClass(classOf[Text])
+ job.setOutputValueClass(classOf[IntWritable])
+ FileInputFormat.addInputPath(job, new Path(input.getPath))
+ FileOutputFormat.setOutputPath(job, mrOutputPath)
+
+ val mapreduceLibPath = (Utils.getCodeSourceLocation(getClass).split("/").dropRight(1) ++ Array(
+ "mapreduce_lib")).mkString("/")
+ val excludeJarList =
+ Seq("hadoop-client-api", "hadoop-client-runtime", "hadoop-client-minicluster")
+ Files.list(Paths.get(mapreduceLibPath)).iterator().asScala.foreach(path => {
+ if (!excludeJarList.exists(path.toFile.getPath.contains(_))) {
+ job.addFileToClassPath(new Path(path.toString))
+ }
+ })
+
+ val exitCode = job.waitForCompletion(true)
+ assert(exitCode, "Returned error code.")
+
+ val outputFilePath = Paths.get(mrOutputPath.toString, "part-r-00000")
+ assert(outputFilePath.toFile.exists())
+ assert(Files.readAllLines(outputFilePath).contains("celeborn\t1"))
+ }
+}