Skip to content

Commit

Permalink
[Feature-#1862][s3] Added support for reading compressed format files
Browse files Browse the repository at this point in the history
[Feature-#1862][s3] Added support for reading compressed format files , writing multiple objects etc.
  • Loading branch information
libailin authored and lihongwei committed Jan 26, 2024
1 parent eabf986 commit 415d7df
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 20 deletions.
4 changes: 4 additions & 0 deletions chunjun-connectors/chunjun-connector-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,13 @@ public class S3Config extends CommonConfig implements Serializable {
* Default is false.
*/
private boolean safetySwitch = false;

/** 压缩方式 */
private String compress;

/** 是否写一个还是多个对象 */
private boolean writeSingleObject = true;

/** 生成的文件名后缀 */
private String suffix;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.chunjun.connector.s3.enums;

public enum CompressType {
GZIP,
BZIP2,
ZIP;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.s3.sink;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.SpeedConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter;
Expand Down Expand Up @@ -66,11 +67,14 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
s3Config.setColumn(columnList);
S3OutputFormatBuilder builder = new S3OutputFormatBuilder(new S3OutputFormat());
builder.setSpeedConf(new SpeedConfig());
builder.setS3Conf(s3Config);
builder.setRowConverter(
new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config));

return SinkFunctionProvider.of(new DtOutputFormatSinkFunction<>(builder.finish()), 1);
int sinkParallelism = s3Config.getParallelism() == null ? 1 : s3Config.getParallelism();
return SinkFunctionProvider.of(
new DtOutputFormatSinkFunction<>(builder.finish()), sinkParallelism);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.connector.s3.util.WriterUtil;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
Expand All @@ -32,7 +33,6 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import com.esotericsoftware.minlog.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -54,10 +54,10 @@ public class S3OutputFormat extends BaseRichOutputFormat {
private S3Config s3Config;

/** Must start at 1 and cannot be greater than 10,000 */
private static int currentPartNumber;
private int currentPartNumber;

private static String currentUploadId;
private static boolean willClose = false;
private String currentUploadId;
private boolean willClose = false;
private transient StringWriter sw;
private transient List<MyPartETag> myPartETags;

Expand All @@ -66,22 +66,50 @@ public class S3OutputFormat extends BaseRichOutputFormat {

private static final long MIN_SIZE = 1024 * 1024 * 25L;

@Override
public void initializeGlobal(int parallelism) {
this.amazonS3 = S3Util.getS3Client(s3Config);
if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
checkOutputDir();
}
}

@Override
protected void openInternal(int taskNumber, int numTasks) {
// 写多个对象时
if (!s3Config.isWriteSingleObject()) {
s3Config.setObject(
s3Config.getObject()
+ ConstantValue.SINGLE_SLASH_SYMBOL
+ jobId
+ "_"
+ taskNumber
+ getExtension());
} else {
// 写单个对象时
if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
// 当写入模式是overwrite时
s3Config.setObject(s3Config.getObject() + getExtension());
} else {
// 当写入模式是append时
s3Config.setObject(
s3Config.getObject() + "_" + jobId + "_" + taskNumber + getExtension());
}
}
log.info("current write object name: {}", s3Config.getObject());
List<FieldConfig> column = s3Config.getColumn();
columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList());
columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList());
openSource();
restore();
checkOutputDir();
createActionFinishedTag();
nextBlock();
List<FieldConfig> column = s3Config.getColumn();
columnNameList = column.stream().map(FieldConfig::getName).collect(Collectors.toList());
columnTypeList = column.stream().map(FieldConfig::getType).collect(Collectors.toList());
}

private void openSource() {
this.amazonS3 = S3Util.getS3Client(s3Config);
this.myPartETags = new ArrayList<>();
this.currentPartNumber = taskNumber - numTasks + 1;
this.currentPartNumber = 0;
beforeWriteRecords();
}

Expand All @@ -95,17 +123,50 @@ private void restore() {
}

private void checkOutputDir() {
if (S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
if (OVERWRITE_MODE.equalsIgnoreCase(s3Config.getWriteMode())) {
S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
// 覆盖写单个对象时
if (s3Config.isWriteSingleObject()
&& S3Util.doesObjectExist(amazonS3, s3Config.getBucket(), s3Config.getObject())) {
S3Util.deleteObject(amazonS3, s3Config.getBucket(), s3Config.getObject());
}
// 覆盖写多个对象时
if (!s3Config.isWriteSingleObject()) {
List<String> subObjects;
if (s3Config.isUseV2()) {
subObjects =
S3Util.listObjectsKeyByPrefix(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
s3Config.getFetchSize());
} else {
subObjects =
S3Util.listObjectsByv1(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
s3Config.getFetchSize());
}
String[] keys = subObjects.toArray(new String[] {});
S3Util.deleteObjects(amazonS3, s3Config.getBucket(), keys);
log.info("delete objects num:" + keys.length);
log.debug("delete objects list:" + StringUtils.join(keys, ","));
}
}

public String getExtension() {
if (StringUtils.isNotBlank(s3Config.getSuffix())) {
return s3Config.getSuffix();
} else {
return "";
}
}

private void nextBlock() {
sw = new StringWriter();
if (sw == null) {
sw = new StringWriter();
}
this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
this.currentPartNumber = this.currentPartNumber + numTasks;
this.currentPartNumber = this.currentPartNumber + 1;
}

/** Create file multipart upload ID */
Expand Down Expand Up @@ -159,6 +220,7 @@ protected void flushDataInternal() {
myPartETags.stream().map(Objects::toString).collect(Collectors.joining(",")));
writerUtil.close();
writerUtil = null;
sw = null;
}
}

Expand Down Expand Up @@ -192,7 +254,7 @@ public void closeInternal() {
flushDataInternal();
completeMultipartUploadFile();
S3Util.closeS3(amazonS3);
Log.info("S3Client close!");
log.info("S3Client close!");
}

@Override
Expand All @@ -218,7 +280,7 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
}
String[] stringRecord = new String[columnNameList.size()];
// convert row to string
rowConverter.toExternal(rowData, stringRecord);
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
for (int i = 0; i < columnNameList.size(); ++i) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.s3.source;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.converter.S3SqlConverter;
Expand Down Expand Up @@ -70,6 +71,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}
s3Config.setColumn(columnList);
S3InputFormatBuilder builder = new S3InputFormatBuilder(new S3InputFormat());
builder.setRestoreConf(new RestoreConfig());
builder.setRowConverter(
new S3SqlConverter(InternalTypeInfo.of(logicalType).toRowType(), s3Config));
builder.setS3Conf(s3Config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
Expand All @@ -40,6 +41,7 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -50,6 +52,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

@Slf4j
public class S3InputFormat extends BaseRichInputFormat {
Expand Down Expand Up @@ -195,9 +198,15 @@ public boolean reachedEndWithoutCheckState() throws IOException {
// the file has not been read
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
InputStream inputStream = s3is;
if (StringUtils.isNotEmpty(s3Config.getCompress())) {
if (CompressType.GZIP.name().equalsIgnoreCase(s3Config.getCompress())) {
inputStream = new GZIPInputStream(s3is);
}
}
readerUtil =
new ReaderUtil(
new InputStreamReader(s3is, s3Config.getEncoding()),
new InputStreamReader(inputStream, s3Config.getEncoding()),
s3Config.getFieldDelimiter(),
0L,
s3Config.isSafetySwitch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
import com.dtstack.chunjun.connector.s3.table.options.S3Options;
import com.dtstack.chunjun.table.options.SinkOptions;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -58,7 +59,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
s3Config.setEncoding(options.get(S3Options.ENCODING));
s3Config.setRegion(options.get(S3Options.REGION));
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));

s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config);
}

Expand All @@ -85,6 +87,13 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(S3Options.IS_FIRST_LINE_HEADER);
options.add(S3Options.OBJECTS);
options.add(S3Options.OBJECT);
options.add(S3Options.ENDPOINT);
options.add(S3Options.COMPRESS);
options.add(S3Options.WRITE_SINGLE_OBJECT);
options.add(S3Options.USE_V2);
options.add(S3Options.SUFFIX);
options.add(SinkOptions.SINK_PARALLELISM);
options.add(S3Options.WRITE_MODE);
return options;
}

Expand All @@ -105,6 +114,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
s3Config.setEncoding(options.get(S3Options.ENCODING));
s3Config.setRegion(options.get(S3Options.REGION));
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
s3Config.setWriteSingleObject(options.get(S3Options.WRITE_SINGLE_OBJECT));
s3Config.setUseV2(options.get(S3Options.USE_V2));
s3Config.setSuffix(options.get(S3Options.SUFFIX));
s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM));
s3Config.setWriteMode(options.get(S3Options.WRITE_MODE));

return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,31 @@ public class S3Options {
.defaultValue(false)
.withDescription(
"whether the first line is a header line, if so, the first line is not read");

public static final ConfigOption<String> ENDPOINT =
key("endpoint").stringType().noDefaultValue().withDescription("endpoint");

public static final ConfigOption<String> COMPRESS =
key("compress")
.stringType()
.noDefaultValue()
.withDescription("s3 file compression type");

public static final ConfigOption<Boolean> WRITE_SINGLE_OBJECT =
key("writeSingleObject")
.booleanType()
.defaultValue(true)
.withDescription("whether to write a single or multiple objects");

public static final ConfigOption<Boolean> USE_V2 =
key("useV2")
.booleanType()
.defaultValue(true)
.withDescription("Get the api version of the number of files in a directory");

public static final ConfigOption<String> SUFFIX =
key("suffix").stringType().noDefaultValue().withDescription("s3 file suffix");

public static final ConfigOption<String> WRITE_MODE =
key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
Expand Down Expand Up @@ -196,6 +197,13 @@ public static void deleteObject(AmazonS3 s3Client, String bucketName, String obj
s3Client.deleteObject(bucketName, object);
}

public static void deleteObjects(AmazonS3 s3Client, String bucketName, String[] keys) {
if (keys.length > 0) {
DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
s3Client.deleteObjects(request);
}
}

public static void closeS3(AmazonS3 amazonS3) {
if (amazonS3 != null) {
amazonS3.shutdown();
Expand Down
Loading

0 comments on commit 415d7df

Please sign in to comment.