Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(revision 950278)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(working copy)
@@ -39,6 +39,10 @@
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 /**
  * Implemention of shims against Hadoop 0.20.0.
@@ -333,4 +337,25 @@
   public void setFloatConf(Configuration conf, String varName, float val) {
     conf.setFloat(varName, val);
   }
+
+  public static class NullOutputCommitter extends OutputCommitter {
+    public void setupJob(JobContext jobContext) { }
+    public void cleanupJob(JobContext jobContext) { }
+
+    public void setupTask(TaskAttemptContext taskContext) { }
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+    public void commitTask(TaskAttemptContext taskContext) { }
+    public void abortTask(TaskAttemptContext taskContext) { }
+  }
+
+  public void setNullOutputFormat(JobConf conf) {
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setOutputCommitter(Hadoop20Shims.NullOutputCommitter.class);
+
+    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+  }
 }
Index: shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java
===================================================================
--- shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java	(revision 950278)
+++ shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import java.io.IOException;
 
@@ -50,7 +51,7 @@
   public boolean isJobPreparing(RunningJob job) throws IOException {
     return false;
   }
-  
+
   public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
       throws IOException {
     fmt.validateInput(conf);
@@ -100,7 +101,7 @@
   public int compareText(Text a, Text b) {
     return a.compareTo(b);
   }
-  
+
   @Override
   public long getAccessTime(FileStatus file) {
     return -1;
@@ -113,7 +114,7 @@
   public String getInputFormatClassName() {
     return "org.apache.hadoop.hive.ql.io.HiveInputFormat";
   }
-  
+
   @Override
   public String [] getTaskJobIDs(TaskCompletionEvent t) {
     return null;
@@ -122,4 +123,8 @@
   public void setFloatConf(Configuration conf, String varName, float val) {
     conf.set(varName, Float.toString(val));
   }
+
+  public void setNullOutputFormat(JobConf conf) {
+    conf.setOutputFormat(NullOutputFormat.class);
+  }
 }
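For illustration only (not part of the patch): a rough sketch of how the 0.20 shim above is expected to behave once applied. The class name is ours, and the assertions simply restate the intent of setNullOutputFormat.

    import org.apache.hadoop.hive.shims.Hadoop20Shims;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.NullOutputFormat;

    public class NullOutputFormatSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        new Hadoop20Shims().setNullOutputFormat(conf);

        // NullOutputFormat's RecordWriter silently drops every record,
        // so the job never needs an output directory.
        assert conf.getOutputFormat() instanceof NullOutputFormat;

        // The no-op committer reports that tasks have nothing to commit,
        // so the framework skips output promotion entirely.
        assert !conf.getOutputCommitter().needsTaskCommit(null);
      }
    }

(Run with -ea to enable assertions.)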
Index: shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java
===================================================================
--- shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java	(revision 950278)
+++ shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java	(working copy)
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import java.io.IOException;
 
@@ -104,11 +105,11 @@
   public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
     return null;
   }
-  
+
   public String getInputFormatClassName() {
     return "org.apache.hadoop.hive.ql.io.HiveInputFormat";
   }
-  
+
   String [] ret = new String[2];
   @Override
   public String [] getTaskJobIDs(TaskCompletionEvent t) {
@@ -117,7 +118,7 @@
     ret[1] = tid.getJobID().toString();
     return ret;
   }
-  
+
   @Override
   public long getAccessTime(FileStatus file) {
     return -1;
@@ -126,4 +127,8 @@
   public void setFloatConf(Configuration conf, String varName, float val) {
     conf.set(varName, Float.toString(val));
   }
+
+  public void setNullOutputFormat(JobConf conf) {
+    conf.setOutputFormat(NullOutputFormat.class);
+  }
 }
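Note that the 0.17 and 0.18 shims can only swap in NullOutputFormat, since OutputCommitter does not exist before Hadoop 0.19. Callers stay version-agnostic by going through ShimLoader, as the ExecDriver hunk further down does. A minimal sketch (class name ours):

    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapred.JobConf;

    public class ShimCallSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // ShimLoader detects the Hadoop version on the classpath and returns
        // the matching HadoopShims implementation (Hadoop17Shims through
        // Hadoop20Shims), so this call sets only the output format on
        // 0.17/0.18 and the format plus the no-op committer on 0.19/0.20.
        ShimLoader.getHadoopShims().setNullOutputFormat(job);
      }
    }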
Index: shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java
===================================================================
--- shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java	(revision 950278)
+++ shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java	(working copy)
@@ -33,6 +33,10 @@
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -61,7 +65,7 @@
       throws IOException {
     // gone in 0.18+
   }
-  
+
   public boolean isJobPreparing(RunningJob job) throws IOException {
     return job.getJobState() == JobStatus.PREP;
   }
@@ -129,8 +133,8 @@
 
   /**
    * MultiFileShim code here
-   * 
-   * 
+   *
+   *
    */
  public abstract static class CombineFileInputFormatShim extends
      MultiFileInputFormat implements
@@ -139,7 +143,7 @@
     /**
      * gets the input paths from static method in parent class. Same code in the
      * hadoop20shim, adapted for @link{MultiFileInputFormat}
-     * 
+     *
      * @param conf
      * @return Path[] of all files to be processed.
      */
@@ -161,7 +165,7 @@
 
     /**
      * Not supported by MultiFileInputFormat so it doesn't do anything
-     * 
+     *
      * @param conf
      * @param filters
      */
@@ -213,7 +217,7 @@
     /**
      * tries to guesstimate the optimal number of splits. We just calculate the
      * total size of the job and divide it by the block size.
-     * 
+     *
      * @param job
      * @param numSplits
      * @return
@@ -372,7 +376,7 @@
 
   /**
    * InputSplitShim
-   * 
+   *
    */
  public static class InputSplitShim // extends MultiFileSplit
      implements HadoopShims.InputSplitShim {
@@ -390,7 +394,7 @@
 
     /**
      * It encapsulate a set of files
-     * 
+     *
      * @param job
      * @param old
      * @throws IOException
@@ -480,4 +484,25 @@
   public void setFloatConf(Configuration conf, String varName, float val) {
     conf.set(varName, Float.toString(val));
   }
+
+  public static class NullOutputCommitter extends OutputCommitter {
+    public void setupJob(JobContext jobContext) { }
+    public void cleanupJob(JobContext jobContext) { }
+
+    public void setupTask(TaskAttemptContext taskContext) { }
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+    public void commitTask(TaskAttemptContext taskContext) { }
+    public void abortTask(TaskAttemptContext taskContext) { }
+  }
+
+  public void setNullOutputFormat(JobConf conf) {
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setOutputCommitter(Hadoop19Shims.NullOutputCommitter.class);
+
+    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+  }
 }
Index: shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java	(revision 950278)
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java	(working copy)
@@ -50,7 +50,7 @@
    * command line interpretation.
    */
   boolean usesJobShell();
-  
+
   /**
    * Return true if the job has not switched to RUNNING state yet
    * and is still in PREP state
@@ -126,12 +126,22 @@
   /**
    * getTaskJobIDs returns an array of String with two elements. The first
    * element is a string representing the task id and the second is a string
-   * representing the job id. This is necessary as TaskID and TaskAttemptID 
+   * representing the job id. This is necessary as TaskID and TaskAttemptID
    * are not supported in Haddop 0.17
    */
   String[] getTaskJobIDs(TaskCompletionEvent t);
 
   /**
+   * Hive uses side effect files exclusively for its output. It also manages
+   * the setup/cleanup/commit of output from the Hive client. As a result it does
+   * not need support for the same inside the MR framework.
+   *
+   * This routine sets the appropriate options to set the output format and any
+   * options related to bypassing setup/cleanup/commit support in the MR framework.
+   */
+  void setNullOutputFormat(JobConf conf);
+
+  /**
    * InputSplitShim.
    *
    */
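The new interface javadoc refers to Hive's side effect files: each task writes its results itself, directly to the FileSystem, rather than returning records for the MR output machinery to collect and commit. A minimal sketch of that pattern, with hypothetical names and paths:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class SideEffectFileSketch {
      // Writes a task's output as a "side effect" of running it. The MR
      // framework never sees this file, so there is nothing to commit;
      // the Hive client later moves or deletes it as needed.
      public static void writeTaskOutput(JobConf job, String taskId)
          throws IOException {
        Path out = new Path("/tmp/hive-scratch", taskId); // hypothetical dir
        FileSystem fs = out.getFileSystem(job);
        FSDataOutputStream stream = fs.create(out);
        try {
          stream.writeBytes("serialized rows would go here\n");
        } finally {
          stream.close();
        }
      }
    }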
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 950278)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -69,7 +69,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -324,7 +323,7 @@
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
       }
-      
+
       if (initializing &&
           ShimLoader.getHadoopShims().isJobPreparing(rj)) {
         // No reason to poll untill the job is initialized
@@ -554,9 +553,6 @@
       hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
     }
 
-    String jobScratchDirStr = hiveScratchDir + File.separator
-        + Utilities.randGen.nextInt();
-    Path jobScratchDir = new Path(jobScratchDirStr);
     String emptyScratchDirStr = null;
     Path emptyScratchDir = null;
 
@@ -580,7 +576,7 @@
       }
     }
 
-    FileOutputFormat.setOutputPath(job, jobScratchDir);
+    ShimLoader.getHadoopShims().setNullOutputFormat(job);
 
     job.setMapperClass(ExecMapper.class);
     job.setMapOutputKeyClass(HiveKey.class);
@@ -735,9 +731,7 @@
     } finally {
       Utilities.clearMapRedWork(job);
       try {
-        FileSystem fs = jobScratchDir.getFileSystem(job);
-        fs.delete(jobScratchDir, true);
-        fs.delete(emptyScratchDir, true);
+        emptyScratchDir.getFileSystem(job).delete(emptyScratchDir, true);
         if (returnVal != 0 && rj != null) {
           rj.killJob();
         }
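Taken together, ExecDriver no longer creates or cleans up a per-job scratch output directory at all; only emptyScratchDir is deleted afterwards. An illustrative standalone job (names ours, not from the patch) configured the same way, showing that with NullOutputFormat no FileOutputFormat.setOutputPath() call is required:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.NullOutputFormat;

    public class NullOutputJobSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(NullOutputJobSketch.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(IdentityMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormat(NullOutputFormat.class); // all records discarded
        // No output path is set: the job completes without writing anything,
        // which is exactly the property ExecDriver relies on above.
        JobClient.runJob(job);
      }
    }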