diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1d32ba0172..ac89dd9efe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1431,17 +1431,28 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // 3) Rename/move the temp directory to specPath FileSystem fs = specPath.getFileSystem(hconf); - boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); + PerfLogger perfLogger = SessionState.getPerfLogger(); + boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); + boolean avoidRename = false; + boolean shouldAvoidRename = shouldAvoidRename(conf, hconf); + + if(isBlobStorage && (shouldAvoidRename|| ((conf != null) && conf.isCTASorCM())) + || (!isBlobStorage && shouldAvoidRename)) { + avoidRename = true; + } if (success) { - if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) { + if (!avoidRename && fs.exists(tmpPath)) { // 1) Rename tmpPath to a new directory name to prevent additional files // from being added by runaway processes. + // this is done for all statements except SELECT, CTAS and Create MV Path tmpPathOriginal = tmpPath; tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved"); - LOG.debug("Moving/Renaming " + tmpPathOriginal + " to " + tmpPath); + LOG.debug("shouldAvoidRename is false therefore moving/renaming " + tmpPathOriginal + " to " + tmpPath); + perfLogger.PerfLogBegin("FileSinkOperator", "rename"); Utilities.rename(fs, tmpPathOriginal, tmpPath); + perfLogger.PerfLogEnd("FileSinkOperator", "rename"); } // Remove duplicates from tmpPath @@ -1449,7 +1460,6 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, tmpPath, ((dpCtx == null) ? 
1 : dpCtx.getNumDPCols()), fs); FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]); if(statuses != null && statuses.length > 0) { - PerfLogger perfLogger = SessionState.getPerfLogger(); Set filesKept = new HashSet<>(); perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // remove any tmp file or double-committed output files @@ -1471,12 +1481,19 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, // move to the file destination Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath); if(shouldAvoidRename(conf, hconf)){ + // for SELECT statements LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString()); conf.getFilesToFetch().addAll(filesKept); - } else if (isBlobStorage) { + } else if (conf !=null && conf.isCTASorCM() && isBlobStorage) { + // for CTAS or Create MV statements + perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus"); + LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString()); Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept); + perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus"); } else { + // for the rest of the statements, e.g. INSERT, LOAD, etc. perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); + LOG.debug("Final renaming/moving. 
Source: " + tmpPath + " .Destination: " + specPath); Utilities.renameOrMoveFiles(fs, tmpPath, specPath); perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8ff00fbb1f..826b23e5fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -8168,7 +8168,8 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, - dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery()); + dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(), + qb.isCTAS() || qb.isMaterializedView()); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 61ea28a5f5..72ecde4255 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -115,6 +115,8 @@ private boolean isQuery = false; + private boolean isCTASorCM = false; + public FileSinkDesc() { } @@ -125,7 +127,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final ArrayList partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, - Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) { + Long mmWriteId, boolean 
isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -143,6 +145,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, this.isMmCtas = isMmCtas; this.isInsertOverwrite = isInsertOverwrite; this.isQuery = isQuery; + this.isCTASorCM = isCTASorCM; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -164,7 +167,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery); + partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -181,6 +184,7 @@ public Object clone() throws CloneNotSupportedException { ret.setIsMerge(isMerge); ret.setFilesToFetch(filesToFetch); ret.setIsQuery(isQuery); + ret.setIsCTASorCM(isCTASorCM); return ret; } @@ -188,6 +192,10 @@ public void setFilesToFetch(Set filesToFetch) { this.filesToFetch = filesToFetch; } + public void setIsCTASorCM(boolean isCTASorCM) { + this.isCTASorCM = isCTASorCM; + } + public void setIsQuery(boolean isQuery) { this.isQuery = isQuery; } @@ -591,6 +599,15 @@ public boolean isMmCtas() { return isMmCtas; } + /** + * Whether this is a CREATE TABLE AS SELECT (CTAS) or CREATE MATERIALIZED VIEW statement. + * Set by the semantic analyzer; this is required because CTAS/CM requires some special logic + * in mvFileToFinalPath. + */ + public boolean isCTASorCM() { + return isCTASorCM; + } + public class FileSinkOperatorExplainVectorization extends OperatorExplainVectorization { public FileSinkOperatorExplainVectorization(VectorFileSinkDesc vectorFileSinkDesc) { diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index a75103d60d..2c4b69b2fe 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, - false, 1, 1, partCols, dpCtx, null, null, false, false, false); + false, 1, 1, partCols, dpCtx, null, null, false, false, false, false); } else { desc = new FileSinkDesc(basePath, tableDesc, false); }