diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 3a5a325..a0b7c3b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -133,6 +134,10 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag overrideTblProps(job, t.getParameters(), ci.properties); } setColumnTypes(job, sd.getCols()); + //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were + //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter + //to do the final move + job.setBoolean("mapreduce.map.speculative", false); return job; } @@ -624,7 +629,7 @@ public void map(WritableComparable key, CompactorInputSplit split, AcidInputFormat aif = instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); ValidTxnList txnList = - new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); + new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); boolean isMajor = jobConf.getBoolean(IS_MAJOR, false); AcidInputFormat.RawReader reader =