refactor
zhaohehuhu committed Nov 13, 2024
1 parent dcddcc3 commit 930cf9c
Showing 10 changed files with 101 additions and 107 deletions.
4 changes: 2 additions & 2 deletions build/make-distribution.sh
@@ -264,8 +264,8 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"

if [[ $@ == *"aws-mpu"* ]]; then
export SBT_MAVEN_PROFILES="aws-mpu"
if [[ $@ == *"aws"* ]]; then
export SBT_MAVEN_PROFILES="aws"
fi
BUILD_COMMAND=("$SBT" clean package)

20 changes: 20 additions & 0 deletions master/pom.xml
@@ -198,5 +198,25 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>aws-mpu</id>
<activation>
<property>
<name>aws-mpu-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
43 changes: 9 additions & 34 deletions multipart-uploader/pom.xml
@@ -30,21 +30,19 @@

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-service_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-service_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>

@@ -56,27 +54,4 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>aws-mpu</id>
<activation>
<property>
<name>aws-mpu-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
@@ -17,28 +17,27 @@

package org.apache.celeborn;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListPartsRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PartListing;
import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;

import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
@@ -62,9 +61,7 @@ public S3MultipartUploadHandler(AWSCredentials awsCredentials, String key) {
new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3SecretKey());
ClientConfiguration clientConfig = new ClientConfiguration()
.withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(5))
.withMaxErrorRetry(5)
.withConnectionTimeout(10000)
.withRequestTimeout(50000);
.withMaxErrorRetry(5);
this.s3Client =
AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
@@ -79,23 +76,34 @@ public void startUpload() {
InitiateMultipartUploadRequest initRequest =
new InitiateMultipartUploadRequest(awsCredentials.getBucketName(), key);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
uploadId = initResponse.getUploadId();
this.uploadId = initResponse.getUploadId();
}

@Override
public void putPart(InputStream inputStream, Long lengthInBytes, Integer partNumber) {
public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException {
try {
int partSize = inputStream.available();
if (partSize == 0) {
logger.warn("key {} uploadId {} part size is 0 for part number {} finalFlush {}", key, uploadId, partNumber, finalFlush);
return;
}
UploadPartRequest uploadRequest =
new UploadPartRequest()
.withBucketName(awsCredentials.getBucketName())
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withInputStream(inputStream)
.withPartSize(lengthInBytes);
.withPartSize(partSize)
.withLastPart(finalFlush);
s3Client.uploadPart(uploadRequest);
logger.warn("key {} uploadId {} part number {} uploaded with size {} finalFlush {}",key, uploadId, partNumber, partSize, finalFlush);
} catch (RuntimeException e) {
logger.error("Failed to upload part", e);
throw e;
} catch (IOException e) {
logger.error("Failed to upload part", e);
throw e;
}
}

@@ -111,20 +119,22 @@ public void complete() {
}
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
} while (partListing.isTruncated());
if (partETags.size() == 0) {
logger.debug("UploadId {} has no parts uploaded, aborting upload", uploadId);
if (partETags.size() == 0) {
logger.debug("bucket {} key {} uploadId {} has no parts uploaded, aborting upload", awsCredentials.getBucketName(), key, uploadId);
abort();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
PutObjectRequest putRequest = new PutObjectRequest(awsCredentials.getBucketName(), key, new ByteArrayInputStream(new byte[0]), metadata);
s3Client.putObject(putRequest);
logger.debug("bucket {} key {} upload completed with size {}", awsCredentials.getBucketName(), key, 0);
return;
}
CompleteMultipartUploadRequest compRequest =
new CompleteMultipartUploadRequest(
awsCredentials.getBucketName(), key, uploadId, partETags);
logger.debug("UploadId {} upload completing and partSize is {}", uploadId, partETags.size());
s3Client.completeMultipartUpload(compRequest);
ProgressListener progressListener = progressEvent -> {
logger.debug("key {} uploadId {} progress event type {} transferred {} bytes", key, uploadId, progressEvent.getEventType(), progressEvent.getBytesTransferred());
};

CompleteMultipartUploadRequest compRequest =
new CompleteMultipartUploadRequest(
awsCredentials.getBucketName(), key, uploadId, partETags)
.withGeneralProgressListener(progressListener);
CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest);
logger.debug("bucket {} key {} uploadId {} upload completed location is in {} ", awsCredentials.getBucketName(), key, uploadId, compResult.getLocation());
}

@Override
@@ -133,4 +143,11 @@ public void abort() {
new AbortMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId);
s3Client.abortMultipartUpload(abortMultipartUploadRequest);
}

@Override
public void close() {
if (s3Client != null) {
s3Client.shutdown();
}
}
}
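
For reference, a minimal sketch of the ListParts pagination that complete() relies on, written against the AWS SDK v1 types imported above; bucketName, key, uploadId, and s3Client stand in for the handler's fields:

List<PartETag> partETags = new ArrayList<>();
ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
PartListing partListing;
do {
  partListing = s3Client.listParts(listPartsRequest);
  for (PartSummary part : partListing.getParts()) {
    partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
  }
  // Resume the listing from the marker until S3 reports no further pages.
  listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
} while (partListing.isTruncated());

Only after this loop does the handler choose between the zero-byte PutObject fallback and CompleteMultipartUpload.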
6 changes: 3 additions & 3 deletions pom.xml
@@ -72,6 +72,7 @@

<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
<aws.version>1.12.532</aws.version>
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1341,13 +1342,12 @@
</dependencies>
</profile>
<profile>
<id>aws-mpu</id>
<id>aws</id>
<modules>
<module>multipart-uploader</module>
</modules>
<properties>
<aws-mpu-deps>true</aws-mpu-deps>
<aws.version>1.12.367</aws.version>
<aws-deps>true</aws-deps>
</properties>
</profile>
<profile>
2 changes: 1 addition & 1 deletion project/CelebornBuild.scala
@@ -663,7 +663,7 @@ object CelebornWorker {
.dependsOn(CelebornClient.client % "test->compile")
.dependsOn(CelebornMaster.master % "test->compile")

if (profiles.exists(_.startsWith("aws-mpu"))) {
if (profiles.exists(_.startsWith("aws"))) {
worker = worker.dependsOn(CeleborMPU.celeborMPU)
}

@@ -17,15 +17,18 @@

package org.apache.celeborn.server.common.service.mpu;

import java.io.IOException;
import java.io.InputStream;

public interface MultipartUploadHandler {

void startUpload();

void putPart(InputStream inputStream, Long lengthInBytes, Integer partNumber);
void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException;

void complete();

void abort();

void close();
}
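
A hypothetical driver illustrating the revised contract: the explicit length argument is gone (implementations size parts from the stream), part numbers are 1-based for the S3 implementation, and the new close() releases client resources regardless of outcome. The names below are illustrative, not part of Celeborn:

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Illustrative only: uploads each byte[] as one part, then finalizes.
static void uploadAll(MultipartUploadHandler handler, byte[][] parts) throws IOException {
  handler.startUpload();
  try {
    for (int i = 0; i < parts.length; i++) {
      boolean finalFlush = (i == parts.length - 1); // mark the last part
      handler.putPart(new ByteArrayInputStream(parts[i]), i + 1, finalFlush);
    }
    handler.complete();
  } catch (IOException | RuntimeException e) {
    handler.abort(); // discard any parts already uploaded
    throw e;
  } finally {
    handler.close(); // always release the underlying client
  }
}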
@@ -204,8 +204,8 @@ public void initFileChannelsForDiskFile() throws IOException {

URI uri = hadoopFs.getUri();
String bucketName = uri.getHost();
int slashIndex = diskFileInfo.getFilePath().indexOf("/");
String key = diskFileInfo.getFilePath().substring(slashIndex + 1);
int index = diskFileInfo.getFilePath().indexOf(bucketName);
String key = diskFileInfo.getFilePath().substring(index + bucketName.length() + 1);

AWSCredentials awsCredentials =
new AWSCredentials(bucketName, s3AccessKey, s3SecretKey, s3EndpointRegion);
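
A worked example of the revised key derivation, using a hypothetical path; the previous indexOf("/") variant cut at the scheme's first slash and kept the bucket inside the key:

String path = "s3a://my-bucket/celeborn/shuffle/data";  // hypothetical
String bucketName = "my-bucket";                        // uri.getHost()
int index = path.indexOf(bucketName);                   // 6
String key = path.substring(index + bucketName.length() + 1);
// key == "celeborn/shuffle/data"; the old substring(indexOf("/") + 1)
// would have produced "/my-bucket/celeborn/shuffle/data" instead.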
@@ -271,7 +271,13 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
} else if (diskFileInfo.isS3()) {
task =
new S3FlushTask(flushBuffer, notifier, false, s3MultipartUploadHandler, partNumber);
new S3FlushTask(
flushBuffer,
notifier,
false,
s3MultipartUploadHandler,
partNumber++,
finalFlush);
}
MemoryManager.instance().releaseMemoryFileStorage(numBytes);
MemoryManager.instance().incrementDiskBuffer(numBytes);
@@ -301,7 +307,12 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
} else if (diskFileInfo.isS3()) {
task =
new S3FlushTask(
flushBuffer, notifier, true, s3MultipartUploadHandler, partNumber);
flushBuffer,
notifier,
true,
s3MultipartUploadHandler,
partNumber++,
finalFlush);
}
}
}
@@ -310,7 +321,6 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
if (task != null) {
addTask(task);
flushBuffer = null;
partNumber++;
if (!fromEvict) {
diskFileInfo.updateBytesFlushed(numBytes);
}
@@ -340,12 +350,12 @@ public void write(ByteBuf data) throws IOException {
throw new AlreadyClosedException(msg);
}

if (notifier.hasException() && s3MultipartUploadHandler != null) {
logger.warn("Abort s3 multipart upload for {}", diskFileInfo.getFilePath());
s3MultipartUploadHandler.complete();
}

if (notifier.hasException()) {
if (s3MultipartUploadHandler != null) {
logger.warn("Abort s3 multipart upload for {}", diskFileInfo.getFilePath());
s3MultipartUploadHandler.complete();
s3MultipartUploadHandler.close();
}
return;
}

@@ -504,6 +514,7 @@ protected synchronized long close(
finalClose.run();
if (s3MultipartUploadHandler != null) {
s3MultipartUploadHandler.complete();
s3MultipartUploadHandler.close();
}
// unregister from DeviceMonitor
if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {

This file was deleted.

@@ -68,13 +68,13 @@ private[worker] class S3FlushTask(
notifier: FlushNotifier,
keepBuffer: Boolean,
s3MultipartUploader: MultipartUploadHandler,
partNumber: Int)
partNumber: Int,
finalFlush: Boolean = false)
extends FlushTask(buffer, notifier, keepBuffer) {

override def flush(): Unit = {
val bytes = ByteBufUtil.getBytes(buffer)
val currentPartSize = java.lang.Long.valueOf(bytes.length.toLong)
val inputStream = new ByteArrayInputStream(bytes)
s3MultipartUploader.putPart(inputStream, currentPartSize, partNumber)
s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
}
}
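
With the explicit length parameter removed, putPart now sizes each part via InputStream.available(), which is exact here only because flush() wraps the copied bytes in a ByteArrayInputStream. A condensed Java sketch of the new flush body, under that assumption:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

// Equivalent of S3FlushTask.flush(): copy the Netty buffer out and let
// the uploader derive the part size from the wrapped stream.
static void flushPart(ByteBuf buffer, MultipartUploadHandler uploader,
                      int partNumber, boolean finalFlush) throws IOException {
  byte[] bytes = ByteBufUtil.getBytes(buffer);
  uploader.putPart(new ByteArrayInputStream(bytes), partNumber, finalFlush);
}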
