diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 6c80a14..a7c801b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -210,17 +210,15 @@ public void closeOp(boolean abort) throws HiveException {
           LOG.info("renamed path " + outPath + " to " + finalPath + " . File size is "
               + fss.getLen());
           // move any incompatible files to final path
-          if (!incompatFileSet.isEmpty()) {
-            for (Path incompatFile : incompatFileSet) {
-              Path destDir = finalPath.getParent();
-              try {
-                Utilities.renameOrMoveFiles(fs, incompatFile, destDir);
-                LOG.info("Moved incompatible file " + incompatFile + " to " +
-                    destDir);
-              } catch (HiveException e) {
-                LOG.error("Unable to move " + incompatFile + " to " + destDir);
-                throw new IOException(e);
-              }
+          for (Path incompatFile : incompatFileSet) {
+            Path markedPath = Utilities.toIncompatiblePath(incompatFile);
+            Path destPath = new Path(finalPath.getParent(), markedPath.getName());
+            try {
+              Utilities.renameOrMoveFiles(fs, incompatFile, destPath);
+              LOG.info("Moved incompatible file " + incompatFile + " to " + destPath);
+            } catch (HiveException e) {
+              LOG.error("Unable to move " + incompatFile + " to " + destPath);
+              throw e;
             }
           }
         } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 583b82b..4beffdc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -84,6 +84,7 @@ public MoveTask() {
 
   private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
       throws Exception {
+    LOG.warn("Moving " + sourcePath + " to " + targetPath);
     FileSystem fs = sourcePath.getFileSystem(conf);
     if (isDfsDir) {
       // Just do a rename on the URIs, they belong to the same FS
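The merge-operator hunk above replaces the old move-into-directory logic: each incompatible file now gains an "incompatible." marker prefix (via the new Utilities.toIncompatiblePath(), below) and is moved to an explicit file path next to the merged output, with the HiveException rethrown directly instead of being wrapped in an IOException. A minimal sketch of just the path arithmetic, assuming hadoop-common on the classpath; the class and method names here are illustrative, not part of the patch:

import org.apache.hadoop.fs.Path;

// Sketch of the new destination computation in AbstractFileMergeOperator.closeOp():
// an incompatible file keeps its name but gains the "incompatible." marker, and the
// move targets an explicit file path next to the merged output, not a directory.
public class IncompatibleMoveSketch {
  static final String INCOMPATIBLE_PREFIX = "incompatible.";

  // Mirrors Utilities.toIncompatiblePath() composed with the new destPath logic.
  static Path destinationFor(Path incompatFile, Path finalPath) {
    String name = incompatFile.getName();
    String marked = name.startsWith(INCOMPATIBLE_PREFIX) ? name : INCOMPATIBLE_PREFIX + name;
    return new Path(finalPath.getParent(), marked);
  }

  public static void main(String[] args) {
    Path finalPath = new Path("/warehouse/orc_merge5b/000000_0");
    Path incompat = new Path("/warehouse/orc_merge5b/000001_0");
    // prints /warehouse/orc_merge5b/incompatible.000001_0
    System.out.println(destinationFor(incompat, finalPath));
  }
}

Handing renameOrMoveFiles() a fully resolved destination file, rather than the parent directory, also appears to keep it from choosing its own collision-avoiding name for the moved file.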
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 4170659..87f7b47 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1496,6 +1496,8 @@ public static String realFile(String newFile, Configuration conf) throws IOExcep
     return src;
   }
 
+  private static final String incompatiblePrefix = "incompatible.";
+
   private static final String tmpPrefix = "_tmp.";
   private static final String taskTmpPrefix = "_task_tmp.";
 
@@ -1506,6 +1508,13 @@ public static Path toTaskTempPath(Path orig) {
     return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
   }
 
+  public static Path toIncompatiblePath(Path orig) {
+    if (orig.getName().startsWith(incompatiblePrefix)) {
+      return orig;
+    }
+    return new Path(orig.getParent(), incompatiblePrefix + orig.getName());
+  }
+
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
@@ -1520,6 +1529,11 @@ public static Path toTempPath(String orig) {
     return toTempPath(new Path(orig));
   }
 
+  public static boolean isIncompatiblePath(FileStatus file) {
+    String name = file.getPath().getName();
+    return name.startsWith(incompatiblePrefix);
+  }
+
   /**
    * Detect if the supplied file is a temporary path.
    */
@@ -1562,6 +1576,7 @@ public static void rename(FileSystem fs, Path src, Path dst) throws IOException,
    */
  public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException,
      HiveException {
+    LOG.warn("renaming " + src + " to " + dst);
     if (!fs.exists(dst)) {
       if (!fs.rename(src, dst)) {
         throw new HiveException("Unable to move: " + src + " to: " + dst);
       }
@@ -1586,6 +1601,7 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I
         } while (fs.exists(dstFilePath));
       }
 
+      LOG.warn(" ++ " + srcFilePath + " to " + dstFilePath);
       if (!fs.rename(srcFilePath, dstFilePath)) {
         throw new HiveException("Unable to move: " + src + " to: " + dst);
       }
@@ -1653,7 +1669,7 @@ public static String getTaskIdFromFilename(String filename) {
    *          filename to extract taskid from
    */
   public static String getPrefixedTaskIdFromFilename(String filename) {
-    return getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX);
+    return getIdFromFilename(filename, COPY_FILE_NAME_TO_TASK_ID_REGEX);
   }
 
   private static String getIdFromFilename(String filename, Pattern pattern) {
@@ -1872,7 +1888,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
+  private static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
       DynamicPartitionCtx dpCtx) throws IOException {
     if (path == null) {
       return null;
@@ -1881,7 +1897,6 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     ArrayList<String> result = new ArrayList<String>();
     if (dpCtx != null) {
       FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
-      HashMap<String, FileStatus> taskIDToFile = null;
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1897,11 +1912,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
           }
         }
 
-        taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+        HashMap<String, FileStatus> taskIDToFile = removeTempOrDuplicateFiles(items, fs);
         // if the table is bucketed and enforce bucketing, we should check and generate all buckets
         if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {
-          // refresh the file list
-          items = fs.listStatus(parts[i].getPath());
           // get the missing buckets and generate empty buckets
           String taskID1 = taskIDToFile.keySet().iterator().next();
           Path bucketPath = taskIDToFile.values().iterator().next().getPath();
@@ -1922,7 +1935,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     return result;
   }
 
-  public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
+  private static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
       FileSystem fs) throws IOException {
 
     if (items == null || fs == null) {
@@ -1932,7 +1945,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
 
     HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
     for (FileStatus one : items) {
-      if (isTempPath(one)) {
+      if (isIncompatiblePath(one)) {
+        continue;
+      } else if (isTempPath(one)) {
         if (!fs.delete(one.getPath(), true)) {
           throw new IOException("Unable to delete tmp file: " + one.getPath());
         }
@@ -3721,4 +3736,8 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep
   public static boolean isDefaultNameNode(HiveConf conf) {
     return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname);
   }
+
+  public static void main(String[] args) {
+    System.err.println("[Utilities/main] " + getPrefixedTaskIdFromFilename("000000_1_copy_1"));
+  }
 }
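Two of the Utilities changes above are only half-visible in the diff: getPrefixedTaskIdFromFilename() now parses file names with COPY_FILE_NAME_TO_TASK_ID_REGEX, whose definition is not part of this patch, and the new main() is a quick manual probe of that behavior. Judging from the probe input "000000_1_copy_1", the intent is that a _copy_N suffix maps back to the same task id as the base file. A self-contained approximation under that assumption; the pattern and class below are hypothetical stand-ins, not Hive's actual regex:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for Utilities.COPY_FILE_NAME_TO_TASK_ID_REGEX, whose real
// definition is not in this diff: capture the (possibly prefixed) task id and
// tolerate an optional attempt suffix and an optional "_copy_N" suffix.
public class TaskIdRegexSketch {
  private static final Pattern COPY_AWARE =
      Pattern.compile("^(.*?([0-9]+))(_[0-9]+)?(_copy_[0-9]+)?(\\..*)?$");

  static String taskIdOf(String filename) {
    Matcher m = COPY_AWARE.matcher(filename);
    return m.matches() ? m.group(1) : filename;
  }

  public static void main(String[] args) {
    System.out.println(taskIdOf("000000_1"));        // 000000
    System.out.println(taskIdOf("000000_1_copy_1")); // 000000
  }
}

Compiled and run standalone, this stand-in prints 000000 for both inputs, which is the grouping behavior the debug main() above seems to be checking for.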
diff --git ql/src/test/queries/clientpositive/orc_merge_incompat1.q ql/src/test/queries/clientpositive/orc_merge_incompat1.q
index 0348948..2f16b3a 100644
--- ql/src/test/queries/clientpositive/orc_merge_incompat1.q
+++ ql/src/test/queries/clientpositive/orc_merge_incompat1.q
@@ -29,7 +29,7 @@ select * from orc_merge5b;
 set hive.merge.orcfile.stripe.level=true;
 alter table orc_merge5b concatenate;
 
--- 3 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
+-- 4 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
 analyze table orc_merge5b compute statistics noscan;
 dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/orc_merge5b/;
 select * from orc_merge5b;
diff --git ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out
index ea2dd5d..392dd6c 100644
--- ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out
+++ ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out
@@ -204,12 +204,12 @@ POSTHOOK: query: alter table orc_merge5b concatenate
 POSTHOOK: type: ALTER_TABLE_MERGE
 POSTHOOK: Input: default@orc_merge5b
 POSTHOOK: Output: default@orc_merge5b
-PREHOOK: query: -- 3 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
+PREHOOK: query: -- 4 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
 analyze table orc_merge5b compute statistics noscan
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_merge5b
 PREHOOK: Output: default@orc_merge5b
-POSTHOOK: query: -- 3 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
+POSTHOOK: query: -- 4 file after merging - all 0.12 format files will be merged and 0.11 files will be left behind
 analyze table orc_merge5b compute statistics noscan
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_merge5b
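The expected count in the .q comment moves from 3 to 4, which lines up with the Utilities change: files carrying the "incompatible." marker are now skipped by removeTempOrDuplicateFiles() instead of competing in task-id deduplication, so a left-behind ORC 0.11 file presumably survives alongside the merged 0.12 output. A toy model of that dedup pass; the file names, prefix checks, and crude task-id split are simplified stand-ins for Hive's FileStatus handling and regexes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of Utilities.removeTempOrDuplicateFiles() after this patch: temp files
// are deleted, "incompatible." files are skipped entirely (and therefore kept), and
// of several files mapping to the same task id only one survives.
public class RemoveDuplicatesSketch {
  static List<String> survivors(List<String> names) {
    Map<String, String> taskIdToFile = new HashMap<String, String>();
    List<String> kept = new ArrayList<String>();
    for (String n : names) {
      if (n.startsWith("incompatible.")) {
        kept.add(n);                      // dedup no longer considers these
      } else if (n.startsWith("_tmp.") || n.startsWith("_task_tmp.")) {
        // the real code deletes these from the filesystem
      } else {
        String taskId = n.split("_")[0];  // crude stand-in for the task-id regex
        if (!taskIdToFile.containsKey(taskId)) {
          taskIdToFile.put(taskId, n);
          kept.add(n);
        }
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(
        "_tmp.000000_0", "000000_0", "000000_1", "incompatible.000001_0");
    // prints [000000_0, incompatible.000001_0]
    System.out.println(survivors(names));
  }
}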