diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 551639f..d596c85 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -173,6 +173,7 @@
COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),
+ MRAMRECOVER("hive.exec.mr.am.recover", true),
BYTESPERREDUCER("hive.exec.reducers.bytes.per.reducer", (long) (1000 * 1000 * 1000)),
MAXREDUCERS("hive.exec.reducers.max", 999),
PREEXECHOOKS("hive.exec.pre.hooks", ""),
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 6523097..22c9723 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -55,7 +55,9 @@
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.WebHCatJTShim23;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -576,4 +578,66 @@ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec)
public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
return context.getConfiguration();
}
+
+ /**
+ * OutputCommitter used when running on Hadoop 2.x / 0.23.x, where MR AM
+ * restart is supported. It overrides the two new recovery APIs,
+ * isRecoverySupported() and recoverTask().
+ */
+ public static class NullOutputCommitter extends OutputCommitter {
+ public static final String HIVE_EXEC_MR_AM_RECOVER = "hive.exec.mr.am.recover";
+ private boolean isRecoverySupported = true;
+
+ @Override
+ public void setupJob(org.apache.hadoop.mapred.JobContext jobContext) {
+ }
+
+ @Override
+ public void cleanupJob(org.apache.hadoop.mapred.JobContext jobContext) {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) {
+ // Default true, to agree with the field initializer and HiveConf's default.
+ isRecoverySupported = taskContext.getConfiguration().getBoolean(HIVE_EXEC_MR_AM_RECOVER, true);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) {
+ }
+
+ @Override
+ public boolean isRecoverySupported() {
+ return isRecoverySupported;
+ }
+
+ /*
+ * This method is basically a no-op, because the Hive infrastructure writes
+ * task output to the same location for all tasks and all attempts, and
+ * existing logic (Utilities.removeTempOrDuplicateFiles) picks the correct
+ * attempt's output as the final result.
+ */
+ @Override
+ public void recoverTask(TaskAttemptContext context) throws IOException {
+ LOG.info("Recovering task attempt ID :"
+ + (context == null ? null : context.getTaskAttemptID()));
+ }
+ }
+
+ @Override
+ public void prepareJobOutput(JobConf conf) {
+ super.prepareJobOutput(conf);
+ // Install the recovery-aware committer so MR AM restart can recover tasks.
+ conf.setOutputCommitter(NullOutputCommitter.class);
+ }
+
}
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index bf9c84f..bf3a52e 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -149,6 +149,7 @@ public static String getMajorVersion() {
case 1:
return "0.20S";
case 2:
+ case 3:
return "0.23";
default:
throw new IllegalArgumentException("Unrecognized Hadoop major version number: " + vers);