Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimization of partition folder rename operations for ctas and insert #385

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,8 @@ private void commitShared()
checkHoldsLock();

Committer committer = new Committer();
Map<String, Optional<Path>> ctasTablesMap = new HashMap<>();

try {
List<? extends ListenableFuture<?>> tableActionsFutures = this.tableActions.entrySet().stream()
.map(entry -> hiveMetastoreClientService.submit(() -> {
Expand All @@ -1237,6 +1239,7 @@ private void commitShared()
committer.prepareAlterTable(action.getHdfsContext(), action.getIdentity(), action.getData());
break;
case ADD:
ctasTablesMap.put(schemaTableName.toString(), action.getData().getCurrentLocation());
committer.prepareAddTable(action.getHdfsContext(), action.getData());
break;
case INSERT_EXISTING:
Expand All @@ -1257,6 +1260,21 @@ private void commitShared()
HiveIdentity identity = values.iterator().next().getIdentity();
Table table = getTable(identity, schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
AtomicBoolean isCtasWithPartitions = new AtomicBoolean(false);
String tableKey = table.getDatabaseName() + "." + table.getTableName();
if (ctasTablesMap.containsKey(tableKey)) {
//do the rename operation here from the staging dir to actual table dir for ctas operation
long partitionsSize = tableEntry.getValue().entrySet().stream().count();
if (partitionsSize > 0) {
isCtasWithPartitions.set(true);
Optional<Path> currentPath = ctasTablesMap.get(tableKey);
if (currentPath.isPresent()) {
HdfsContext hdfsContext = values.iterator().next().getHdfsContext();
committer.renameStagingDir(hdfsContext, currentPath.get(), new Path(table.getStorage().getLocation()));
}
}
}

List<? extends ListenableFuture<?>> partitionActionsFutures = tableEntry.getValue().entrySet().stream()
.map(partitionEntry -> hiveMetastoreClientService.submit(() -> {
List<String> partitionValues = partitionEntry.getKey();
Expand All @@ -1269,7 +1287,8 @@ private void commitShared()
committer.prepareAlterPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData());
break;
case ADD:
committer.prepareAddPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData());
committer.prepareAddPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData(),
isCtasWithPartitions);
break;
case INSERT_EXISTING:
committer.prepareInsertExistingPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData());
Expand Down Expand Up @@ -1660,7 +1679,8 @@ private PartitionStatistics getExistingPartitionStatistics(HiveIdentity identity
}
}

private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdentity identity, PartitionAndMore partitionAndMore)
private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdentity identity, PartitionAndMore partitionAndMore,
AtomicBoolean isCtasWithPartitions)
{
deleteOnly = false;

Expand All @@ -1673,20 +1693,17 @@ private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdent
partition.getSchemaTableName(),
ignored -> new PartitionAdder(partitionAndMore.getIdentity(), partition.getDatabaseName(), partition.getTableName(), delegate,
partitionCommitBatchSize, updateStatisticsOperations));

if (pathExists(hdfsContext, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameNewPartitionDirectory(
hdfsContext,
hdfsEnvironment,
currentPath,
targetPath,
cleanUpTasksForAbort);
if (!isCtasWithPartitions.get()) {
if (pathExists(hdfsContext, hdfsEnvironment, currentPath)) {
if (!targetPath.equals(currentPath)) {
renameNewPartitionDirectory(hdfsContext, hdfsEnvironment, currentPath, targetPath,
cleanUpTasksForAbort);
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true));
createDirectory(hdfsContext, hdfsEnvironment, targetPath);
}
}
else {
cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true));
createDirectory(hdfsContext, hdfsEnvironment, targetPath);
}
String partitionName = getPartitionName(table, partition.getValues());
partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate(),
Expand Down Expand Up @@ -1944,6 +1961,23 @@ private void executeIrreversibleMetastoreOperations()
throw prestoException;
}
}

/**
 * Moves a CTAS staging directory into its final table location.
 * <p>
 * If a directory already exists at {@code targetPath} (e.g. pre-created when the
 * table was registered), it is removed first with a non-recursive delete before the
 * staging directory is renamed into place. On success a cleanup task is registered
 * so the target directory is removed if the commit is later aborted.
 *
 * @param hdfsContext context used to resolve the file system
 * @param sourcePath  staging directory containing the written data
 * @param targetPath  final table directory
 * @throws PrestoException with {@code HIVE_FILESYSTEM_ERROR} if the delete or rename fails
 */
private void renameStagingDir(HdfsContext hdfsContext, Path sourcePath, Path targetPath)
{
    // Clear any pre-existing (presumably empty — TODO confirm) target dir so the rename can succeed
    if (pathExists(hdfsContext, hdfsEnvironment, targetPath)) {
        try {
            boolean deleted = hdfsEnvironment.getFileSystem(hdfsContext, targetPath).delete(targetPath, false);
            if (!deleted) {
                throw new IOException("delete returned false");
            }
        }
        catch (IOException e) {
            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to delete target dir %s ", targetPath), e);
        }
    }
    renameDirectory(
            hdfsContext,
            hdfsEnvironment,
            sourcePath,
            targetPath,
            () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true)));
}
}

@GuardedBy("this")
Expand Down Expand Up @@ -2263,11 +2297,18 @@ private static void renameNewPartitionDirectory(HdfsContext context,
FileStatus fileStatus = getFileStatus(context, hdfsEnvironment, source);
if (fileStatus.isDirectory()) {
FileStatus[] children = getChildren(context, hdfsEnvironment, source);
int childFileCounter = 0;
for (FileStatus child : children) {
Path subTarget = new Path(target, child.getPath().getName());
renameDirectory(context, hdfsEnvironment, child.getPath(),
subTarget,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, subTarget, true)));
if (childFileCounter == 0) {
renameFile(context, hdfsEnvironment, child.getPath(), subTarget,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, subTarget, true)), true);
}
else {
renameFile(context, hdfsEnvironment, child.getPath(), subTarget,
() -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, subTarget, true)), false);
}
childFileCounter++;
}
}
else {
Expand Down Expand Up @@ -2298,6 +2339,24 @@ private static void renameDirectory(HdfsEnvironment.HdfsContext context, HdfsEnv
}
}

/**
 * Renames a single file (or subdirectory entry) from {@code source} to {@code target}.
 * <p>
 * When {@code createParentDir} is {@code true}, the target's parent directory is created
 * first if it does not already exist — callers set this only for the first child moved
 * into a new partition directory, since later children share the same parent.
 *
 * @param context              HDFS context used to resolve the file system
 * @param hdfsEnvironment      environment providing file-system access
 * @param source               path to move
 * @param target               destination path
 * @param runWhenRenameSuccess callback invoked only after the rename succeeds
 *                             (typically registers an abort-cleanup task)
 * @param createParentDir      whether to ensure the target's parent directory exists first
 * @throws PrestoException with {@code HIVE_FILESYSTEM_ERROR} if the rename fails or
 *         returns {@code false}
 */
private static void renameFile(HdfsEnvironment.HdfsContext context, HdfsEnvironment hdfsEnvironment, Path source, Path target, Runnable runWhenRenameSuccess, boolean createParentDir)
{
    // Combined guard: only probe/create the parent when explicitly requested
    if (createParentDir && !pathExists(context, hdfsEnvironment, target.getParent())) {
        createDirectory(context, hdfsEnvironment, target.getParent());
    }
    try {
        boolean renamed = hdfsEnvironment.getFileSystem(context, source).rename(source, target);
        if (!renamed) {
            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename file from %s to %s: rename returned false", source, target));
        }
    }
    catch (IOException e) {
        throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename file from %s to %s", source, target), e);
    }
    // Runnable cannot throw a checked IOException, so running it outside the
    // try block is behaviorally identical to the original placement.
    runWhenRenameSuccess.run();
}

/**
* Attempts to remove the file or empty directory.
*
Expand Down