Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-#1889][ftp] read support column index and sheetNO #1890

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,39 @@

package com.dtstack.chunjun.connector.ftp.client.excel;

import com.dtstack.chunjun.connector.ftp.extend.ftp.IFormatConfig;

import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.read.metadata.ReadSheet;

import java.util.ArrayList;
import java.util.List;

public class ExcelReaderExecutor implements Runnable {

private final ExcelReader reader;
private ExcelSubExceptionCarrier ec;
private IFormatConfig config;

public ExcelReaderExecutor(ExcelReader reader, ExcelSubExceptionCarrier ec) {
/**
 * Creates an executor that reads from the given reader when run.
 *
 * @param reader the Excel reader to read from
 * @param ec carrier that receives any exception thrown while reading
 * @param config format configuration; its sheet numbers, when set, select which sheets are read
 */
public ExcelReaderExecutor(
ExcelReader reader, ExcelSubExceptionCarrier ec, IFormatConfig config) {
this.reader = reader;
this.ec = ec;
this.config = config;
}

@Override
public void run() {
try {
reader.readAll();
if (config.getSheetNo() != null) {
List<ReadSheet> readSheetList = new ArrayList<>();
for (int i = 0; i < config.getSheetNo().size(); i++) {
readSheetList.add(new ReadSheet(config.getSheetNo().get(i)));
}
reader.read(readSheetList);
} else {
reader.readAll();
}
} catch (Exception e) {
ec.setThrowable(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

import static com.dtstack.chunjun.connector.ftp.config.ConfigConstants.DEFAULT_FTP_PORT;
Expand Down Expand Up @@ -91,4 +92,10 @@ public void setDefaultPort() {
port = DEFAULT_FTP_PORT;
}
}

/** Worksheet numbers. */
public List<Integer> sheetNo;

/** Column index corresponding to each field. */
public List<Integer> columnIndex;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Data;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@Data
Expand All @@ -45,4 +46,6 @@ public class IFormatConfig implements Serializable {

/*
 * Row delimiter (literal translation of the original comment).
 * NOTE(review): the field is named columnDelimiter — confirm which delimiter is meant.
 */
private String columnDelimiter;

/*
 * Sheet numbers.
 * NOTE(review): declared public while columnDelimiter in this class is private — confirm
 * the visibility is intended.
 */
public List<Integer> sheetNo;
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void open(File file, InputStream inputStream, IFormatConfig config) {
.namingPattern("excel-schedule-pool-%d")
.daemon(false)
.build());
ExcelReaderExecutor executor = new ExcelReaderExecutor(reader, ec);
ExcelReaderExecutor executor = new ExcelReaderExecutor(reader, ec, config);
executorService.execute(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,14 @@ public class FtpOptions extends BaseFileOptions {
.stringType()
.noDefaultValue()
.withDescription("compress type");
/** Option {@code sheet-no}: comma-separated sheet numbers given as a string; no default. */
public static final ConfigOption<String> SHEET_NO =
ConfigOptions.key("sheet-no")
.stringType()
.noDefaultValue()
.withDescription("sheet no, Multiple numbers separated by commas(,)");
/** Option {@code column-index}: comma-separated column indexes given as a string; no default. */
public static final ConfigOption<String> COLUMN_INDEX =
ConfigOptions.key("column-index")
.stringType()
.noDefaultValue()
.withDescription("column index, Multiple numbers separated by commas(,)");
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
}

if (rowConverter instanceof FtpSqlConverter) {
// Handle fields that were configured with explicit column indexes: rebuild the row so
// that position i holds the raw value found at the i-th column's configured index.
if (ftpConfig.getColumnIndex() != null) {
List<FieldConfig> columns = ftpConfig.getColumn();
String[] fieldsData = new String[columns.size()];
for (int i = 0; i < CollectionUtils.size(columns); i++) {
FieldConfig fieldConfig = columns.get(i);
// Fail when the configured index points past the end of the parsed row.
// NOTE(review): a negative index is not checked here — confirm it is rejected upstream.
if (fieldConfig.getIndex() >= fields.length) {
String errorMessage =
String.format(
"The column index is greater than the data size."
+ " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
fieldConfig.getIndex(), fields.length);
throw new IllegalArgumentException(errorMessage);
}
fieldsData[i] = fields[fieldConfig.getIndex()];
}
// Continue with the re-ordered values in place of the raw row.
fields = fieldsData;
}
// Handles data that contains special characters (commas, line breaks)
rowData = rowConverter.toInternal(fields);
} else if (rowConverter instanceof FtpSyncConverter) {
Expand Down Expand Up @@ -278,6 +296,7 @@ private IFormatConfig buildIFormatConfig(FtpConfig ftpConfig) {
iFormatConfig.setFetchMaxSize(ftpConfig.getMaxFetchSize());
iFormatConfig.setParallelism(ftpConfig.getParallelism());
iFormatConfig.setColumnDelimiter(ftpConfig.getColumnDelimiter());
iFormatConfig.setSheetNo(ftpConfig.getSheetNo());

return iFormatConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FtpDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

Expand Down Expand Up @@ -99,6 +101,20 @@ private static FtpConfig getFtpConfByOptions(ReadableConfig config) {
if (config.get(FtpOptions.FIRST_LINE_HEADER) != null) {
ftpConfig.setFirstLineHeader(config.get(FtpOptions.FIRST_LINE_HEADER));
}
// Parse the comma-separated sheet numbers. Each token is trimmed before parsing because
// Integer.parseInt rejects surrounding whitespace, so "0, 1" would otherwise fail.
// A token that is still not a valid integer raises NumberFormatException.
if (StringUtils.isNotBlank(config.get(FtpOptions.SHEET_NO))) {
    List<Integer> sheetNo =
            Arrays.stream(config.get(FtpOptions.SHEET_NO).split(","))
                    .map(String::trim)
                    .map(Integer::parseInt)
                    .collect(Collectors.toList());
    ftpConfig.setSheetNo(sheetNo);
}
// Parse the comma-separated column indexes with the same whitespace tolerance.
if (StringUtils.isNotBlank(config.get(FtpOptions.COLUMN_INDEX))) {
    List<Integer> columnIndex =
            Arrays.stream(config.get(FtpOptions.COLUMN_INDEX).split(","))
                    .map(String::trim)
                    .map(Integer::parseInt)
                    .collect(Collectors.toList());
    ftpConfig.setColumnIndex(columnIndex);
}
return ftpConfig;
}

Expand All @@ -118,13 +134,24 @@ public DynamicTableSource createDynamicTableSource(Context context) {

List<Column> columns = resolvedSchema.getColumns();
FtpConfig ftpConfig = getFtpConfByOptions(config);
// When column indexes are configured there must be exactly one per table column;
// reject a count mismatch before the per-column indexes are assigned.
if (ftpConfig.getColumnIndex() != null
&& columns.size() != ftpConfig.getColumnIndex().size()) {
throw new IllegalArgumentException(
String.format(
"The number of fields (%s) is inconsistent with the number of indexes (%s).",
columns.size(), ftpConfig.getColumnIndex().size()));
}
List<FieldConfig> columnList = new ArrayList<>(columns.size());
for (Column column : columns) {
FieldConfig field = new FieldConfig();
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
field.setIndex(columns.indexOf(column));
int index =
ftpConfig.getColumnIndex() != null
? ftpConfig.getColumnIndex().get(columns.indexOf(column))
: columns.indexOf(column);
field.setIndex(index);
columnList.add(field);
}
ftpConfig.setColumn(columnList);
Expand Down Expand Up @@ -199,6 +226,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(FtpOptions.COMPRESS_TYPE);
options.add(BaseFileOptions.NEXT_CHECK_ROWS);
options.add(BaseFileOptions.WRITE_MODE);
options.add(FtpOptions.SHEET_NO);
options.add(FtpOptions.COLUMN_INDEX);
return options;
}
}
Loading