Skip to content

Commit

Permalink
[Feature-#1862][s3] Added support for reading compressed format files…
Browse files Browse the repository at this point in the history
… , writing multiple objects etc.
  • Loading branch information
libailin authored and lihongwei committed Jan 26, 2024
1 parent e0a71d0 commit ff29892
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ public class S3Config extends CommonConfig implements Serializable {

/** Compression codec used for the files read/written by this connector. */
private String compress;

/** Whether to write one single object (true, the default) or multiple objects. */
private boolean writeSingleObject = true;

/** Suffix appended to generated object/file names. */
private String suffix;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.s3.sink;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.SpeedConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter;
Expand Down Expand Up @@ -66,11 +67,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
s3Config.setColumn(columnList);
S3OutputFormatBuilder builder = new S3OutputFormatBuilder(new S3OutputFormat());
builder.setSpeedConf(new SpeedConfig());
builder.setS3Conf(s3Config);
builder.setRowConverter(
new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config));

return SinkFunctionProvider.of(new DtOutputFormatSinkFunction<>(builder.finish()), 1);
int sinkParallelism = s3Config.getParallelism() == null ? 1 : s3Config.getParallelism();
return SinkFunctionProvider.of(
new DtOutputFormatSinkFunction<>(builder.finish()), sinkParallelism);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.connector.s3.util.WriterUtil;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
Expand All @@ -32,7 +33,6 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import com.esotericsoftware.minlog.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -54,10 +54,10 @@ public class S3OutputFormat extends BaseRichOutputFormat {
private S3Config s3Config;

/** Must start at 1 and cannot be greater than 10,000 */
private static int currentPartNumber;
private int currentPartNumber;

private static String currentUploadId;
private static boolean willClose = false;
private String currentUploadId;
private boolean willClose = false;
private transient StringWriter sw;
private transient List<MyPartETag> myPartETags;

Expand All @@ -66,22 +66,50 @@ public class S3OutputFormat extends BaseRichOutputFormat {

private static final long MIN_SIZE = 1024 * 1024 * 25L;

/**
 * Global initialization hook: obtains an S3 client and, when the configured write
 * mode is overwrite (case-insensitive), clears existing output via {@link #checkOutputDir()}.
 *
 * @param parallelism total sink parallelism (unused here)
 */
@Override
public void initializeGlobal(int parallelism) {
    this.amazonS3 = S3Util.getS3Client(s3Config);
    // Non-overwrite modes keep any pre-existing output untouched.
    if (!OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
        return;
    }
    checkOutputDir();
}

/**
 * Per-subtask open: derives the final object name from the configured base name, the
 * write mode and the single/multi-object setting, then initializes writer state.
 *
 * @param taskNumber index of this parallel subtask
 * @param numTasks   total number of parallel subtasks
 */
@Override
protected void openInternal(int taskNumber, int numTasks) {
    // Writing multiple objects: each subtask writes its own object under the configured
    // prefix, named "<object>/<jobId>_<taskNumber><extension>".
    if (!s3Config.isWriteSingleObject()) {
        s3Config.setObject(
                s3Config.getObject()
                        + ConstantValue.SINGLE_SLASH_SYMBOL
                        + jobId
                        + "_"
                        + taskNumber
                        + getExtension());
    } else {
        // Writing a single object.
        if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
            // Overwrite mode: reuse the configured object name (plus extension).
            s3Config.setObject(s3Config.getObject() + getExtension());
        } else {
            // Append mode: make the object name unique per job and subtask.
            s3Config.setObject(
                    s3Config.getObject() + "_" + jobId + "_" + taskNumber + getExtension());
        }
    }
    log.info("current write object name: {}", s3Config.getObject());
    List<FieldConfig> column = s3Config.getColumn();
    columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList());
    columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList());
    openSource();
    restore();
    checkOutputDir();
    createActionFinishedTag();
    nextBlock();
    // NOTE(review): the three lines below repeat the column extraction above — this looks
    // like diff residue (old vs. new position of the same lines in the rendered commit);
    // verify against the actual file which copy belongs here.
    List<FieldConfig> column = s3Config.getColumn();
    columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList());
    columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList());
}

/** Initializes per-subtask writer state: S3 client, part-ETag list and part counter. */
private void openSource() {
    this.amazonS3 = S3Util.getS3Client(s3Config);
    this.myPartETags = new ArrayList<>();
    // NOTE(review): the first assignment is immediately overwritten by the second — this
    // looks like diff residue (removed line "taskNumber - numTasks + 1" vs. added "0");
    // verify which single assignment belongs in the actual file.
    this.currentPartNumber = taskNumber - numTasks + 1;
    this.currentPartNumber = 0;
    beforeWriteRecords();
}

Expand All @@ -95,17 +123,50 @@ private void restore() {
}

/**
 * Clears any existing output before an overwrite-mode write: deletes the single target
 * object, or — in multi-object mode — lists every key under the object prefix and
 * bulk-deletes them.
 */
private void checkOutputDir() {
    // NOTE(review): the next three lines appear to be the pre-change implementation left
    // in by the rendered diff (their braces do not balance with the rest of the method);
    // verify against the actual file.
    if (S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
    if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
    S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
    // Overwriting a single object: delete the existing object if present.
    if (s3Config.isWriteSingleObject()
            && S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
        S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
    }
    // Overwriting multiple objects: enumerate keys under the prefix and bulk-delete.
    if (!s3Config.isWriteSingleObject()) {
        List<String> subObjects;
        // Two listing paths depending on the configured S3 list-API version.
        if (s3Config.isUseV2()) {
            subObjects =
                    S3Util.listObjectsKeyByPrefix(
                            amazonS3,
                            s3Config.getBucket(),
                            s3Config.getObject(),
                            s3Config.getFetchSize());
        } else {
            subObjects =
                    S3Util.listObjectsByv1(
                            amazonS3,
                            s3Config.getBucket(),
                            s3Config.getObject(),
                            s3Config.getFetchSize());
        }
        String[] keys = subObjects.toArray(new String[] {});
        S3Util.deleteObjects(amazonS3, s3Config.getBucket(), keys);
        log.info("delete objects num:" + keys.length);
        log.debug("delete objects list:" + StringUtils.join(keys, ","));
    }
}

/**
 * Returns the extension to append to generated object names: the configured suffix
 * when it is non-blank, otherwise the empty string.
 */
public String getExtension() {
    String configuredSuffix = s3Config.getSuffix();
    return StringUtils.isNotBlank(configuredSuffix) ? configuredSuffix : "";
}

/** Starts a new in-memory block: a writer over the string buffer and the next part number. */
private void nextBlock() {
    // NOTE(review): diff residue — both the removed unconditional init and the added
    // null-guarded lazy init of `sw` appear here; only one belongs in the actual file
    // (the guard is dead code after the unconditional assignment).
    sw = new StringWriter();
    if (sw == null) {
        sw = new StringWriter();
    }
    this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
    // NOTE(review): likewise both increments (removed "+ numTasks" vs. added "+ 1")
    // appear; verify which one the actual file keeps.
    this.currentPartNumber = this.currentPartNumber + numTasks;
    this.currentPartNumber = this.currentPartNumber + 1;
}

/** Create file multipart upload ID */
Expand Down Expand Up @@ -159,6 +220,7 @@ protected void flushDataInternal() {
myPartETags.stream().map(Objects::toString).collect(Collectors.joining(",")));
writerUtil.close();
writerUtil = null;
sw = null;
}
}

Expand Down Expand Up @@ -192,7 +254,7 @@ public void closeInternal() {
flushDataInternal();
completeMultipartUploadFile();
S3Util.closeS3(amazonS3);
Log.info("S3Client close!");
log.info("S3Client close!");
}

@Override
Expand All @@ -218,7 +280,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
}
String[] stringRecord = new String[columnNameList.size()];
// convert row to string
rowConverter.toExternal(rowData, stringRecord);
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
for (int i = 0; i < columnNameList.size(); ++i) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
import com.dtstack.chunjun.connector.s3.table.options.S3Options;
import com.dtstack.chunjun.table.options.SinkOptions;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -88,6 +89,11 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(S3Options.OBJECT);
options.add(S3Options.ENDPOINT);
options.add(S3Options.COMPRESS);
options.add(S3Options.WRITE_SINGLE_OBJECT);
options.add(S3Options.USE_V2);
options.add(S3Options.SUFFIX);
options.add(SinkOptions.SINK_PARALLELISM);
options.add(S3Options.WRITE_MODE);
return options;
}

Expand All @@ -108,6 +114,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
s3Config.setEncoding(options.get(S3Options.ENCODING));
s3Config.setRegion(options.get(S3Options.REGION));
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
s3Config.setWriteSingleObject(options.get(S3Options.WRITE_SINGLE_OBJECT));
s3Config.setUseV2(options.get(S3Options.USE_V2));
s3Config.setSuffix(options.get(S3Options.SUFFIX));
s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM));
s3Config.setWriteMode(options.get(S3Options.WRITE_MODE));

return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,22 @@ public class S3Options {
.stringType()
.noDefaultValue()
.withDescription("s3 file compression type");

/** Whether the sink writes one single object (default) or one object per subtask. */
public static final ConfigOption<Boolean> WRITE_SINGLE_OBJECT =
        key("writeSingleObject")
                .booleanType()
                .defaultValue(true)
                .withDescription("whether to write a single or multiple objects");

/**
 * Selects the S3 object-listing API used when enumerating keys under a prefix:
 * true uses the v2 listing path, false falls back to the v1 listing path.
 */
public static final ConfigOption<Boolean> USE_V2 =
        key("useV2")
                .booleanType()
                .defaultValue(true)
                .withDescription("Get the api version of the number of files in a directory");

/** Suffix appended to generated object names; no default (empty when unset). */
public static final ConfigOption<String> SUFFIX =
        key("suffix").stringType().noDefaultValue().withDescription("s3 file suffix");

/** Write mode of the sink ("overwrite" by default; "append" leaves existing output). */
public static final ConfigOption<String> WRITE_MODE =
        key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
Expand Down Expand Up @@ -196,6 +197,13 @@ public static void deleteObject(AmazonS3 s3Client, String bucketName, String obj
s3Client.deleteObject(bucketName, object);
}

/**
 * Bulk-deletes the given keys from the bucket in a single request.
 * A no-op when {@code keys} is empty.
 *
 * @param s3Client   client used to issue the delete request
 * @param bucketName bucket containing the keys
 * @param keys       object keys to delete; may be empty
 */
public static void deleteObjects(AmazonS3 s3Client, String bucketName, String[] keys) {
    if (keys.length == 0) {
        return;
    }
    s3Client.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keys));
}

public static void closeS3(AmazonS3 amazonS3) {
if (amazonS3 != null) {
amazonS3.shutdown();
Expand Down

0 comments on commit ff29892

Please sign in to comment.