Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 980277) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -471,7 +471,7 @@ } else { /* extract the task and attempt id from the hadoop taskid. in version 17 the leading component was 'task_'. thereafter - the leading component is 'attempt_'. in 17 - hadoop also + the leading component is 'attempt_'. in 17 - hadoop also seems to have used _map_ and _reduce_ to denote map/reduce task types */ @@ -980,8 +980,9 @@ public static String getTaskIdFromFilename(String filename) { String taskId = filename; int dirEnd = filename.lastIndexOf(Path.SEPARATOR); - if (dirEnd != -1) + if (dirEnd != -1) { taskId = filename.substring(dirEnd + 1); + } Matcher m = fileNameTaskIdRegex.matcher(taskId); if (!m.matches()) { @@ -1143,12 +1144,26 @@ if (otherFile == null) { taskIdToFile.put(taskId, one); } else { - if (!fs.delete(one.getPath(), true)) { + // Compare the file sizes of all the attempt files for the same task, the largest win + // any attempt files could contain partial results (due to task failures or + // speculative runs), but the largest should be the correct one since the result + // of a successful run should never be smaller than a failed/speculative run. + FileStatus toDelete = null; + if (otherFile.getLen() >= one.getLen()) { + toDelete = one; + } else { + toDelete = otherFile; + taskIdToFile.put(taskId, one); + } + long len1 = toDelete.getLen(); + long len2 = taskIdToFile.get(taskId).getLen(); + if (!fs.delete(toDelete.getPath(), true)) { throw new IOException("Unable to delete duplicate file: " - + one.getPath() + ". Existing file: " + otherFile.getPath()); + + toDelete.getPath() + ". Existing file: " + taskIdToFile.get(taskId).getPath()); } else { - LOG.warn("Duplicate taskid file removed: " + one.getPath() - + ". Existing file: " + otherFile.getPath()); + LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " + + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + + " with length " + len2); } } }