diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 76fee61..3a1dfe6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1538,6 +1538,17 @@ public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws I
       Pattern.compile("^.*?([0-9]+)(_[0-9]{1,6})?(\\..*)?$");
 
   /**
+   * Some jobs like "INSERT INTO" jobs create copies of files like 0000001_0_copy_2. For such files,
+   * Group 1: 00000001 [taskId]
+   * Group 3: 0 [task attempId]
+   * Group 4: _copy_2 [copy suffix]
+   * Group 6: copy [copy keyword]
+   * Group 8: 2 [copy file index]
+   */
+  private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
+      Pattern.compile("^.*?([0-9]+)(_)([0-9]{1,6})?((_)(\\Bcopy\\B)(_)([0-9]{1,6})$)?(\\..*)?$");
+
+  /**
    * This retruns prefix part + taskID for bucket join for partitioned table
    */
   private static final Pattern FILE_NAME_PREFIXED_TASK_ID_REGEX =
@@ -1862,21 +1873,42 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
         // speculative runs), but the largest should be the correct one since the result
         // of a successful run should never be smaller than a failed/speculative run.
         FileStatus toDelete = null;
-        if (otherFile.getLen() >= one.getLen()) {
-          toDelete = one;
-        } else {
-          toDelete = otherFile;
-          taskIdToFile.put(taskId, one);
-        }
-        long len1 = toDelete.getLen();
-        long len2 = taskIdToFile.get(taskId).getLen();
-        if (!fs.delete(toDelete.getPath(), true)) {
-          throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
-              + ". Existing file: " + taskIdToFile.get(taskId).getPath());
+
+        // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
+        // "_copy_x" suffix. These files are usually read by map tasks and the
+        // task output gets written to some tmp path. The output file names will
+        // be of format taskId_attemptId. The usual path for all these tasks is
+        // srcPath -> taskTmpPath -> tmpPath -> finalPath.
+        // But, MergeFileTask can move files directly from src path to final path
+        // without copying it to tmp path. In such cases, different files with
+        // "_copy_x" suffix will be identified as duplicates (change in value
+        // of x is wrongly identified as attempt id) and will be deleted.
+        // To avoid that we will ignore files with "_copy_x" suffix from duplicate
+        // elimination.
+        if (!isCopyFile(one.getPath().getName())) {
+          if (otherFile.getLen() >= one.getLen()) {
+            toDelete = one;
+          } else {
+            toDelete = otherFile;
+            taskIdToFile.put(taskId, one);
+          }
+          long len1 = toDelete.getLen();
+          long len2 = taskIdToFile.get(taskId).getLen();
+          if (!fs.delete(toDelete.getPath(), true)) {
+            throw new IOException(
+                "Unable to delete duplicate file: " + toDelete.getPath()
+                + ". Existing file: "
+                + taskIdToFile.get(taskId).getPath());
+          } else {
+            LOG.warn("Duplicate taskid file removed: " + toDelete.getPath()
+                + " with length "
+                + len1 + ". Existing file: "
+                + taskIdToFile.get(taskId).getPath() + " with length "
+                + len2);
+          }
         } else {
-          LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
-              + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length "
-              + len2);
+          LOG.info(one.getPath() + " file identified as duplicate. This file is"
+              + " not deleted as it has copySuffix.");
         }
       }
     }
@@ -1884,6 +1916,29 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
     return taskIdToFile;
   }
 
+  public static boolean isCopyFile(String filename) {
+    String taskId = filename;
+    String copyFileSuffix = null;
+    int dirEnd = filename.lastIndexOf(Path.SEPARATOR);
+    if (dirEnd != -1) {
+      taskId = filename.substring(dirEnd + 1);
+    }
+    Matcher m = COPY_FILE_NAME_TO_TASK_ID_REGEX.matcher(taskId);
+    if (!m.matches()) {
+      LOG.warn("Unable to verify if file name " + filename + " has _copy_ suffix.");
+    } else {
+      taskId = m.group(1);
+      copyFileSuffix = m.group(4);
+    }
+
+    LOG.debug("Filename: " + filename + " TaskId: " + taskId + " CopySuffix: " + copyFileSuffix);
+    if (taskId != null && copyFileSuffix != null) {
+      return true;
+    }
+
+    return false;
+  }
+
   public static String getNameMessage(Exception e) {
     return e.getClass().getName() + "(" + e.getMessage() + ")";
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
index beb4f7d..6c691b1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io.merge;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +29,10 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
 public class MergeMapper extends MapReduceBase {
   protected JobConf jc;
   protected Path finalPath;
@@ -48,6 +50,7 @@
   protected Path tmpPath;
   protected Path taskTmpPath;
   protected Path dpPath;
+  protected Set<Path> incompatFileSet;
 
   public final static Log LOG = LogFactory.getLog("MergeMapper");
 
@@ -62,6 +65,7 @@ public void configure(JobConf job) {
         HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
 
     Path specPath = MergeOutputFormat.getMergeOutputPath(job);
+    incompatFileSet = new HashSet<Path>();
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     updatePaths(tmpPath, taskTmpPath);
@@ -176,6 +180,23 @@ public void close() throws IOException {
       if (!fs.rename(outPath, finalPath)) {
        throw new IOException("Unable to rename output to " + finalPath);
       }
+
+      // move any incompatible files to final path
+      if (!incompatFileSet.isEmpty()) {
+        for (Path incompatFile : incompatFileSet) {
+          String fileName = incompatFile.getName();
+          Path destFile = new Path(finalPath.getParent(), fileName);
+          try {
+            Utilities.renameOrMoveFiles(fs, incompatFile, destFile);
+            LOG.info("Moved incompatible file " + incompatFile + " to "
+                + destFile);
+          } catch (HiveException e) {
+            LOG.error("Unable to move " + incompatFile + " to " + destFile);
+            throw new IOException(e);
+          }
+        }
+      }
+
     } else {
       if (!autoDelete) {
         fs.delete(outPath, true);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
index b36152a..13ec642 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
@@ -18,20 +18,24 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.merge.MergeMapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.shims.CombineHiveKey;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 /**
  * Map task fast merging of ORC files.
  */
@@ -96,31 +100,9 @@ public void map(Object key, OrcFileValueWrapper value, OutputCollector
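
Reviewer note, not part of the patch: a minimal standalone sketch of how the new COPY_FILE_NAME_TO_TASK_ID_REGEX decomposes file names. The pattern string is copied verbatim from the Utilities.java hunk above; the class name and the sample file names are made up, and the expected groups follow the javadoc added by the patch.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone illustration only; pattern copied from the patch.
public class CopyFileNameRegexDemo {
  private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
      Pattern.compile("^.*?([0-9]+)(_)([0-9]{1,6})?((_)(\\Bcopy\\B)(_)([0-9]{1,6})$)?(\\..*)?$");

  public static void main(String[] args) {
    // "000001_0"        -> group 1 = 000001, group 4 = null     (plain task output)
    // "000001_0_copy_2" -> group 1 = 000001, group 4 = _copy_2  (copy file)
    for (String name : new String[] {"000001_0", "000001_0_copy_2"}) {
      Matcher m = COPY_FILE_NAME_TO_TASK_ID_REGEX.matcher(name);
      if (m.matches()) {
        System.out.println(name + " -> taskId=" + m.group(1)
            + ", attemptId=" + m.group(3) + ", copySuffix=" + m.group(4));
      } else {
        System.out.println(name + " -> no match");
      }
    }
  }
}

Only names carrying a trailing _copy_<n> yield a non-null group 4, which is the condition the new isCopyFile() helper keys on when deciding to skip duplicate elimination.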
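
A test one might add for the new public helper; the test class name is hypothetical and JUnit 4 on the ql test classpath is assumed. It is not part of this diff.

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.junit.Test;

// Illustrative test sketch only.
public class TestIsCopyFile {
  @Test
  public void testCopySuffixDetection() {
    // files produced by repeated "INSERT INTO" / "LOAD ... INTO" runs
    assertTrue(Utilities.isCopyFile("000001_0_copy_2"));
    assertTrue(Utilities.isCopyFile("/warehouse/t1/000001_0_copy_25"));
    // ordinary task outputs must still be eligible for duplicate elimination
    assertFalse(Utilities.isCopyFile("000001_0"));
    assertFalse(Utilities.isCopyFile("000001_0.gz"));
  }
}

The directory-prefixed case exercises the Path.SEPARATOR stripping inside isCopyFile; the .gz case shows that a plain file extension alone does not mark a file as a copy.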
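
The MergeMapper hunks introduce incompatFileSet and, in close(), move those files unchanged next to the merged output; the OrcFileMergeMapper.map() hunk that presumably populates the set is cut off above. Below is a minimal standalone sketch of that defer-and-move pattern, with a made-up class name and plain FileSystem.rename standing in for Utilities.renameOrMoveFiles; it is an illustration, not the patch's implementation.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Made-up helper; shows deferring unmergeable files and moving them on close.
public class IncompatFileMover {
  private final Set<Path> incompatFileSet = new HashSet<Path>();

  // Called for an input file that cannot be merged, instead of failing the task.
  public void recordIncompatible(Path file) {
    incompatFileSet.add(file);
  }

  // Called from close(): park each deferred file, as-is, next to the merged output.
  public void moveToFinalDir(FileSystem fs, Path finalPath) throws IOException {
    for (Path incompatFile : incompatFileSet) {
      Path destFile = new Path(finalPath.getParent(), incompatFile.getName());
      if (!fs.rename(incompatFile, destFile)) {
        throw new IOException("Unable to move " + incompatFile + " to " + destFile);
      }
    }
  }
}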