Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 956664)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (working copy)
@@ -23,6 +23,8 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -436,4 +438,34 @@
     // but can be backported. So we disable setup/cleanup in all versions >= 0.19
     conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
   }
+
+  /**
+   * The first group will contain the task id. The second group is the optional
+   * extension. The file name looks like: "24931_r_000000_0" or
+   * "24931_r_000000_0.gz"
+   */
+  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+
+  /**
+   * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
+   */
+  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
+
+  /**
+   * Get the task id from the filename. E.g., get "000000" out of
+   * "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  public String getTaskIdFromFilename(String fileName, boolean localMode) {
+    String taskId = null;
+    Matcher m = null;
+    if (localMode) {
+      m = fileNameLocalTaskIdRegex.matcher(fileName);
+    } else {
+      m = fileNameTaskIdRegex.matcher(fileName);
+    }
+    if (m.matches()) {
+      taskId = m.group(1);
+    }
+    return taskId;
+  }
 }
Index: shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java
===================================================================
--- shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java (revision 956664)
+++ shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java (working copy)
@@ -31,6 +31,8 @@
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Implemention of shims against Hadoop 0.17.0.
  */
@@ -133,5 +135,34 @@
       String archiveName) throws Exception {
     throw new RuntimeException("Not implemented in this Hadoop version");
   }
+
+  /**
+   * The first group will contain the task id. The second group is the optional
+   * extension. The file name looks like: "24931_r_000000_0" or
+   * "24931_r_000000_0.gz"
+   */
+  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+  /**
+   * Local job name looks like 'reduce_7wb578' in local mode where job ID is 7wb578.
+   */
+  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*[_map|reduce]_(.*)$");
+
+  /**
+   * Get the task id from the filename. E.g., get "000000" out of
+   * "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  public String getTaskIdFromFilename(String fileName, boolean localMode) {
+    String taskId = null;
+    Matcher m = null;
+    if (localMode) {
+      m = fileNameLocalTaskIdRegex.matcher(fileName);
+    } else {
+      m = fileNameTaskIdRegex.matcher(fileName);
+    }
+    if (m.matches()) {
+      taskId = m.group(1);
+    }
+    return taskId;
+  }
 }
Index: shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java
===================================================================
--- shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java (revision 956664)
+++ shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java (working copy)
@@ -33,6 +33,8 @@
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Implemention of shims against Hadoop 0.18.0.
  */
@@ -137,4 +139,34 @@
   public void setNullOutputFormat(JobConf conf) {
     conf.setOutputFormat(NullOutputFormat.class);
   }
+
+  /**
+   * The first group will contain the task id. The second group is the optional
+   * extension. The file name looks like: "24931_r_000000_0" or
+   * "24931_r_000000_0.gz"
+   */
+  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+
+  /**
+   * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
+   */
+  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
+
+  /**
+   * Get the task id from the filename. E.g., get "000000" out of
+   * "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  public String getTaskIdFromFilename(String fileName, boolean localMode) {
+    String taskId = null;
+    Matcher m = null;
+    if (localMode) {
+      m = fileNameLocalTaskIdRegex.matcher(fileName);
+    } else {
+      m = fileNameTaskIdRegex.matcher(fileName);
+    }
+    if (m.matches()) {
+      taskId = m.group(1);
+    }
+    return taskId;
+  }
 }
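Reader's note (illustrative, not part of the patch): the standalone sketch below applies the two patterns added to the 0.18/0.19/0.20 shims above to the example file names quoted in their javadoc, so the expected capture groups can be checked in isolation. The class and method names (TaskIdRegexDemo, extract) are hypothetical.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative demo only; mirrors the patterns added by this patch.
public class TaskIdRegexDemo {
  // Cluster mode: file names such as "24931_r_000000_0" or "24931_r_000000_0.gz".
  private static final Pattern FILE_NAME_TASK_ID =
      Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
  // 0.18/0.19/0.20 local mode: file names such as "job_local_1_map_0000".
  private static final Pattern FILE_NAME_LOCAL_TASK_ID =
      Pattern.compile(".*local.*_([0-9]*)$");

  // Returns the captured task ID, or null when the name does not match,
  // matching the behavior of the shim methods above.
  private static String extract(Pattern p, String fileName) {
    Matcher m = p.matcher(fileName);
    return m.matches() ? m.group(1) : null;
  }

  public static void main(String[] args) {
    System.out.println(extract(FILE_NAME_TASK_ID, "24931_r_000000_0"));           // 000000
    System.out.println(extract(FILE_NAME_TASK_ID, "24931_r_000000_0.gz"));        // 000000
    System.out.println(extract(FILE_NAME_LOCAL_TASK_ID, "job_local_1_map_0000")); // 0000
  }
}

Hadoop 0.17 local mode produces names like "reduce_7wb578", which is why Hadoop17Shims above carries its own local-mode pattern rather than the one used here.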
Index: shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java
===================================================================
--- shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java (revision 956664)
+++ shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java (working copy)
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.shims;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +35,6 @@
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.MultiFileInputFormat;
 import org.apache.hadoop.mapred.MultiFileSplit;
@@ -47,6 +42,13 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.RunningJob;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Implemention of shims against Hadoop 0.19.0.
  */
@@ -512,4 +514,34 @@
     // but can be backported. So we disable setup/cleanup in all versions >= 0.19
     conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
   }
+
+  /**
+   * The first group will contain the task id. The second group is the optional
+   * extension. The file name looks like: "24931_r_000000_0" or
+   * "24931_r_000000_0.gz"
+   */
+  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+
+  /**
+   * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
+   */
+  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
+
+  /**
+   * Get the task id from the filename. E.g., get "000000" out of
+   * "24931_r_000000_0" or "24931_r_000000_0.gz"
+   */
+  public String getTaskIdFromFilename(String fileName, boolean localMode) {
+    String taskId = null;
+    Matcher m = null;
+    if (localMode) {
+      m = fileNameLocalTaskIdRegex.matcher(fileName);
+    } else {
+      m = fileNameTaskIdRegex.matcher(fileName);
+    }
+    if (m.matches()) {
+      taskId = m.group(1);
+    }
+    return taskId;
+  }
 }
Index: shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (revision 956664)
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (working copy)
@@ -201,4 +201,6 @@
     RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporter,
         Class<RecordReader<K, V>> rrClass) throws IOException;
   }
+
+  public String getTaskIdFromFilename(String fileName, boolean localMode);
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (revision 956664)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -224,7 +225,8 @@
         log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
-        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+        boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local");
+        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, localMode);
         // Step3: move to the file destination
         log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
         Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 956664)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy)
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack.FeedBackType;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -404,12 +405,14 @@
   private void createBucketFiles(FSPaths fsp) throws HiveException {
     try {
       int filesIdx = 0;
+      boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local");
       Set seenBuckets = new HashSet();
       for (int idx = 0; idx < totalFiles; idx++) {
         if (this.getExecContext() != null && this.getExecContext().getFileId() != -1) {
           LOG.info("replace taskId from execContext ");
 
-          taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId());
+          taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId(),
+              localMode);
 
           LOG.info("new taskId: FS " + taskId);
 
@@ -424,7 +427,8 @@
           int numReducers = totalFiles/numFiles;
 
           if (numReducers > 1) {
-            int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)));
+            int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(
+                Utilities.getTaskId(hconf), localMode));
 
             int reducerIdx = prtner.getPartition(key, null, numReducers);
             if (currReducer != reducerIdx) {
@@ -439,7 +443,8 @@
             seenBuckets.add(bucketNum);
             bucketMap.put(bucketNum, filesIdx);
 
-            taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+            taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum,
+                localMode);
           }
           if (isNativeTable) {
             fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
@@ -690,8 +695,9 @@
       log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
       Utilities.rename(fs, tmpPath, intermediatePath);
       // Step2: remove any tmp file or double-committed output files
+      boolean localMode = "local".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT));
       ArrayList emptyBuckets =
-        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx, localMode);
       // create empty buckets if necessary
       if (emptyBuckets.size() > 0) {
         createEmptyBuckets(hconf, emptyBuckets);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 956664)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -52,8 +52,6 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -85,6 +83,8 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -957,36 +957,22 @@
   }
 
   /**
-   * The first group will contain the task id. The second group is the optional
-   * extension. The file name looks like: "24931_r_000000_0" or
-   * "24931_r_000000_0.gz"
+   * Get the task ID from the file name and whether the job is running in local mode.
+   * The pattern of the task ID in the file name depends on the hadoop version
+   * and whether or not the job is running in local mode.
+   * @param fileName: output file name generated by tasks.
+   * @param localMode: whether the job is running in local mode.
+   * @return non-null task ID if matches the pattern, null if cannot find the pattern.
    */
-  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
-
-  /**
-   * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
-   */
-  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
-
-  /**
-   * Get the task id from the filename. E.g., get "000000" out of
-   * "24931_r_000000_0" or "24931_r_000000_0.gz"
-   */
-  public static String getTaskIdFromFilename(String filename) {
-    String taskId = filename;
-    Matcher m = fileNameTaskIdRegex.matcher(filename);
-    if (!m.matches()) {
-      Matcher m2 = fileNameLocalTaskIdRegex.matcher(filename);
-      if (!m2.matches()) {
-        LOG.warn("Unable to get task id from file name: " + filename
-            + ". Using full filename as task id.");
-      } else {
-        taskId = m2.group(1);
-      }
-    } else {
-      taskId = m.group(1);
+  public static String getTaskIdFromFilename(String fileName, boolean localMode) {
+    HadoopShims shims = ShimLoader.getHadoopShims();
+    String taskId = shims.getTaskIdFromFilename(fileName, localMode);
+    if (taskId == null) {
+      LOG.warn("Unable to get task id from file name: " + fileName
+          + ". Using full filename as task id.");
+      taskId = fileName;
     }
-    LOG.debug("TaskId for " + filename + " = " + taskId);
+    LOG.debug("TaskId for " + fileName + " = " + taskId);
     return taskId;
   }
 
@@ -995,8 +981,9 @@
    * "24931_r_000000_0" or "24931_r_000000_0.gz" by 33 to
    * "24931_r_000033_0" or "24931_r_000033_0.gz"
    */
-  public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
-    String taskId = getTaskIdFromFilename(filename);
+  public static String replaceTaskIdFromFilename(String filename, int bucketNum,
+      boolean localMode) {
+    String taskId = getTaskIdFromFilename(filename, localMode);
     String newTaskId = replaceTaskId(taskId, bucketNum);
     return replaceTaskIdFromFilename(filename, taskId, newTaskId);
   }
@@ -1066,8 +1053,9 @@
    * given directory.
   * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
-    removeTempOrDuplicateFiles(fs, path, null);
+  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean localMode)
+      throws IOException {
+    removeTempOrDuplicateFiles(fs, path, null, localMode);
   }
 
   /**
@@ -1075,7 +1063,8 @@
    * given directory.
   * @return a list of path names corresponding to should-be-created empty buckets.
    */
-  public static ArrayList removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx)
+  public static ArrayList removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx,
+      boolean localMode)
       throws IOException {
     if (path == null) {
       return null;
     }
@@ -1089,7 +1078,7 @@
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir(): "dynamic partition " + parts[i].getPath() + " is not a direcgtory";
         FileStatus[] items = fs.listStatus(parts[i].getPath());
-        taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+        taskIDToFile = removeTempOrDuplicateFiles(items, fs, localMode);
         // if the table is bucketed and enforce bucketing, we should check and generate all buckets
         if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {
           // refresh the file list
@@ -1101,7 +1090,8 @@
             String taskID2 = replaceTaskId(taskID1, j);
             if (!taskIDToFile.containsKey(taskID2)) {
               // create empty bucket, file name should be derived from taskID2
-              String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
+              String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j,
+                  localMode);
               result.add(path2);
             }
           }
@@ -1109,13 +1099,13 @@
       }
     } else {
       FileStatus[] items = fs.listStatus(path);
-      removeTempOrDuplicateFiles(items, fs);
+      removeTempOrDuplicateFiles(items, fs, localMode);
     }
     return result;
   }
 
   public static HashMap removeTempOrDuplicateFiles(
-      FileStatus[] items, FileSystem fs)
+      FileStatus[] items, FileSystem fs, boolean localMode)
       throws IOException {
 
     if (items == null || fs == null) {
@@ -1130,7 +1120,7 @@
           throw new IOException("Unable to delete tmp file: " + one.getPath());
         }
       } else {
-        String taskId = getTaskIdFromFilename(one.getPath().getName());
+        String taskId = getTaskIdFromFilename(one.getPath().getName(), localMode);
         FileStatus otherFile = taskIdToFile.get(taskId);
         if (otherFile == null) {
           taskIdToFile.put(taskId, one);