From e0619821c328312ec7177576e82a4f2f229715e6 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Tue, 20 Aug 2024 17:45:01 +0800 Subject: [PATCH 01/14] support MPU for S3 --- build/make-distribution.sh | 4 +- common/pom.xml | 15 +-- .../apache/celeborn/common/CelebornConf.scala | 6 +- .../common/util/CelebornHadoopUtils.scala | 6 +- docs/configuration/client.md | 2 +- docs/configuration/master.md | 2 +- docs/configuration/worker.md | 2 +- multipart-uploader/pom.xml | 57 ++++++++++ .../org/apache/celeborn/AWSCredentials.java | 70 ++++++++++++ .../apache/celeborn/S3MultipartUploader.java | 100 ++++++++++++++++++ pom.xml | 20 ++-- project/CelebornBuild.scala | 25 ++++- worker/pom.xml | 18 ++++ .../worker/storage/PartitionDataWriter.java | 47 +++++++- .../worker/MultipartUploadRequestParam.scala | 32 ++++++ .../deploy/worker/storage/FlushTask.scala | 34 +++--- .../deploy/worker/storage/Flusher.scala | 9 +- 17 files changed, 386 insertions(+), 63 deletions(-) create mode 100644 multipart-uploader/pom.xml create mode 100644 multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java create mode 100644 multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala diff --git a/build/make-distribution.sh b/build/make-distribution.sh index 55b419d1598..e95149276b2 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -264,8 +264,8 @@ function sbt_build_service { echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE" echo "Build flags: $@" >> "$DIST_DIR/RELEASE" - if [[ $@ == *"hadoop-aws"* ]]; then - SBT_MAVEN_PROFILES="hadoop-aws" + if [[ $@ == *"aws-mpu"* ]]; then + SBT_MAVEN_PROFILES="aws-mpu" fi BUILD_COMMAND=("$SBT" clean package) diff --git a/common/pom.xml b/common/pom.xml index 3c34be923a6..dcdf153d865 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -210,22 +210,17 @@ - hadoop-aws + aws-mpu - hadoop-aws-deps + aws-mpu-deps - org.apache.hadoop - hadoop-aws - ${hadoop.version} - - - com.amazonaws - aws-java-sdk-bundle - ${aws.version} + org.apache.celeborn + celeborn-multipart-uploader_2.12 + ${project.version} diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index bf1af9b8a66..a75c364fef6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1136,7 +1136,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("") - def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("") + def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("") def s3Dir: String = { get(S3_DIR).map { @@ -3019,8 +3019,8 @@ object CelebornConf extends Logging { .stringConf .createOptional - val S3_ENDPOINT: OptionalConfigEntry[String] = - buildConf("celeborn.storage.s3.endpoint") + val S3_ENDPOINT_REGION: OptionalConfigEntry[String] = + buildConf("celeborn.storage.s3.endpoint.region") .categories("worker", "master", "client") .version("0.6.0") .doc("S3 endpoint for Celeborn to store shuffle data.") diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala index 356711d755c..b703da07cbc 100644 --- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging { } if (conf.s3Dir.nonEmpty) { - if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) { + if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) { throw new CelebornException( - "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set") + "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set") } hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") hadoopConf.set( @@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging { "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey) hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey) - hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint) + hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion) } appendSparkHadoopConfigs(conf, hadoopConf) hadoopConf diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 91d1faac52e..99426d4d52f 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -128,6 +128,6 @@ license: | | celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index c9f68f21ed6..c711e010823 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -88,6 +88,6 @@ license: | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 5330c42daa0..c91070dbe46 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -48,7 +48,7 @@ license: | | celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | | | celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. 
| 0.6.0 | | -| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | +| celeborn.storage.s3.endpoint.region | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | | | celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | | | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | | diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml new file mode 100644 index 00000000000..ed4458dbc42 --- /dev/null +++ b/multipart-uploader/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + org.apache.celeborn + celeborn-parent_${scala.binary.version} + ${project.version} + ../pom.xml + + + celeborn-multipart-uploader_${scala.binary.version} + jar + Celeborn Multipart Uploader + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-1.2-api + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + + + + aws-mpu + + + aws-mpu-deps + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.amazonaws + aws-java-sdk-bundle + ${aws.version} + + + + + diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java b/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java new file mode 100644 index 00000000000..79eb5d0711b --- /dev/null +++ b/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java @@ -0,0 +1,70 @@ +package org.apache.celeborn; + +public class AWSCredentials { + + private final String bucketName; + + private final String s3AccessKey; + + private final String s3SecretKey; + + private final String s3EndpointRegion; + + private AWSCredentials(Builder builder) { + this.bucketName = builder.bucketName; + this.s3AccessKey = builder.s3AccessKey; + this.s3SecretKey = builder.s3SecretKey; + this.s3EndpointRegion = builder.s3EndpointRegion; + } + + public String getBucketName() { + return bucketName; + } + + public String getS3AccessKey() { + return s3AccessKey; + } + + public String getS3SecretKey() { + return s3SecretKey; + } + + public String getS3EndpointRegion() { + return s3EndpointRegion; + } + + public static class Builder { + + private String bucketName; + + private String s3AccessKey; + + private String s3SecretKey; + + private String s3EndpointRegion; + + public AWSCredentials.Builder withBucketName(String bucketName) { + this.bucketName = bucketName; + return this; + } + + public AWSCredentials.Builder withS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + return this; + } + + public AWSCredentials.Builder withS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + return this; + } + + public AWSCredentials.Builder withS3EndpointRegion(String s3EndpointRegion) { + this.s3EndpointRegion = s3EndpointRegion; + return this; + } + + public AWSCredentials build() { + return new AWSCredentials(this); + } + } +} diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java new file mode 100644 index 00000000000..8771400a110 --- /dev/null +++ 
b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3MultipartUploader { + + private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploader.class); + + private final AWSCredentials awsCredentials; + private List partETags; + private String uploadId; + private AmazonS3 s3Client; + private String key; + + public S3MultipartUploader(AWSCredentials awsCredentials, String key) { + this.awsCredentials = awsCredentials; + BasicAWSCredentials basicAWSCredentials = + new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3AccessKey()); + this.s3Client = + AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) + .withRegion(awsCredentials.getS3EndpointRegion()) + .build(); + this.partETags = new ArrayList(); + this.key = key; + } + + public String startUpload() { + InitiateMultipartUploadRequest initRequest = + new InitiateMultipartUploadRequest(awsCredentials.getBucketName(), key); + InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); + uploadId = initResponse.getUploadId(); + return uploadId; + } + + public void putPart(Path filePath, InputStream inputStream, long lengthInBytes, int partNumber) { + try { + UploadPartRequest uploadRequest = + new UploadPartRequest() + .withBucketName(awsCredentials.getBucketName()) + .withKey(filePath.toString()) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withInputStream(inputStream) + .withPartSize(lengthInBytes); + + UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); + partETags.add(uploadResult.getPartETag()); + } catch (RuntimeException e) { + logger.error("Failed to upload part", e); + } + } + + public void complete() { + CompleteMultipartUploadRequest compRequest = + new 
CompleteMultipartUploadRequest( + awsCredentials.getBucketName(), key, uploadId, partETags); + s3Client.completeMultipartUpload(compRequest); + } + + public void abort() { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId); + s3Client.abortMultipartUpload(abortMultipartUploadRequest); + } +} diff --git a/pom.xml b/pom.xml index c2627a9a610..1de6563e203 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ service master worker + web cli @@ -1340,23 +1341,14 @@ - hadoop-aws + aws-mpu + + multipart-uploader + - true + true 1.12.367 - - - org.apache.hadoop - hadoop-aws - ${hadoop.version} - - - com.amazonaws - aws-java-sdk-bundle - ${aws.version} - - spark-2.4 diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index ec3050fbf29..f0558b912b6 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -494,14 +494,33 @@ object CelebornSpi { ) } -object CelebornCommon { +object CeleborMPU { - lazy val hadoopAwsDependencies = if(profiles.exists(_.startsWith("hadoop-aws"))){ + lazy val hadoopAwsDependencies = if(profiles.exists(_.startsWith("aws-mpu"))){ Seq(Dependencies.hadoopAws, Dependencies.awsClient) } else { Seq.empty } + lazy val celeborMPU = Project("celeborn-multipart-uploader", file("multipart-uploader")) + .settings ( + commonSettings, + libraryDependencies ++= Seq( + Dependencies.log4j12Api, + Dependencies.log4jSlf4jImpl, + ) ++ hadoopAwsDependencies + ) +} + +object CelebornCommon { + + + lazy val celeborMPU = if(profiles.exists(_.startsWith("aws-mpu"))){ + CeleborMPU.celeborMPU + } else { + Seq.empty + } + lazy val common = Project("celeborn-common", file("common")) .dependsOn(CelebornSpi.spi) .settings ( @@ -538,7 +557,7 @@ object CelebornCommon { // SSL support Dependencies.bouncycastleBcprovJdk18on, Dependencies.bouncycastleBcpkixJdk18on - ) ++ commonUnitTestDependencies ++ hadoopAwsDependencies, + ) ++ commonUnitTestDependencies ++ celeborMPU, Compile / sourceGenerators += Def.task { val file = (Compile / sourceManaged).value / "org" / "apache" / "celeborn" / "package.scala" diff --git a/worker/pom.xml b/worker/pom.xml index e770bc8cd1c..51b75ce4aae 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -141,4 +141,22 @@ + + + + aws-mpu + + + aws-mpu-deps + + + + + org.apache.celeborn + celeborn-multipart-uploader_2.12 + ${project.version} + + + + diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 194796c08cd..38aa9c3046c 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; @@ -32,11 +33,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.celeborn.AWSCredentials; +import org.apache.celeborn.S3MultipartUploader; import org.apache.celeborn.common.CelebornConf; import 
org.apache.celeborn.common.exception.AlreadyClosedException; import org.apache.celeborn.common.exception.CelebornIOException; @@ -113,6 +117,10 @@ public abstract class PartitionDataWriter implements DeviceObserver { private UserCongestionControlContext userCongestionControlContext = null; + protected S3MultipartUploader s3MultipartUploader; + + protected int partNumber = 1; + public PartitionDataWriter( StorageManager storageManager, AbstractSource workerSource, @@ -187,6 +195,25 @@ public void initFileChannelsForDiskFile() throws IOException { // If we reuse DFS output stream, we will exhaust the memory soon. try { hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close(); + if (diskFileInfo.isS3()) { + Configuration configuration = hadoopFs.getConf(); + String s3AccessKey = configuration.get("fs.s3a.access.key"); + String s3SecretKey = configuration.get("fs.s3a.secret.key"); + String s3EndpointRegion = configuration.get("fs.s3a.endpoint.region"); + URI uri = hadoopFs.getUri(); + String bucketName = uri.getHost(); + int slashIndex = diskFileInfo.getFilePath().indexOf("/"); + String key = diskFileInfo.getFilePath().substring(slashIndex + 1); + AWSCredentials awsCredentials = + new AWSCredentials.Builder() + .withS3AccessKey(s3AccessKey) + .withS3SecretKey(s3SecretKey) + .withS3EndpointRegion(s3EndpointRegion) + .withBucketName(bucketName) + .build(); + this.s3MultipartUploader = new S3MultipartUploader(awsCredentials, key); + this.s3MultipartUploader.startUpload(); + } } catch (IOException e) { try { // If create file failed, wait 10 ms and retry @@ -236,7 +263,14 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException { } else if (diskFileInfo.isHdfs()) { task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false); } else if (diskFileInfo.isS3()) { - task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false); + task = + new S3FlushTask( + flushBuffer, + diskFileInfo.getDfsPath(), + notifier, + false, + s3MultipartUploader, + partNumber); } MemoryManager.instance().releaseMemoryFileStorage(numBytes); MemoryManager.instance().incrementDiskBuffer(numBytes); @@ -264,7 +298,14 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException { } else if (diskFileInfo.isHdfs()) { task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true); } else if (diskFileInfo.isS3()) { - task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true); + task = + new S3FlushTask( + flushBuffer, + diskFileInfo.getDfsPath(), + notifier, + true, + s3MultipartUploader, + partNumber); } } } @@ -273,6 +314,7 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException { if (task != null) { addTask(task); flushBuffer = null; + partNumber++; if (!fromEvict) { diskFileInfo.updateBytesFlushed(numBytes); } @@ -507,6 +549,7 @@ public void evict(boolean checkClose) throws IOException { storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), diskFileInfo); } } + s3MultipartUploader.complete(); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala new file mode 100644 index 00000000000..6e649621c13 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker + +import java.lang.{Long => JLong} +import java.util.{List => JList} + +case class MultipartUploadRequestParam( + bucketName: String, + s3Client: AnyRef, + uploadId: JLong, + var partNumber: Int, + partETags: JList[String]) { + def incrementPartNumber(): Unit = { + partNumber += 1 + } +} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala index 0c4fa410545..7295b17ff5e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala @@ -17,12 +17,13 @@ package org.apache.celeborn.service.deploy.worker.storage +import java.io.ByteArrayInputStream import java.nio.channels.FileChannel import io.netty.buffer.{ByteBufUtil, CompositeByteBuf} import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.IOUtils +import org.apache.celeborn.S3MultipartUploader import org.apache.celeborn.common.protocol.StorageInfo.Type abstract private[worker] class FlushTask( @@ -60,33 +61,22 @@ private[worker] class HdfsFlushTask( hdfsStream.write(ByteBufUtil.getBytes(buffer)) hdfsStream.close() } + } private[worker] class S3FlushTask( buffer: CompositeByteBuf, val path: Path, notifier: FlushNotifier, - keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) { + keepBuffer: Boolean, + s3MultipartUploader: S3MultipartUploader, + partNumber: Int) + extends FlushTask(buffer, notifier, keepBuffer) { + override def flush(): Unit = { - val hadoopFs = StorageManager.hadoopFs.get(Type.S3) - if (hadoopFs.exists(path)) { - val conf = hadoopFs.getConf - val tempPath = new Path(path.getParent, path.getName + ".tmp") - val outputStream = hadoopFs.create(tempPath, true, 256 * 1024) - val inputStream = hadoopFs.open(path) - try { - IOUtils.copyBytes(inputStream, outputStream, conf, false) - } finally { - inputStream.close() - } - outputStream.write(ByteBufUtil.getBytes(buffer)) - outputStream.close() - hadoopFs.delete(path, false) - hadoopFs.rename(tempPath, path) - } else { - val s3Stream = hadoopFs.create(path, true, 256 * 1024) - s3Stream.write(ByteBufUtil.getBytes(buffer)) - s3Stream.close() - } + val bytes = ByteBufUtil.getBytes(buffer) + val currentPartSize = bytes.length.toLong + val inputStream = new ByteArrayInputStream(bytes) + s3MultipartUploader.putPart(path, inputStream, currentPartSize, partNumber) } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 977b3b72f2d..85b71f797a0 100644 --- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -18,23 +18,30 @@ package org.apache.celeborn.service.deploy.worker.storage import java.io.IOException +import java.nio.channels.ClosedByInterruptException +import java.util import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray} +import scala.collection.JavaConverters._ import scala.util.Random -import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator} +import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator, Unpooled} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow} import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource} import org.apache.celeborn.common.protocol.StorageInfo +import org.apache.celeborn.common.protocol.StorageInfo.Type import org.apache.celeborn.common.util.{ThreadUtils, Utils} +import org.apache.celeborn.reflect.{DynClasses, DynMethods} import org.apache.celeborn.service.deploy.worker.WorkerSource import org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.MemoryManager +import java.util + abstract private[worker] class Flusher( val workerSource: AbstractSource, val threadCount: Int, From ebb2ffab038199822218558b1bbf2469f5148bf7 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Tue, 20 Aug 2024 17:45:01 +0800 Subject: [PATCH 02/14] fix conflicts --- build/make-distribution.sh | 2 +- common/pom.xml | 15 ---- multipart-uploader/pom.xml | 9 +++ ...der.java => S3MultipartUploadHandler.java} | 58 +++++++++++---- project/CelebornBuild.scala | 29 ++++---- .../service/mpu/MultipartUploadHandler.java | 14 ++++ .../service/mpu/bean/AWSCredentials.java | 70 +++++++++++++++++++ worker/pom.xml | 2 +- .../worker/storage/PartitionDataWriter.java | 54 +++++++------- .../deploy/worker/storage/FlushTask.scala | 10 ++- .../deploy/worker/storage/Flusher.scala | 9 +-- 11 files changed, 184 insertions(+), 88 deletions(-) rename multipart-uploader/src/main/java/org/apache/celeborn/{S3MultipartUploader.java => S3MultipartUploadHandler.java} (62%) create mode 100644 service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java create mode 100644 service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java diff --git a/build/make-distribution.sh b/build/make-distribution.sh index e95149276b2..f4c3a4e23d4 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -265,7 +265,7 @@ function sbt_build_service { echo "Build flags: $@" >> "$DIST_DIR/RELEASE" if [[ $@ == *"aws-mpu"* ]]; then - SBT_MAVEN_PROFILES="aws-mpu" + export SBT_MAVEN_PROFILES="aws-mpu" fi BUILD_COMMAND=("$SBT" clean package) diff --git a/common/pom.xml b/common/pom.xml index dcdf153d865..0f4da3e7a74 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -209,20 +209,5 @@ - - aws-mpu - - - aws-mpu-deps - - - - - org.apache.celeborn - celeborn-multipart-uploader_2.12 - ${project.version} - - - diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml index ed4458dbc42..4c9a2c54dac 100644 --- a/multipart-uploader/pom.xml +++ b/multipart-uploader/pom.xml @@ -21,6 +21,15 @@ org.apache.logging.log4j 
log4j-1.2-api + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.celeborn + celeborn-service_${scala.binary.version} + ${project.version} + diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java similarity index 62% rename from multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java rename to multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java index 8771400a110..96170d2cdc3 100644 --- a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploader.java +++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java @@ -17,8 +17,10 @@ package org.apache.celeborn; +import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -29,69 +31,95 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListPartsRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.PartListing; +import com.amazonaws.services.s3.model.PartSummary; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; -import org.apache.hadoop.fs.Path; + +import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler; +import org.apache.celeborn.server.common.service.mpu.bean.AWSCredentials; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3MultipartUploader { - private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploader.class); +public class S3MultipartUploadHandler implements MultipartUploadHandler { + + private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadHandler.class); private final AWSCredentials awsCredentials; - private List partETags; private String uploadId; private AmazonS3 s3Client; private String key; - public S3MultipartUploader(AWSCredentials awsCredentials, String key) { + public S3MultipartUploadHandler(AWSCredentials awsCredentials, String key) { this.awsCredentials = awsCredentials; BasicAWSCredentials basicAWSCredentials = - new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3AccessKey()); + new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3SecretKey()); this.s3Client = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) .withRegion(awsCredentials.getS3EndpointRegion()) .build(); - this.partETags = new ArrayList(); this.key = key; } - public String startUpload() { + @Override + public void startUpload() { InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(awsCredentials.getBucketName(), key); InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); uploadId = initResponse.getUploadId(); - return uploadId; } - public void putPart(Path filePath, InputStream inputStream, long lengthInBytes, int partNumber) { + @Override + public void putPart(InputStream inputStream, Long lengthInBytes, Integer 
partNumber) { try { UploadPartRequest uploadRequest = new UploadPartRequest() .withBucketName(awsCredentials.getBucketName()) - .withKey(filePath.toString()) + .withKey(key) .withUploadId(uploadId) .withPartNumber(partNumber) .withInputStream(inputStream) .withPartSize(lengthInBytes); - - UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest); - partETags.add(uploadResult.getPartETag()); + s3Client.uploadPart(uploadRequest); } catch (RuntimeException e) { logger.error("Failed to upload part", e); } } + @Override public void complete() { + List partETags = new ArrayList<>(); + ListPartsRequest listPartsRequest = new ListPartsRequest(awsCredentials.getBucketName(), key, uploadId); + PartListing partListing; + do { + partListing = s3Client.listParts(listPartsRequest); + for (PartSummary part : partListing.getParts()) { + partETags.add(new PartETag(part.getPartNumber(), part.getETag())); + } + listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker()); + } while (partListing.isTruncated()); + if (partETags.size() == 0) { + logger.debug("UploadId {} has no parts uploaded, aborting upload", uploadId); + abort(); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(0); + PutObjectRequest putRequest = new PutObjectRequest(awsCredentials.getBucketName(), key, new ByteArrayInputStream(new byte[0]), metadata); + s3Client.putObject(putRequest); + return; + } CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest( awsCredentials.getBucketName(), key, uploadId, partETags); + logger.debug("UploadId {} upload completing and partSize is {}", uploadId, partETags.size()); s3Client.completeMultipartUpload(compRequest); } + @Override public void abort() { AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId); diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index f0558b912b6..9e6c891389c 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -248,7 +248,7 @@ object CelebornCommonSettings { "Build-Revision" -> gitHeadCommit.value.getOrElse("N/A"), "Build-Branch" -> gitCurrentBranch.value, "Build-Time" -> java.time.ZonedDateTime.now().format(java.time.format.DateTimeFormatter.ISO_DATE_TIME)), - + // -target cannot be passed as a parameter to javadoc. 
See https://github.com/sbt/sbt/issues/355 Compile / compile / javacOptions ++= Seq("-target", "1.8"), @@ -370,7 +370,8 @@ object CelebornBuild extends sbt.internal.BuildDef { CelebornService.service, CelebornWorker.worker, CelebornMaster.master, - CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules + CelebornCli.cli, + CeleborMPU.celeborMPU) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules } // ThisBuild / parallelExecution := false @@ -496,13 +497,10 @@ object CelebornSpi { object CeleborMPU { - lazy val hadoopAwsDependencies = if(profiles.exists(_.startsWith("aws-mpu"))){ - Seq(Dependencies.hadoopAws, Dependencies.awsClient) - } else { - Seq.empty - } + lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, Dependencies.awsClient) lazy val celeborMPU = Project("celeborn-multipart-uploader", file("multipart-uploader")) + .dependsOn(CelebornService.service % "test->test;compile->compile") .settings ( commonSettings, libraryDependencies ++= Seq( @@ -515,12 +513,6 @@ object CeleborMPU { object CelebornCommon { - lazy val celeborMPU = if(profiles.exists(_.startsWith("aws-mpu"))){ - CeleborMPU.celeborMPU - } else { - Seq.empty - } - lazy val common = Project("celeborn-common", file("common")) .dependsOn(CelebornSpi.spi) .settings ( @@ -557,7 +549,7 @@ object CelebornCommon { // SSL support Dependencies.bouncycastleBcprovJdk18on, Dependencies.bouncycastleBcpkixJdk18on - ) ++ commonUnitTestDependencies ++ celeborMPU, + ) ++ commonUnitTestDependencies, Compile / sourceGenerators += Def.task { val file = (Compile / sourceManaged).value / "org" / "apache" / "celeborn" / "package.scala" @@ -664,13 +656,18 @@ object CelebornMaster { } object CelebornWorker { - lazy val worker = Project("celeborn-worker", file("worker")) + var worker = Project("celeborn-worker", file("worker")) .dependsOn(CelebornService.service) .dependsOn(CelebornCommon.common % "test->test;compile->compile") .dependsOn(CelebornService.service % "test->test;compile->compile") .dependsOn(CelebornClient.client % "test->compile") .dependsOn(CelebornMaster.master % "test->compile") - .settings ( + + if (profiles.exists(_.startsWith("aws-mpu"))) { + worker = worker.dependsOn(CeleborMPU.celeborMPU) + } + + worker = worker.settings( commonSettings, libraryDependencies ++= Seq( Dependencies.apLoader, diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java new file mode 100644 index 00000000000..fef89a4214d --- /dev/null +++ b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java @@ -0,0 +1,14 @@ +package org.apache.celeborn.server.common.service.mpu; + +import java.io.InputStream; + +public interface MultipartUploadHandler { + + void startUpload(); + + void putPart(InputStream inputStream, Long lengthInBytes, Integer partNumber); + + void complete(); + + void abort(); +} diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java new file mode 100644 index 00000000000..dea9f71fb5e --- /dev/null +++ b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java @@ -0,0 +1,70 @@ +package org.apache.celeborn.server.common.service.mpu.bean; + +public class AWSCredentials 
{ + + private String bucketName; + + private String s3AccessKey; + + private String s3SecretKey; + + private String s3EndpointRegion; + + public AWSCredentials( + String bucketName, String s3AccessKey, String s3SecretKey, String s3EndpointRegion) { + this.bucketName = bucketName; + this.s3AccessKey = s3AccessKey; + this.s3SecretKey = s3SecretKey; + this.s3EndpointRegion = s3EndpointRegion; + } + + public String getBucketName() { + return bucketName; + } + + public String getS3AccessKey() { + return s3AccessKey; + } + + public String getS3SecretKey() { + return s3SecretKey; + } + + public String getS3EndpointRegion() { + return s3EndpointRegion; + } + + public void setBucketName(String bucketName) { + this.bucketName = bucketName; + } + + public void setS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + } + + public void setS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + } + + public void setS3EndpointRegion(String s3EndpointRegion) { + this.s3EndpointRegion = s3EndpointRegion; + } + + @Override + public String toString() { + return "AWSCredentials{" + + "bucketName='" + + bucketName + + '\'' + + ", s3AccessKey='" + + s3AccessKey + + '\'' + + ", s3SecretKey='" + + "******" + + '\'' + + ", s3EndpointRegion='" + + s3EndpointRegion + + '\'' + + '}'; + } +} diff --git a/worker/pom.xml b/worker/pom.xml index 51b75ce4aae..bdaa259f8be 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -153,7 +153,7 @@ <groupId>org.apache.celeborn</groupId> - <artifactId>celeborn-multipart-uploader_2.12</artifactId> + <artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId> <version>${project.version}</version> diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 38aa9c3046c..6c0b659573d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.celeborn.reflect.DynConstructors; import scala.Tuple4; import com.google.common.annotations.VisibleForTesting; @@ -39,8 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.celeborn.AWSCredentials; -import org.apache.celeborn.S3MultipartUploader; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.AlreadyClosedException; import org.apache.celeborn.common.exception.CelebornIOException; @@ -53,6 +52,8 @@ import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.FileChannelUtils; +import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler; +import org.apache.celeborn.server.common.service.mpu.bean.AWSCredentials; import org.apache.celeborn.service.deploy.worker.WorkerSource; import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController; import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo; @@ -117,7 +118,7 @@ public abstract class PartitionDataWriter implements DeviceObserver { private UserCongestionControlContext userCongestionControlContext = null; - protected S3MultipartUploader s3MultipartUploader; + protected MultipartUploadHandler s3MultipartUploadHandler; protected int partNumber = 1; @@ -200,19 +201,25 @@ public void
initFileChannelsForDiskFile() throws IOException { String s3AccessKey = configuration.get("fs.s3a.access.key"); String s3SecretKey = configuration.get("fs.s3a.secret.key"); String s3EndpointRegion = configuration.get("fs.s3a.endpoint.region"); + URI uri = hadoopFs.getUri(); String bucketName = uri.getHost(); int slashIndex = diskFileInfo.getFilePath().indexOf("/"); String key = diskFileInfo.getFilePath().substring(slashIndex + 1); + AWSCredentials awsCredentials = - new AWSCredentials.Builder() - .withS3AccessKey(s3AccessKey) - .withS3SecretKey(s3SecretKey) - .withS3EndpointRegion(s3EndpointRegion) - .withBucketName(bucketName) - .build(); - this.s3MultipartUploader = new S3MultipartUploader(awsCredentials, key); - this.s3MultipartUploader.startUpload(); + new AWSCredentials(bucketName, s3AccessKey, s3SecretKey, s3EndpointRegion); + this.s3MultipartUploadHandler = + (MultipartUploadHandler) + DynConstructors.builder() + .impl( + "org.apache.celeborn.S3MultipartUploadHandler", + awsCredentials.getClass(), + String.class) + .build() + .newInstance(awsCredentials, key); + + s3MultipartUploadHandler.startUpload(); } } catch (IOException e) { try { @@ -264,13 +271,7 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException { task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false); } else if (diskFileInfo.isS3()) { task = - new S3FlushTask( - flushBuffer, - diskFileInfo.getDfsPath(), - notifier, - false, - s3MultipartUploader, - partNumber); + new S3FlushTask(flushBuffer, notifier, false, s3MultipartUploadHandler, partNumber); } MemoryManager.instance().releaseMemoryFileStorage(numBytes); MemoryManager.instance().incrementDiskBuffer(numBytes); @@ -300,12 +301,7 @@ } else if (diskFileInfo.isS3()) { task = new S3FlushTask( - flushBuffer, - diskFileInfo.getDfsPath(), - notifier, - true, - s3MultipartUploader, - partNumber); + flushBuffer, notifier, true, s3MultipartUploadHandler, partNumber); } } } @@ -344,6 +340,11 @@ public void write(ByteBuf data) throws IOException { throw new AlreadyClosedException(msg); } + if (notifier.hasException() && s3MultipartUploadHandler != null) { + logger.warn("Abort s3 multipart upload for {}", diskFileInfo.getFilePath()); + s3MultipartUploadHandler.abort(); + } + if (notifier.hasException()) { return; } @@ -501,7 +502,9 @@ protected synchronized long close( } finalClose.run(); - + if (s3MultipartUploadHandler != null) { + s3MultipartUploadHandler.complete(); + } // unregister from DeviceMonitor if (diskFileInfo != null && !this.diskFileInfo.isDFS()) { logger.debug("file info {} unregister from device monitor", diskFileInfo); @@ -549,7 +552,6 @@ public void evict(boolean checkClose) throws IOException { storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), diskFileInfo); } } - s3MultipartUploader.complete(); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala index 7295b17ff5e..82ddc035edd 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala @@ -23,8 +23,8 @@ import java.nio.channels.FileChannel import io.netty.buffer.{ByteBufUtil, CompositeByteBuf} import org.apache.hadoop.fs.Path -import org.apache.celeborn.S3MultipartUploader import
org.apache.celeborn.common.protocol.StorageInfo.Type +import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler abstract private[worker] class FlushTask( val buffer: CompositeByteBuf, @@ -61,22 +61,20 @@ private[worker] class HdfsFlushTask( hdfsStream.write(ByteBufUtil.getBytes(buffer)) hdfsStream.close() } - } private[worker] class S3FlushTask( buffer: CompositeByteBuf, - val path: Path, notifier: FlushNotifier, keepBuffer: Boolean, - s3MultipartUploader: S3MultipartUploader, + s3MultipartUploader: MultipartUploadHandler, partNumber: Int) extends FlushTask(buffer, notifier, keepBuffer) { override def flush(): Unit = { val bytes = ByteBufUtil.getBytes(buffer) - val currentPartSize = bytes.length.toLong + val currentPartSize = java.lang.Long.valueOf(bytes.length.toLong) val inputStream = new ByteArrayInputStream(bytes) - s3MultipartUploader.putPart(path, inputStream, currentPartSize, partNumber) + s3MultipartUploader.putPart(inputStream, currentPartSize, partNumber) } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 85b71f797a0..977b3b72f2d 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -18,30 +18,23 @@ package org.apache.celeborn.service.deploy.worker.storage import java.io.IOException -import java.nio.channels.ClosedByInterruptException -import java.util import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray} -import scala.collection.JavaConverters._ import scala.util.Random -import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator, Unpooled} +import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow} import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource} import org.apache.celeborn.common.protocol.StorageInfo -import org.apache.celeborn.common.protocol.StorageInfo.Type import org.apache.celeborn.common.util.{ThreadUtils, Utils} -import org.apache.celeborn.reflect.{DynClasses, DynMethods} import org.apache.celeborn.service.deploy.worker.WorkerSource import org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.MemoryManager -import java.util - abstract private[worker] class Flusher( val workerSource: AbstractSource, val threadCount: Int, From 4d6f5c88bef2e64ec9138d392e6daba4bc18662a Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Mon, 21 Oct 2024 16:36:02 +0800 Subject: [PATCH 03/14] fix UT --- .../org/apache/celeborn/AWSCredentials.java | 70 ------------------- .../worker/storage/PartitionDataWriter.java | 2 +- 2 files changed, 1 insertion(+), 71 deletions(-) delete mode 100644 multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java b/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java deleted file mode 100644 index 79eb5d0711b..00000000000 --- a/multipart-uploader/src/main/java/org/apache/celeborn/AWSCredentials.java +++ /dev/null @@ -1,70 +0,0 @@ 
-package org.apache.celeborn; - -public class AWSCredentials { - - private final String bucketName; - - private final String s3AccessKey; - - private final String s3SecretKey; - - private final String s3EndpointRegion; - - private AWSCredentials(Builder builder) { - this.bucketName = builder.bucketName; - this.s3AccessKey = builder.s3AccessKey; - this.s3SecretKey = builder.s3SecretKey; - this.s3EndpointRegion = builder.s3EndpointRegion; - } - - public String getBucketName() { - return bucketName; - } - - public String getS3AccessKey() { - return s3AccessKey; - } - - public String getS3SecretKey() { - return s3SecretKey; - } - - public String getS3EndpointRegion() { - return s3EndpointRegion; - } - - public static class Builder { - - private String bucketName; - - private String s3AccessKey; - - private String s3SecretKey; - - private String s3EndpointRegion; - - public AWSCredentials.Builder withBucketName(String bucketName) { - this.bucketName = bucketName; - return this; - } - - public AWSCredentials.Builder withS3AccessKey(String s3AccessKey) { - this.s3AccessKey = s3AccessKey; - return this; - } - - public AWSCredentials.Builder withS3SecretKey(String s3SecretKey) { - this.s3SecretKey = s3SecretKey; - return this; - } - - public AWSCredentials.Builder withS3EndpointRegion(String s3EndpointRegion) { - this.s3EndpointRegion = s3EndpointRegion; - return this; - } - - public AWSCredentials build() { - return new AWSCredentials(this); - } - } -} diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 6c0b659573d..0e5e4570d37 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.celeborn.reflect.DynConstructors; import scala.Tuple4; import com.google.common.annotations.VisibleForTesting; @@ -52,6 +51,7 @@ import org.apache.celeborn.common.protocol.StorageInfo; import org.apache.celeborn.common.unsafe.Platform; import org.apache.celeborn.common.util.FileChannelUtils; +import org.apache.celeborn.reflect.DynConstructors; import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler; import org.apache.celeborn.server.common.service.mpu.bean.AWSCredentials; import org.apache.celeborn.service.deploy.worker.WorkerSource; From 3edbeec9553df0f002740b7ba00fb61361fde5c5 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Mon, 21 Oct 2024 17:03:48 +0800 Subject: [PATCH 04/14] fix UT --- multipart-uploader/pom.xml | 16 ++++++++++++++++ .../service/mpu/MultipartUploadHandler.java | 17 +++++++++++++++++ .../common/service/mpu/bean/AWSCredentials.java | 17 +++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml index 4c9a2c54dac..f69dcfd4ea0 100644 --- a/multipart-uploader/pom.xml +++ b/multipart-uploader/pom.xml @@ -1,4 +1,20 @@ + 4.0.0 diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java index fef89a4214d..82037327d4d 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java +++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.celeborn.server.common.service.mpu; import java.io.InputStream; diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java index dea9f71fb5e..3d52eabc254 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/bean/AWSCredentials.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.celeborn.server.common.service.mpu.bean; public class AWSCredentials { From dcddcc3a41e4c0b1ce67d69fa7a8d981c1f691f0 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 23 Oct 2024 13:35:10 +0800 Subject: [PATCH 05/14] support retry --- .../org/apache/celeborn/S3MultipartUploadHandler.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java index 96170d2cdc3..82bc12ee604 100644 --- a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java +++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java @@ -23,8 +23,10 @@ import java.util.Collections; import java.util.List; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.retry.PredefinedRetryPolicies; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; @@ -58,10 +60,16 @@ public S3MultipartUploadHandler(AWSCredentials awsCredentials, String key) { this.awsCredentials = awsCredentials; BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3SecretKey()); + ClientConfiguration clientConfig = new ClientConfiguration() + .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(5)) + .withMaxErrorRetry(5) + .withConnectionTimeout(10000) + .withRequestTimeout(50000); this.s3Client = AmazonS3ClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials)) .withRegion(awsCredentials.getS3EndpointRegion()) + .withClientConfiguration(clientConfig) .build(); this.key = key; } From 930cf9cf7469c63646957458c557738b2a5bcaf4 Mon Sep 17 00:00:00 2001 From: zhaohehuhu Date: Wed, 13 Nov 2024 15:43:11 +0800 Subject: [PATCH 06/14] refactor --- build/make-distribution.sh | 4 +- master/pom.xml | 20 +++++++ multipart-uploader/pom.xml | 43 +++----------- .../celeborn/S3MultipartUploadHandler.java | 59 ++++++++++++------- pom.xml | 6 +- project/CelebornBuild.scala | 2 +- .../service/mpu/MultipartUploadHandler.java | 5 +- .../worker/storage/PartitionDataWriter.java | 31 ++++++---- .../worker/MultipartUploadRequestParam.scala | 32 ---------- .../deploy/worker/storage/FlushTask.scala | 6 +- 10 files changed, 101 insertions(+), 107 deletions(-) delete mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala diff --git a/build/make-distribution.sh b/build/make-distribution.sh index f4c3a4e23d4..1c9bbe6e5d4 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -264,8 +264,8 @@ function sbt_build_service { echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE" echo "Build flags: $@" >> "$DIST_DIR/RELEASE" - if [[ $@ == *"aws-mpu"* ]]; then - export SBT_MAVEN_PROFILES="aws-mpu" + if [[ $@ == *"aws"* ]]; then + export SBT_MAVEN_PROFILES="aws" fi BUILD_COMMAND=("$SBT" clean package) diff --git a/master/pom.xml b/master/pom.xml index 51286b9efad..ae6fab55669 100644 --- a/master/pom.xml +++ b/master/pom.xml @@ -198,5 +198,25 @@ + <profile> + <id>aws-mpu</id> + <activation> + <property> + <name>aws-mpu-deps</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-bundle</artifactId> + <version>${aws.version}</version> + </dependency> + </dependencies> + </profile> diff --git 
From 930cf9cf7469c63646957458c557738b2a5bcaf4 Mon Sep 17 00:00:00 2001
From: zhaohehuhu
Date: Wed, 13 Nov 2024 15:43:11 +0800
Subject: [PATCH 06/14] refactor

---
 build/make-distribution.sh                    |  4 +-
 master/pom.xml                                | 20 +++++++
 multipart-uploader/pom.xml                    | 43 +++-----------
 .../celeborn/S3MultipartUploadHandler.java    | 59 ++++++++++++-------
 pom.xml                                       |  6 +-
 project/CelebornBuild.scala                   |  2 +-
 .../service/mpu/MultipartUploadHandler.java   |  5 +-
 .../worker/storage/PartitionDataWriter.java   | 31 ++++++----
 .../worker/MultipartUploadRequestParam.scala  | 32 ----------
 .../deploy/worker/storage/FlushTask.scala     |  6 +-
 10 files changed, 101 insertions(+), 107 deletions(-)
 delete mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/MultipartUploadRequestParam.scala

diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index f4c3a4e23d4..1c9bbe6e5d4 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -264,8 +264,8 @@ function sbt_build_service {
   echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
   echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
 
-  if [[ $@ == *"aws-mpu"* ]]; then
-    export SBT_MAVEN_PROFILES="aws-mpu"
+  if [[ $@ == *"aws"* ]]; then
+    export SBT_MAVEN_PROFILES="aws"
   fi
 
   BUILD_COMMAND=("$SBT" clean package)
diff --git a/master/pom.xml b/master/pom.xml
index 51286b9efad..ae6fab55669 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -198,5 +198,25 @@
       </dependencies>
     </profile>
+    <profile>
+      <id>aws-mpu</id>
+      <activation>
+        <property>
+          <name>aws-mpu-deps</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml
index f69dcfd4ea0..cfdbbb4ee7c 100644
--- a/multipart-uploader/pom.xml
+++ b/multipart-uploader/pom.xml
@@ -30,21 +30,19 @@
   <dependencies>
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-bundle</artifactId>
+      <version>${aws.version}</version>
     </dependency>
@@ -56,27 +54,4 @@
     </dependency>
   </dependencies>
-
-  <profiles>
-    <profile>
-      <id>aws-mpu</id>
-      <activation>
-        <property>
-          <name>aws-mpu-deps</name>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
 </project>
diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 82bc12ee604..925fe638f4b 100644
--- a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -17,28 +17,27 @@
 
 package org.apache.celeborn;
 
-import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.event.ProgressListener;
 import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ListPartsRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PartListing;
 import com.amazonaws.services.s3.model.PartSummary;
-import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 
 import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
@@ -62,9 +61,7 @@ public S3MultipartUploadHandler(AWSCredentials awsCredentials, String key) {
         new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3SecretKey());
     ClientConfiguration clientConfig = new ClientConfiguration()
         .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(5))
-        .withMaxErrorRetry(5)
-        .withConnectionTimeout(10000)
-        .withRequestTimeout(50000);
+        .withMaxErrorRetry(5);
     this.s3Client =
         AmazonS3ClientBuilder.standard()
             .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
@@ -79,12 +76,17 @@ public void startUpload() {
     InitiateMultipartUploadRequest initRequest =
         new InitiateMultipartUploadRequest(awsCredentials.getBucketName(), key);
     InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
-    uploadId = initResponse.getUploadId();
+    this.uploadId = initResponse.getUploadId();
   }
 
   @Override
-  public void putPart(InputStream inputStream, Long lengthInBytes, Integer partNumber) {
+  public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException {
     try {
+      int partSize = inputStream.available();
+      if (partSize == 0) {
+        logger.warn("key {} uploadId {} part size is 0 for part number {} finalFlush {}", key, uploadId, partNumber, finalFlush);
+        return;
+      }
       UploadPartRequest uploadRequest =
           new UploadPartRequest()
              .withBucketName(awsCredentials.getBucketName())
@@ -92,10 +94,16 @@ public void putPart(InputStream inputStream, Long lengthInBytes, Integer partNum
              .withUploadId(uploadId)
              .withPartNumber(partNumber)
              .withInputStream(inputStream)
-             .withPartSize(lengthInBytes);
+             .withPartSize(partSize)
+             .withLastPart(finalFlush);
       s3Client.uploadPart(uploadRequest);
+      logger.debug("key {} uploadId {} part number {} uploaded with size {} finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
     } catch (RuntimeException e) {
       logger.error("Failed to upload part", e);
+      throw e;
+    } catch (IOException e) {
+      logger.error("Failed to upload part", e);
+      throw e;
     }
   }
@@ -111,20 +119,22 @@ public void complete() {
       }
       listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
     } while (partListing.isTruncated());
     if (partETags.size() == 0) {
-      logger.debug("UploadId {} has no parts uploaded, aborting upload", uploadId);
+      logger.debug("bucket {} key {} uploadId {} has no parts uploaded, aborting upload", awsCredentials.getBucketName(), key, uploadId);
       abort();
-      ObjectMetadata metadata = new ObjectMetadata();
-      metadata.setContentLength(0);
-      PutObjectRequest putRequest = new PutObjectRequest(awsCredentials.getBucketName(), key, new ByteArrayInputStream(new byte[0]), metadata);
-      s3Client.putObject(putRequest);
+      logger.debug("bucket {} key {} upload completed with size {}", awsCredentials.getBucketName(), key, 0);
       return;
     }
-    CompleteMultipartUploadRequest compRequest =
-        new CompleteMultipartUploadRequest(
-            awsCredentials.getBucketName(), key, uploadId, partETags);
-    logger.debug("UploadId {} upload completing and partSize is {}", uploadId, partETags.size());
-    s3Client.completeMultipartUpload(compRequest);
+    ProgressListener progressListener = progressEvent -> {
+      logger.debug("key {} uploadId {} progress event type {} transferred {} bytes", key, uploadId, progressEvent.getEventType(), progressEvent.getBytesTransferred());
+    };
+
+    CompleteMultipartUploadRequest compRequest =
+        new CompleteMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId, partETags)
+            .withGeneralProgressListener(progressListener);
+    CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest);
+    logger.debug("bucket {} key {} uploadId {} upload completed, location {}", awsCredentials.getBucketName(), key, uploadId, compResult.getLocation());
   }
 
   @Override
@@ -133,4 +143,11 @@ public void abort() {
         new AbortMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId);
     s3Client.abortMultipartUpload(abortMultipartUploadRequest);
   }
+
+  @Override
+  public void close() {
+    if (s3Client != null) {
+      s3Client.shutdown();
+    }
+  }
 }
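Note: to make the refactored contract concrete, here is a usage sketch of the lifecycle a caller drives: startUpload, then putPart with ascending part numbers and finalFlush marking the last part, then complete on success or abort on failure, and close in all cases. The AWSCredentials constructor shape and all values here are illustrative assumptions, not code from the patch:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    import org.apache.celeborn.S3MultipartUploadHandler;
    import org.apache.celeborn.server.common.service.mpu.bean.AWSCredentials;

    public class MpuLifecycleSketch {
      public static void main(String[] args) throws IOException {
        // Placeholder credentials and object key; real values come from CelebornConf.
        AWSCredentials credentials =
            new AWSCredentials("accessKey", "secretKey", "us-east-1", "my-bucket");
        S3MultipartUploadHandler handler =
            new S3MultipartUploadHandler(credentials, "shuffle/app-1/0/0-0-0.data");
        try {
          handler.startUpload();
          // Every part except the last must be at least 5 MiB per S3's multipart rules.
          byte[] part = new byte[5 * 1024 * 1024];
          handler.putPart(new ByteArrayInputStream(part), 1, false);
          handler.putPart(new ByteArrayInputStream(new byte[1024]), 2, true); // final flush
          handler.complete();
        } catch (RuntimeException | IOException e) {
          handler.abort();   // discard any parts already uploaded
          throw e;
        } finally {
          handler.close();   // shuts down the underlying AmazonS3 client
        }
      }
    }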
diff --git a/pom.xml b/pom.xml
index 1de6563e203..65d14fa8621 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
     <hadoop.version>3.3.6</hadoop.version>
+    <aws.version>1.12.532</aws.version>