[CELEBORN-1530] support MPU for S3 #2830
Thanks for this PR. Are there any test results?
Thanks for this PR, but there are some points to polish.
multipart-uploader/pom.xml
Outdated
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
This dependency is duplicated.
done
multipart-uploader/pom.xml
Outdated
<name>aws-mpu-deps</name>
</property>
</activation>
<dependencies>
I think these dependencies can be moved to the main dependencies section, because this module is only loaded when the aws-mpu profile is activated.
done
multipart-uploader/pom.xml
Outdated
<profiles>
<profile>
<id>aws-mpu</id>
The profile name can be changed to aws.
done
package org.apache.celeborn.server.common.service.mpu.bean;

public class AWSCredentials {
This class should not be in the common module.
done
worker/pom.xml
Outdated
<property>
<name>aws-mpu-deps</name>
</property>
</activation>
This segment is not needed.
<activation>
<property>
<name>aws-mpu-deps</name>
</property>
</activation>
DynConstructors.builder()
.impl(
"org.apache.celeborn.S3MultipartUploadHandler",
awsCredentials.getClass(),
Passing the arguments directly to S3MultipartUploadHandler should be enough for this scenario.
done
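The suggestion above is to hand the individual values to the handler's constructor rather than wrapping them in a credentials bean. The following is only a rough sketch of that idea using plain `java.lang.reflect` instead of Celeborn's `DynConstructors`; the nested `Handler` class, its two `String` parameters, and the example values are all hypothetical stand-ins, and the real `S3MultipartUploadHandler` constructor may look different.

```java
import java.lang.reflect.Constructor;

public class ReflectiveHandlerSketch {
    // Hypothetical handler used only for this illustration.
    public static final class Handler {
        final String bucket;
        final String key;

        public Handler(String bucket, String key) {
            this.bucket = bucket;
            this.key = key;
        }

        public String describe() {
            return bucket + "/" + key;
        }
    }

    // Loads a class by name and passes plain arguments straight to its
    // (String, String) constructor, with no intermediate bean.
    static Object newHandler(String className, String bucket, String key) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = clazz.getConstructor(String.class, String.class);
            return ctor.newInstance(bucket, key);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    // Builds the hypothetical handler by name and returns its description.
    static String demo() {
        Handler h = (Handler) newHandler(Handler.class.getName(), "my-bucket", "shuffle/part-0");
        return h.describe();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Loading the class by name keeps the caller free of a compile-time dependency on the optional module, which appears to be the reason the PR uses dynamic construction in the first place.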
task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
task =
new S3FlushTask(
flushBuffer, notifier, true, s3MultipartUploadHandler, partNumber);
- flushBuffer, notifier, true, s3MultipartUploadHandler, partNumber);
+ flushBuffer, notifier, true, s3MultipartUploadHandler, partNumber++);
done
@@ -273,6 +310,7 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
if (task != null) {
addTask(task);
flushBuffer = null;
partNumber++;
This line can be removed
done
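Taken together, the two comments on `partNumber` ask for the counter to be advanced in exactly one place: where the task is created. A minimal sketch of that pattern follows, assuming the counter starts at 1 (S3 part numbers range from 1 to 10,000); the `Task` class and the `flush(boolean)` signature are simplified, hypothetical stand-ins for the PR's `S3FlushTask` and `flush` method.

```java
import java.util.ArrayList;
import java.util.List;

public class PartNumberSketch {
    // Hypothetical stand-in for a flush task; only the part number matters here.
    static final class Task {
        final int partNumber;

        Task(int partNumber) {
            this.partNumber = partNumber;
        }
    }

    // Assumption: numbering starts at 1, the lowest part number S3 accepts.
    private int partNumber = 1;
    private final List<Task> queued = new ArrayList<>();

    void flush(boolean hasData) {
        Task task = null;
        if (hasData) {
            // Post-increment: the task gets the current number, then the counter advances.
            task = new Task(partNumber++);
        }
        if (task != null) {
            // No second increment here, otherwise part numbers would skip.
            queued.add(task);
        }
    }

    // Runs three flushes, one of them empty, and returns the assigned part numbers.
    static List<Integer> demo() {
        PartNumberSketch writer = new PartNumberSketch();
        writer.flush(true);
        writer.flush(false);
        writer.flush(true);
        List<Integer> numbers = new ArrayList<>();
        for (Task t : writer.queued) {
            numbers.add(t.partNumber);
        }
        return numbers;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Incrementing at construction time ties each number to the task that actually uses it, so an empty flush does not consume a part number.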
logger.warn("Abort s3 multipart upload for {}", diskFileInfo.getFilePath());
s3MultipartUploadHandler.complete();
}

if (notifier.hasException()) {
These two if blocks can be merged.
done
import java.lang.{Long => JLong}
import java.util.{List => JList}

case class MultipartUploadRequestParam(
Unused class. Can be removed.
done
@zhaohehuhu @FMX @WillemJiang
I still need more time to fully test it, as S3 has some limitations related to MPU.
Every PR should be production-ready before it is merged.
LGTM. Thanks. Merged into main (v0.6.0).
What changes were proposed in this pull request?
As the title says.
Why are the changes needed?
AWS S3 doesn't support append, so to append data Celeborn had to copy the historical data from S3 to the worker and write it all back to S3 again, which heavily amplifies the write volume. This PR implements a better solution via multipart upload (MPU) to avoid copy-and-write.
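To make the cost difference concrete, here is a rough back-of-the-envelope sketch. It assumes, purely for illustration, that every flush adds a fixed-size chunk and that the copy-and-write path re-uploads the whole object each time; it counts uploaded bytes only, calls no S3 API, and all class and method names are hypothetical.

```java
public class WriteAmplification {
    // Bytes uploaded when each flush must re-upload all earlier data plus the new chunk.
    static long copyAndRewriteBytes(int flushes, long chunkBytes) {
        long total = 0;
        long stored = 0;
        for (int i = 0; i < flushes; i++) {
            stored += chunkBytes; // the object grows by one chunk
            total += stored;      // and the whole object is written again
        }
        return total;
    }

    // Bytes uploaded when each flush is sent exactly once as its own part.
    static long multipartBytes(int flushes, long chunkBytes) {
        return (long) flushes * chunkBytes;
    }

    public static void main(String[] args) {
        int flushes = 4;
        long chunk = 10L;
        System.out.println("copy-and-write: " + copyAndRewriteBytes(flushes, chunk));
        System.out.println("multipart:      " + multipartBytes(flushes, chunk));
    }
}
```

Under these assumptions the copy-and-write total grows quadratically with the number of flushes, while the multipart total grows linearly.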
Does this PR introduce any user-facing change?
How was this patch tested?
I conducted an experiment with a 1GB input dataset to compare the performance of Celeborn using only S3 storage versus using SSD storage. The results showed that Celeborn with SSD storage was approximately three times faster than with only S3 storage.
The above screenshot shows the second test that I ran, with 5000 mappers and reducers.