From f1bdce5740192da92f4e0033837483114c9bd00b Mon Sep 17 00:00:00 2001 From: libailin Date: Fri, 26 Jan 2024 15:56:55 +0800 Subject: [PATCH] [Feature-#1862][s3] Added support for reading compressed format files , writing multiple objects etc. --- .../chunjun-connector-s3/pom.xml | 4 + .../chunjun/connector/s3/config/S3Config.java | 9 ++ .../connector/s3/enums/CompressType.java | 24 +++++ .../connector/s3/sink/S3DynamicTableSink.java | 6 +- .../connector/s3/sink/S3OutputFormat.java | 94 +++++++++++++++---- .../s3/source/S3DynamicTableSource.java | 2 + .../connector/s3/source/S3InputFormat.java | 11 ++- .../s3/table/S3DynamicTableFactory.java | 18 +++- .../connector/s3/table/options/S3Options.java | 27 ++++++ .../chunjun/connector/s3/util/S3Util.java | 8 ++ .../com/dtstack/chunjun/util/JsonUtil.java | 4 +- 11 files changed, 187 insertions(+), 20 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/enums/CompressType.java diff --git a/chunjun-connectors/chunjun-connector-s3/pom.xml b/chunjun-connectors/chunjun-connector-s3/pom.xml index 4ea7eb9634..51faa129a1 100644 --- a/chunjun-connectors/chunjun-connector-s3/pom.xml +++ b/chunjun-connectors/chunjun-connector-s3/pom.xml @@ -71,6 +71,10 @@ + + org.apache.maven.plugins + maven-shade-plugin + org.apache.maven.plugins maven-antrun-plugin diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java index 3c962b4cea..2fe73532da 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java @@ -77,4 +77,13 @@ public class S3Config extends CommonConfig implements Serializable { * Default is false. */ private boolean safetySwitch = false; + + /** 压缩方式 */ + private String compress; + + /** 是否写一个还是多个对象 */ + private boolean writeSingleObject = true; + + /** 生成的文件名后缀 */ + private String suffix; } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/enums/CompressType.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/enums/CompressType.java new file mode 100644 index 0000000000..14a533c64c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/enums/CompressType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dtstack.chunjun.connector.s3.enums; + +public enum CompressType { + GZIP, + BZIP2, + ZIP; +} diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java index 6aa6cff90b..e50a2e1125 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.s3.sink; import com.dtstack.chunjun.config.FieldConfig; +import com.dtstack.chunjun.config.SpeedConfig; import com.dtstack.chunjun.config.TypeConfig; import com.dtstack.chunjun.connector.s3.config.S3Config; import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter; @@ -66,11 +67,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } s3Config.setColumn(columnList); S3OutputFormatBuilder builder = new S3OutputFormatBuilder(new S3OutputFormat()); + builder.setSpeedConf(new SpeedConfig()); builder.setS3Conf(s3Config); builder.setRowConverter( new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config)); - return SinkFunctionProvider.of(new DtOutputFormatSinkFunction<>(builder.finish()), 1); + int sinkParallelism = s3Config.getParallelism() == null ? 1 : s3Config.getParallelism(); + return SinkFunctionProvider.of( + new DtOutputFormatSinkFunction<>(builder.finish()), sinkParallelism); } @Override diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java index 4cd770373f..7abd7d4c87 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java @@ -22,6 +22,7 @@ import com.dtstack.chunjun.connector.s3.config.S3Config; import com.dtstack.chunjun.connector.s3.util.S3Util; import com.dtstack.chunjun.connector.s3.util.WriterUtil; +import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.restore.FormatState; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; @@ -32,7 +33,6 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PartETag; -import com.esotericsoftware.minlog.Log; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -54,10 +54,10 @@ public class S3OutputFormat extends BaseRichOutputFormat { private S3Config s3Config; /** Must start at 1 and cannot be greater than 10,000 */ - private static int currentPartNumber; + private int currentPartNumber; - private static String currentUploadId; - private static boolean willClose = false; + private String currentUploadId; + private boolean willClose = false; private transient StringWriter sw; private transient List myPartETags; @@ -66,22 +66,50 @@ public class S3OutputFormat extends BaseRichOutputFormat { private static final long MIN_SIZE = 1024 * 1024 * 25L; + @Override + public void initializeGlobal(int parallelism) { + this.amazonS3 = S3Util.getS3Client(s3Config); + if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { + checkOutputDir(); + } + } + @Override protected void openInternal(int taskNumber, int numTasks) { + // 写多个对象时 + if (!s3Config.isWriteSingleObject()) { + s3Config.setObject( + s3Config.getObject() + + ConstantValue.SINGLE_SLASH_SYMBOL + + jobId + + "_" + + taskNumber + + getExtension()); + } else { + // 写单个对象时 + if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { + // 当写入模式是overwrite时 + s3Config.setObject(s3Config.getObject() + getExtension()); + } else { + // 当写入模式是append时 + s3Config.setObject( + s3Config.getObject() + "_" + jobId + "_" + taskNumber + getExtension()); + } + } + log.info("current write object name: {}", s3Config.getObject()); + List column = s3Config.getColumn(); + columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList()); + columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList()); openSource(); restore(); - checkOutputDir(); createActionFinishedTag(); nextBlock(); - List column = s3Config.getColumn(); - columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList()); - columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList()); } private void openSource() { this.amazonS3 = S3Util.getS3Client(s3Config); this.myPartETags = new ArrayList<>(); - this.currentPartNumber = taskNumber - numTasks + 1; + this.currentPartNumber = 0; beforeWriteRecords(); } @@ -95,17 +123,50 @@ private void restore() { } private void checkOutputDir() { - if (S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) { - if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) { - S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject()); + // 覆盖写单个对象时 + if (s3Config.isWriteSingleObject() + && S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) { + S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject()); + } + // 覆盖写多个对象时 + if (!s3Config.isWriteSingleObject()) { + List subObjects; + if (s3Config.isUseV2()) { + subObjects = + S3Util.listObjectsKeyByPrefix( + amazonS3, + s3Config.getBucket(), + s3Config.getObject(), + s3Config.getFetchSize()); + } else { + subObjects = + S3Util.listObjectsByv1( + amazonS3, + s3Config.getBucket(), + s3Config.getObject(), + s3Config.getFetchSize()); } + String[] keys = subObjects.toArray(new String[] {}); + S3Util.deleteObjects(amazonS3, s3Config.getBucket(), keys); + log.info("delete objects num:" + keys.length); + log.debug("delete objects list:" + StringUtils.join(keys, ",")); + } + } + + public String getExtension() { + if (StringUtils.isNotBlank(s3Config.getSuffix())) { + return s3Config.getSuffix(); + } else { + return ""; } } private void nextBlock() { - sw = new StringWriter(); + if (sw == null) { + sw = new StringWriter(); + } this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter()); - this.currentPartNumber = this.currentPartNumber + numTasks; + this.currentPartNumber = this.currentPartNumber + 1; } /** Create file multipart upload ID */ @@ -159,6 +220,7 @@ protected void flushDataInternal() { myPartETags.stream().map(Objects::toString).collect(Collectors.joining(","))); writerUtil.close(); writerUtil = null; + sw = null; } } @@ -192,7 +254,7 @@ public void closeInternal() { flushDataInternal(); completeMultipartUploadFile(); S3Util.closeS3(amazonS3); - Log.info("S3Client close!"); + log.info("S3Client close!"); } @Override @@ -218,7 +280,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce } String[] stringRecord = new String[columnNameList.size()]; // convert row to string - rowConverter.toExternal(rowData, stringRecord); + stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord); try { for (int i = 0; i < columnNameList.size(); ++i) { diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java index 03ad034225..5ee4beec7b 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.s3.source; import com.dtstack.chunjun.config.FieldConfig; +import com.dtstack.chunjun.config.RestoreConfig; import com.dtstack.chunjun.config.TypeConfig; import com.dtstack.chunjun.connector.s3.config.S3Config; import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter; @@ -70,6 +71,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon } s3Config.setColumn(columnList); S3InputFormatBuilder builder = new S3InputFormatBuilder(new S3InputFormat()); + builder.setRestoreConf(new RestoreConfig()); builder.setRowConverter( new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config)); builder.setS3Conf(s3Config); diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java index 56c5a47a69..c450c96854 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.config.RestoreConfig; import com.dtstack.chunjun.connector.s3.config.S3Config; +import com.dtstack.chunjun.connector.s3.enums.CompressType; import com.dtstack.chunjun.connector.s3.util.ReaderUtil; import com.dtstack.chunjun.connector.s3.util.S3SimpleObject; import com.dtstack.chunjun.connector.s3.util.S3Util; @@ -40,6 +41,7 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashSet; @@ -50,6 +52,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; @Slf4j public class S3InputFormat extends BaseRichInputFormat { @@ -195,9 +198,15 @@ public boolean reachedEndWithoutCheckState() throws IOException { // the file has not been read S3Object o = amazonS3.getObject(rangeObjectRequest); S3ObjectInputStream s3is = o.getObjectContent(); + InputStream inputStream = s3is; + if (StringUtils.isNotEmpty(s3Config.getCompress())) { + if (CompressType.GZIP.name().equalsIgnoreCase(s3Config.getCompress())) { + inputStream = new GZIPInputStream(s3is); + } + } readerUtil = new ReaderUtil( - new InputStreamReader(s3is, s3Config.getEncoding()), + new InputStreamReader(inputStream, s3Config.getEncoding()), s3Config.getFieldDelimiter(), 0L, s3Config.isSafetySwitch()); diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java index dc11452756..fa05abaf7f 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java @@ -22,6 +22,7 @@ import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink; import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource; import com.dtstack.chunjun.connector.s3.table.options.S3Options; +import com.dtstack.chunjun.table.options.SinkOptions; import com.dtstack.chunjun.util.GsonUtil; import org.apache.flink.configuration.ConfigOption; @@ -58,7 +59,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { s3Config.setEncoding(options.get(S3Options.ENCODING)); s3Config.setRegion(options.get(S3Options.REGION)); s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER)); - + s3Config.setEndpoint(options.get(S3Options.ENDPOINT)); + s3Config.setCompress(options.get(S3Options.COMPRESS)); return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config); } @@ -85,6 +87,13 @@ public Set> optionalOptions() { options.add(S3Options.IS_FIRST_LINE_HEADER); options.add(S3Options.OBJECTS); options.add(S3Options.OBJECT); + options.add(S3Options.ENDPOINT); + options.add(S3Options.COMPRESS); + options.add(S3Options.WRITE_SINGLE_OBJECT); + options.add(S3Options.USE_V2); + options.add(S3Options.SUFFIX); + options.add(SinkOptions.SINK_PARALLELISM); + options.add(S3Options.WRITE_MODE); return options; } @@ -105,6 +114,13 @@ public DynamicTableSink createDynamicTableSink(Context context) { s3Config.setEncoding(options.get(S3Options.ENCODING)); s3Config.setRegion(options.get(S3Options.REGION)); s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER)); + s3Config.setEndpoint(options.get(S3Options.ENDPOINT)); + s3Config.setCompress(options.get(S3Options.COMPRESS)); + s3Config.setWriteSingleObject(options.get(S3Options.WRITE_SINGLE_OBJECT)); + s3Config.setUseV2(options.get(S3Options.USE_V2)); + s3Config.setSuffix(options.get(S3Options.SUFFIX)); + s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM)); + s3Config.setWriteMode(options.get(S3Options.WRITE_MODE)); return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config); } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java index 8cd87c2f64..1f6236438f 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java @@ -68,4 +68,31 @@ public class S3Options { .defaultValue(false) .withDescription( "whether the first line is a header line, if so, the first line is not read"); + + public static final ConfigOption ENDPOINT = + key("endpoint").stringType().noDefaultValue().withDescription("endpoint"); + + public static final ConfigOption COMPRESS = + key("compress") + .stringType() + .noDefaultValue() + .withDescription("s3 file compression type"); + + public static final ConfigOption WRITE_SINGLE_OBJECT = + key("writeSingleObject") + .booleanType() + .defaultValue(true) + .withDescription("whether to write a single or multiple objects"); + + public static final ConfigOption USE_V2 = + key("useV2") + .booleanType() + .defaultValue(true) + .withDescription("Get the api version of the number of files in a directory"); + + public static final ConfigOption SUFFIX = + key("suffix").stringType().noDefaultValue().withDescription("s3 file suffix"); + + public static final ConfigOption WRITE_MODE = + key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode"); } diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java index 2e783b0c41..a7d849a4a0 100644 --- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java +++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java @@ -32,6 +32,7 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; @@ -196,6 +197,13 @@ public static void deleteObject(AmazonS3 s3Client, String bucketName, String obj s3Client.deleteObject(bucketName, object); } + public static void deleteObjects(AmazonS3 s3Client, String bucketName, String[] keys) { + if (keys.length > 0) { + DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys); + s3Client.deleteObjects(request); + } + } + public static void closeS3(AmazonS3 amazonS3) { if (amazonS3 != null) { amazonS3.shutdown(); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java index e9bd724c71..0db2597056 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java @@ -110,7 +110,9 @@ public static String toPrintJson(Object obj) { Map result = objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class); MapUtil.replaceAllElement( - result, Lists.newArrayList("pwd", "password", "druid.password", "secretKey"), "******"); + result, + Lists.newArrayList("pwd", "password", "druid.password", "secretKey"), + "******"); return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result); } catch (Exception e) { throw new RuntimeException("error parse [" + obj + "] to json", e);