diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index f2ed5e5f5..64530bcd7 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -1224,6 +1224,8 @@ private void commitShared() checkHoldsLock(); Committer committer = new Committer(); + Map<String, Optional<Path>> ctasTablesMap = new java.util.concurrent.ConcurrentHashMap<>(); /* filled inside tasks submitted to hiveMetastoreClientService, so writes may come from several threads */ + try { List> tableActionsFutures = this.tableActions.entrySet().stream() .map(entry -> hiveMetastoreClientService.submit(() -> { @@ -1237,6 +1239,7 @@ private void commitShared() committer.prepareAlterTable(action.getHdfsContext(), action.getIdentity(), action.getData()); break; case ADD: + ctasTablesMap.put(schemaTableName.toString(), action.getData().getCurrentLocation()); committer.prepareAddTable(action.getHdfsContext(), action.getData()); break; case INSERT_EXISTING: @@ -1257,6 +1260,21 @@ private void commitShared() HiveIdentity identity = values.iterator().next().getIdentity(); Table table = getTable(identity, schemaTableName.getSchemaName(), schemaTableName.getTableName()) .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + AtomicBoolean isCtasWithPartitions = new AtomicBoolean(false); + String tableKey = table.getDatabaseName() + "." 
+ table.getTableName(); + if (ctasTablesMap.containsKey(tableKey)) { + /* table was added in this transaction and has partition actions: rename its recorded current location to the table location once here; prepareAddPartition then skips its per-partition rename/create */ + int partitionsSize = tableEntry.getValue().size(); + if (partitionsSize > 0) { + isCtasWithPartitions.set(true); + Optional<Path> currentPath = ctasTablesMap.get(tableKey); + if (currentPath.isPresent()) { + HdfsContext hdfsContext = values.iterator().next().getHdfsContext(); + committer.renameStagingDir(hdfsContext, currentPath.get(), new Path(table.getStorage().getLocation())); + } + } + } + List> partitionActionsFutures = tableEntry.getValue().entrySet().stream() .map(partitionEntry -> hiveMetastoreClientService.submit(() -> { List partitionValues = partitionEntry.getKey(); @@ -1269,7 +1287,8 @@ private void commitShared() committer.prepareAlterPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData()); break; case ADD: - committer.prepareAddPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData()); + committer.prepareAddPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData(), + isCtasWithPartitions); break; case INSERT_EXISTING: committer.prepareInsertExistingPartition(table, action.getHdfsContext(), action.getIdentity(), action.getData()); @@ -1660,7 +1679,8 @@ private PartitionStatistics getExistingPartitionStatistics(HiveIdentity identity } } - private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdentity identity, PartitionAndMore partitionAndMore) + private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdentity identity, PartitionAndMore partitionAndMore, + AtomicBoolean isCtasWithPartitions) { deleteOnly = false; @@ -1673,20 +1693,17 @@ private void prepareAddPartition(Table table, HdfsContext hdfsContext, HiveIdent partition.getSchemaTableName(), ignored -> new PartitionAdder(partitionAndMore.getIdentity(), partition.getDatabaseName(), partition.getTableName(), 
delegate, partitionCommitBatchSize, updateStatisticsOperations)); - - if (pathExists(hdfsContext, hdfsEnvironment, currentPath)) { - if (!targetPath.equals(currentPath)) { - renameNewPartitionDirectory( - hdfsContext, - hdfsEnvironment, - currentPath, - targetPath, - cleanUpTasksForAbort); + if (!isCtasWithPartitions.get()) { + if (pathExists(hdfsContext, hdfsEnvironment, currentPath)) { + if (!targetPath.equals(currentPath)) { + renameNewPartitionDirectory(hdfsContext, hdfsEnvironment, currentPath, targetPath, + cleanUpTasksForAbort); + } + } + else { + cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true)); + createDirectory(hdfsContext, hdfsEnvironment, targetPath); } - } - else { - cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true)); - createDirectory(hdfsContext, hdfsEnvironment, targetPath); } String partitionName = getPartitionName(table, partition.getValues()); partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate(), @@ -1944,6 +1961,24 @@ private void executeIrreversibleMetastoreOperations() throw prestoException; } } + + /** Renames {@code sourcePath} to {@code targetPath}; an existing {@code targetPath} is first removed via {@code delete(targetPath, false)}, and a failed delete is rethrown as a HIVE_FILESYSTEM_ERROR PrestoException. A cleanup task for {@code targetPath} is handed to renameDirectory as its callback. */ + private void renameStagingDir(HdfsContext hdfsContext, Path sourcePath, Path targetPath) + { + /* if the target path already exists, delete it before renaming. NOTE(review): delete(path, false) is non-recursive in the Hadoop FileSystem API - confirm the target is always empty here */ + if (pathExists(hdfsContext, hdfsEnvironment, targetPath)) { + try { + if (!hdfsEnvironment.getFileSystem(hdfsContext, targetPath).delete(targetPath, false)) { + throw new IOException("delete returned false"); + } + } + catch (IOException e) { + throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to delete target dir %s ", targetPath), e); + } + } + renameDirectory(hdfsContext, hdfsEnvironment, sourcePath, targetPath, + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, true))); + } } @GuardedBy("this") @@ -2263,11 +2298,13 @@ private static void renameNewPartitionDirectory(HdfsContext context, FileStatus 
fileStatus = getFileStatus(context, hdfsEnvironment, source); if (fileStatus.isDirectory()) { FileStatus[] children = getChildren(context, hdfsEnvironment, source); + int childFileCounter = 0; for (FileStatus child : children) { Path subTarget = new Path(target, child.getPath().getName()); - renameDirectory(context, hdfsEnvironment, child.getPath(), - subTarget, - () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, subTarget, true))); + /* only the first child asks renameFile to create the target's parent directory; every child shares that parent */ + renameFile(context, hdfsEnvironment, child.getPath(), subTarget, + () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, subTarget, true)), childFileCounter == 0); + childFileCounter++; } } else { @@ -2298,6 +2335,25 @@ private static void renameDirectory(HdfsEnvironment.HdfsContext context, HdfsEnv } } + /** Renames {@code source} to {@code target} and then runs {@code runWhenRenameSuccess}. When {@code createParentDir} is true, the parent of {@code target} is created first if it does not exist. Throws a HIVE_FILESYSTEM_ERROR PrestoException when rename returns false or raises IOException. */ + private static void renameFile(HdfsEnvironment.HdfsContext context, HdfsEnvironment hdfsEnvironment, Path source, Path target, Runnable runWhenRenameSuccess, boolean createParentDir) + { + if (createParentDir) { + if (!pathExists(context, hdfsEnvironment, target.getParent())) { + createDirectory(context, hdfsEnvironment, target.getParent()); + } + } + try { + if (!hdfsEnvironment.getFileSystem(context, source).rename(source, target)) { + throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename file from %s to %s: rename returned false", source, target)); + } + runWhenRenameSuccess.run(); + } + catch (IOException e) { + throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename file from %s to %s", source, target), e); + } + } + /** * Attempts to remove the file or empty directory. *