From ff29892f629bd9f1e4df32cbd84aef1d1bcf0253 Mon Sep 17 00:00:00 2001 From: libailin Date: Fri, 26 Jan 2024 15:56:55 +0800 Subject: [PATCH] [Feature-#1862][s3] Added support for reading compressed format files , writing multiple objects etc. --- .../chunjun/connector/s3/config/S3Config.java | 6 ++ .../connector/s3/sink/S3DynamicTableSink.java | 6 +- .../connector/s3/sink/S3OutputFormat.java | 94 +++++++++++++++---- .../s3/table/S3DynamicTableFactory.java | 13 +++ .../connector/s3/table/options/S3Options.java | 18 ++++ .../chunjun/connector/s3/util/S3Util.java | 8 ++ 6 files changed, 128 insertions(+), 17 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java index ea02dbbf96..2fe73532da 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java @@ -80,4 +80,10 @@ public class S3Config extends CommonConfig implements Serializable { /** 压缩方式 */ private String compress; + + /** 是否写一个还是多个对象 */ + private boolean writeSingleObject = true; + + /** 生成的文件名后缀 */ + private String suffix; } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java index 6aa6cff90b..e50a2e1125 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.s3.sink; import com.dtstack.chunjun.config.FieldConfig; +import com.dtstack.chunjun.config.SpeedConfig; import com.dtstack.chunjun.config.TypeConfig; import com.dtstack.chunjun.connector.s3.config.S3Config; import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter; @@ -66,11 +67,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } s3Config.setColumn(columnList); S3OutputFormatBuilder builder = new S3OutputFormatBuilder(new S3OutputFormat()); + builder.setSpeedConf(new SpeedConfig()); builder.setS3Conf(s3Config); builder.setRowConverter( new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config)); - return SinkFunctionProvider.of(new DtOutputFormatSinkFunction<>(builder.finish()), 1); + int sinkParallelism = s3Config.getParallelism() == null ? 1 : s3Config.getParallelism(); + return SinkFunctionProvider.of( + new DtOutputFormatSinkFunction<>(builder.finish()), sinkParallelism); } @Override diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java index 4cd770373f..7abd7d4c87 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java @@ -22,6 +22,7 @@ import com.dtstack.chunjun.connector.s3.config.S3Config; import com.dtstack.chunjun.connector.s3.util.S3Util; import com.dtstack.chunjun.connector.s3.util.WriterUtil; +import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.restore.FormatState; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; @@ -32,7 +33,6 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PartETag; -import com.esotericsoftware.minlog.Log; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -54,10 +54,10 @@ public class S3OutputFormat extends BaseRichOutputFormat { private S3Config s3Config; /** Must start at 1 and cannot be greater than 10,000 */ - private static int currentPartNumber; + private int currentPartNumber; - private static String currentUploadId; - private static boolean willClose = false; + private String currentUploadId; + private boolean willClose = false; private transient StringWriter sw; private transient List myPartETags; @@ -66,22 +66,50 @@ public class S3OutputFormat extends BaseRichOutputFormat { private static final long MIN_SIZE = 1024 * 1024 * 25L; + @Override + public void initializeGlobal(int parallelism) { + this.amazonS3 = S3Util.getS3Client(s3Config); + if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { + checkOutputDir(); + } + } + @Override protected void openInternal(int taskNumber, int numTasks) { + // 写多个对象时 + if (!s3Config.isWriteSingleObject()) { + s3Config.setObject( + s3Config.getObject() + + ConstantValue.SINGLE_SLASH_SYMBOL + + jobId + + "_" + + taskNumber + + getExtension()); + } else { + // 写单个对象时 + if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { + // 当写入模式是overwrite时 + s3Config.setObject(s3Config.getObject() + getExtension()); + } else { + // 当写入模式是append时 + s3Config.setObject( + s3Config.getObject() + "_" + jobId + "_" + taskNumber + getExtension()); + } + } + log.info("current write object name: {}", s3Config.getObject()); + List column = s3Config.getColumn(); + columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList()); + columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList()); openSource(); restore(); - checkOutputDir(); createActionFinishedTag(); nextBlock(); - List column = s3Config.getColumn(); - columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList()); - columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList()); } private void openSource() { this.amazonS3 = S3Util.getS3Client(s3Config); this.myPartETags = new ArrayList<>(); - this.currentPartNumber = taskNumber - numTasks + 1; + this.currentPartNumber = 0; beforeWriteRecords(); } @@ -95,17 +123,50 @@ private void restore() { } private void checkOutputDir() { - if (S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) { - if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { - S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject()); + // 覆盖写单个对象时 + if (s3Config.isWriteSingleObject() + && S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) { + S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject()); + } + // 覆盖写多个对象时 + if (!s3Config.isWriteSingleObject()) { + List subObjects; + if (s3Config.isUseV2()) { + subObjects = + S3Util.listObjectsKeyByPrefix( + amazonS3, + s3Config.getBucket(), + s3Config.getObject(), + s3Config.getFetchSize()); + } else { + subObjects = + S3Util.listObjectsByv1( + amazonS3, + s3Config.getBucket(), + s3Config.getObject(), + s3Config.getFetchSize()); } + String[] keys = subObjects.toArray(new String[] {}); + S3Util.deleteObjects(amazonS3, s3Config.getBucket(), keys); + log.info("delete objects num:" + keys.length); + log.debug("delete objects list:" + StringUtils.join(keys, ",")); + } + } + + public String getExtension() { + if (StringUtils.isNotBlank(s3Config.getSuffix())) { + return s3Config.getSuffix(); + } else { + return ""; } } private void nextBlock() { - sw = new StringWriter(); + if (sw == null) { + sw = new StringWriter(); + } this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter()); - this.currentPartNumber = this.currentPartNumber + numTasks; + this.currentPartNumber = this.currentPartNumber + 1; } /** Create file multipart upload ID */ @@ -159,6 +220,7 @@ protected void flushDataInternal() { myPartETags.stream().map(Objects::toString).collect(Collectors.joining(","))); writerUtil.close(); writerUtil = null; + sw = null; } } @@ -192,7 +254,7 @@ public void closeInternal() { flushDataInternal(); completeMultipartUploadFile(); S3Util.closeS3(amazonS3); - Log.info("S3Client close!"); + log.info("S3Client close!"); } @Override @@ -218,7 +280,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce } String[] stringRecord = new String[columnNameList.size()]; // convert row to string - rowConverter.toExternal(rowData, stringRecord); + stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord); try { for (int i = 0; i < columnNameList.size(); ++i) { diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java index c6142c95b2..fa05abaf7f 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java @@ -22,6 +22,7 @@ import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink; import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource; import com.dtstack.chunjun.connector.s3.table.options.S3Options; +import com.dtstack.chunjun.table.options.SinkOptions; import com.dtstack.chunjun.util.GsonUtil; import org.apache.flink.configuration.ConfigOption; @@ -88,6 +89,11 @@ public Set> optionalOptions() { options.add(S3Options.OBJECT); options.add(S3Options.ENDPOINT); options.add(S3Options.COMPRESS); + options.add(S3Options.WRITE_SINGLE_OBJECT); + options.add(S3Options.USE_V2); + options.add(S3Options.SUFFIX); + options.add(SinkOptions.SINK_PARALLELISM); + options.add(S3Options.WRITE_MODE); return options; } @@ -108,6 +114,13 @@ public DynamicTableSink createDynamicTableSink(Context context) { s3Config.setEncoding(options.get(S3Options.ENCODING)); s3Config.setRegion(options.get(S3Options.REGION)); s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER)); + s3Config.setEndpoint(options.get(S3Options.ENDPOINT)); + s3Config.setCompress(options.get(S3Options.COMPRESS)); + s3Config.setWriteSingleObject(options.get(S3Options.WRITE_SINGLE_OBJECT)); + s3Config.setUseV2(options.get(S3Options.USE_V2)); + s3Config.setSuffix(options.get(S3Options.SUFFIX)); + s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM)); + s3Config.setWriteMode(options.get(S3Options.WRITE_MODE)); return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config); } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java index acb381fba1..1f6236438f 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java @@ -77,4 +77,22 @@ public class S3Options { .stringType() .noDefaultValue() .withDescription("s3 file compression type"); + + public static final ConfigOption WRITE_SINGLE_OBJECT = + key("writeSingleObject") + .booleanType() + .defaultValue(true) + .withDescription("whether to write a single or multiple objects"); + + public static final ConfigOption USE_V2 = + key("useV2") + .booleanType() + .defaultValue(true) + .withDescription("Get the api version of the number of files in a directory"); + + public static final ConfigOption SUFFIX = + key("suffix").stringType().noDefaultValue().withDescription("s3 file suffix"); + + public static final ConfigOption WRITE_MODE = + key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode"); } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java index 2e783b0c41..a7d849a4a0 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java @@ -32,6 +32,7 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; @@ -196,6 +197,13 @@ public static void deleteObject(AmazonS3 s3Client, String bucketName, String obj s3Client.deleteObject(bucketName, object); } + public static void deleteObjects(AmazonS3 s3Client, String bucketName, String[] keys) { + if (keys.length > 0) { + DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys); + s3Client.deleteObjects(request); + } + } + public static void closeS3(AmazonS3 amazonS3) { if (amazonS3 != null) { amazonS3.shutdown();