diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 551639f..d596c85 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -173,6 +173,7 @@
     COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
     COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
     COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
+    MRAMRECOVER("hive.exec.mr.am.recover", "true"),
     BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
     MAXREDUCERS("hive.exec.reducers.max", 999),
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 6523097..58026be 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -55,7 +55,9 @@
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.WebHCatJTShim23;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -576,4 +578,66 @@ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec)
   public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
     return context.getConfiguration();
   }
+
+  /**
+   * This class is for Hadoop 2.x and 0.23.x where MR AM restart is supported.
+   * This will override the two new APIs called isRecoverySupported
+   * and recoverTask().
+   *
+   */
+  public static class NullOutputCommitter extends OutputCommitter {
+    // Must stay in sync with HiveConf.ConfVars.MRAMRECOVER ("hive.exec.mr.am.recover", default "true");
+    // duplicated here because the shims layer cannot depend on HiveConf.
+    public static final String HIVE_EXEC_MR_AM_RECOVER = "hive.exec.mr.am.recover";
+    private boolean isRecoverySupported = true;
+
+    @Override
+    public void setupJob(org.apache.hadoop.mapred.JobContext jobContext) {
+      // Default must be true to match HiveConf's declared default for this key.
+      isRecoverySupported = jobContext.getConfiguration().getBoolean(HIVE_EXEC_MR_AM_RECOVER, true);
+    }
+
+    @Override
+    public void cleanupJob(org.apache.hadoop.mapred.JobContext jobContext) {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) {
+    }
+
+    @Override
+    public boolean isRecoverySupported() {
+      return isRecoverySupported;
+    }
+
+    /*
+     * This method is basically a no-op, because the Hive infrastructure writes
+     * the task output to the same location for all tasks and for all attempts.
+     * Hive has existing logic (implemented in
+     * Utilities.removeTempOrDuplicateFiles)
+     * to take the correct attempt outputs as the final result.
+     */
+    @Override
+    public void recoverTask(TaskAttemptContext context) throws IOException {
+      LOG.info("Recovering task attempt ID :"
+          + (context == null ? null : context.getTaskAttemptID()));
+    }
+  }
+
+  @Override
+  public void prepareJobOutput(JobConf conf) {
+    super.prepareJobOutput(conf);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+  }
 }
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index bf9c84f..bf3a52e 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -149,6 +149,7 @@ public static String getMajorVersion() {
     case 1:
       return "0.20S";
     case 2:
+    case 3:
       return "0.23";
     default:
       throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);