Skip to content

Commit

Permalink
[Feature-DTStack#1877][ftp] Supports reading Chinese paths and optimi…
Browse files Browse the repository at this point in the history
…zing reading excel

[Feature-DTStack#1877][ftp] Supports reading Chinese paths and optimizing reading excel
  • Loading branch information
libailin authored and lihongwei committed Mar 1, 2024
1 parent 03ab05a commit 8ed1323
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 14 deletions.
2 changes: 1 addition & 1 deletion chunjun-connectors/chunjun-connector-ftp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ under the License.
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.0.1</version>
<version>3.2.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,35 @@

package com.dtstack.chunjun.connector.ftp.client.excel;

import com.dtstack.chunjun.util.DateUtil;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ExcelReadListener implements ReadListener<Map<Integer, String>> {
public class ExcelReadListener implements ReadListener<Map<Integer, Object>> {

private final BlockingQueue<Row> queue = new LinkedBlockingQueue<>(4096);

@Override
public void invoke(Map<Integer, String> data, AnalysisContext context) {
public void invoke(Map<Integer, Object> data, AnalysisContext context) {
String[] piece = new String[data.size()];
for (Map.Entry<Integer, String> entry : data.entrySet()) {
piece[entry.getKey()] = entry.getValue() == null ? "" : entry.getValue();
for (Map.Entry<Integer, Object> entry : data.entrySet()) {
String value = "";
if (entry.getValue() != null) {
if (entry.getValue() instanceof LocalDateTime) {
value =
DateUtil.timestampToString(
DateUtil.localDateTimetoDate((LocalDateTime) entry.getValue()));
} else {
value = String.valueOf(entry.getValue());
}
}
piece[entry.getKey()] = value;
}
Row row =
new Row(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
package com.dtstack.chunjun.connector.ftp.converter;

import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class FtpSqlConverter extends AbstractRowConverter<String, String, String, LogicalType> {
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;

public class FtpSqlConverter extends AbstractRowConverter<String[], String, String, LogicalType> {

private static final long serialVersionUID = 4127516611259169686L;

Expand All @@ -41,10 +51,59 @@ public FtpSqlConverter(SerializationSchema<RowData> valueSerialization) {
this.valueSerialization = valueSerialization;
}

public FtpSqlConverter(RowType rowType) {
super(rowType);
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i))));
}
}

@Override
protected IDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
case INTEGER:
return val -> Integer.valueOf((String) val);
case BIGINT:
return val -> Long.valueOf((String) val);
case FLOAT:
return val -> Float.valueOf((String) val);
case DOUBLE:
return val -> Double.valueOf((String) val);
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return val -> {
BigDecimal decimal = new BigDecimal(String.valueOf(val));
return DecimalData.fromBigDecimal(decimal, precision, scale);
};
case CHAR:
case VARCHAR:
return val -> StringData.fromString((String) val);
case DATE:
return val ->
(int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val ->
(int)
((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay()
/ 1_000_000L);
default:
throw new UnsupportedOperationException(type.toString());
}
}

@Override
public RowData toInternal(String input) throws Exception {
valueDeserialization.open(new DummyInitializationContext());
return valueDeserialization.deserialize(input.getBytes());
public RowData toInternal(String[] input) throws Exception {
GenericRowData rowData = new GenericRowData(input.length);
for (int i = 0; i < fieldTypes.length; i++) {
rowData.setField(i, toInternalConverters.get(i).deserialize(input[i]));
}
return rowData;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class FtpHandler implements DTFtpHandler {
private FTPClient ftpClient = null;
private String controlEncoding;
private FtpConfig ftpConfig;
private boolean isEncodePath = true;

public FTPClient getFtpClient() {
return ftpClient;
Expand All @@ -57,6 +58,8 @@ public void loginFtpServer(FtpConfig ftpConfig) {
controlEncoding = ftpConfig.getControlEncoding();
ftpClient = new FTPClient();
try {
// 设置编码: 解决中文路径问题, 需要在连接前设置编码
ftpClient.setControlEncoding(controlEncoding);
// 连接
ftpClient.connect(ftpConfig.getHost(), ftpConfig.getPort());
// 登录
Expand Down Expand Up @@ -87,12 +90,18 @@ public void loginFtpServer(FtpConfig ftpConfig) {
log.error(message);
throw new RuntimeException(message);
}
ftpClient.setControlEncoding(ftpConfig.getControlEncoding());
ftpClient.setListHiddenFiles(ftpConfig.isListHiddenFiles());
if (StringUtils.isNotEmpty(ftpConfig.getCompressType())) {
// 设置文件传输类型为二进制
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
}
// 开启服务器对UTF-8的支持,解决读取中文路径或者中文文件名失败的问题
if (FTPReply.isPositiveCompletion(ftpClient.sendCommand("OPTS UTF8", "ON"))) {
log.info("ftp server support UTF-8");
isEncodePath = false;
} else {
log.warn("ftp server not support UTF-8");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -440,7 +449,10 @@ private boolean isExist(String path) {
}

private String encodePath(String path) throws UnsupportedEncodingException {
return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING);
if (isEncodePath) {
return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING);
}
return new String(path.getBytes(controlEncoding));
}

public void reconnectFtp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.excel.enums.ReadDefaultReturnEnum.ACTUAL_DATA;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

@Slf4j
Expand All @@ -62,13 +63,17 @@ public void open(File file, InputStream inputStream, IFormatConfig config) {
ExcelReadListener listener = new ExcelReadListener();
this.queue = listener.getQueue();
this.ec = new ExcelSubExceptionCarrier();

ExcelReaderBuilder builder = EasyExcel.read(inputStream, listener);
if (!config.isFirstLineHeader()) {
builder.headRowNumber(0);
}
builder.ignoreEmptyRow(true);
builder.autoCloseStream(true);
// @since 3.2.0
// STRING:会返回一个Map<Integer,String>的数组,返回值就是你在excel里面不点击单元格看到的内容
// ACTUAL_DATA:会返回一个Map<Integer,Object>的数组,返回实际上存储的数据,会帮自动转换类型,Object类型为BigDecimal、Boolean、String、LocalDateTime、null,中的一个,
// READ_CELL_DATA: 会返回一个Map<Integer,ReadCellData<?>>的数组,其中?类型参照ACTUAL_DATA的
builder.readDefaultReturn(ACTUAL_DATA);
ExcelReader reader = builder.build();

this.sheetNum = reader.excelExecutor().sheetList().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setFtpConfig(ftpConfig);
builder.setRowConverter(
new FtpSqlConverter(
decodingFormat.createRuntimeDecoder(runtimeProviderContext, dataType)));
InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType())
.toRowType()));

return ParallelSourceFunctionProvider.of(
new DtInputFormatSourceFunction<>(builder.finish(), typeInformation),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
}

if (rowConverter instanceof FtpSqlConverter) {
rowData = rowConverter.toInternal(String.join(",", fields));
// 解决数据里包含特殊符号(逗号、换行符)
rowData = rowConverter.toInternal(fields);
} else if (rowConverter instanceof FtpSyncConverter) {

List<FieldConfig> columns = ftpConfig.getColumn();
Expand Down

0 comments on commit 8ed1323

Please sign in to comment.