diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index b1e22d5..238e948 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.text.NumberFormat; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,6 +55,15 @@ private TezProcessorContext processorContext; + protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); + protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); + static { + taskIdFormat.setGroupingUsed(false); + taskIdFormat.setMinimumIntegerDigits(6); + jobIdFormat.setGroupingUsed(false); + jobIdFormat.setMinimumIntegerDigits(4); + } + public TezProcessor(boolean isMap) { this.isMap = isMap; } @@ -79,9 +89,31 @@ public void initialize(TezProcessorContext processorContext) byte[] userPayload = processorContext.getUserPayload(); Configuration conf = TezUtils.createConfFromUserPayload(userPayload); this.jobConf = new JobConf(conf); + setupMRLegacyConfigs(processorContext); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); } + private void setupMRLegacyConfigs(TezProcessorContext processorContext) { + StringBuilder taskAttemptIdBuilder = new StringBuilder("task"); + taskAttemptIdBuilder.append(processorContext.getApplicationId().getClusterTimestamp()) + .append("_") + .append(jobIdFormat.format(processorContext.getApplicationId().getId())) + .append("_"); + if (isMap) { + taskAttemptIdBuilder.append("m_"); + } else { + taskAttemptIdBuilder.append("r_"); + } + taskAttemptIdBuilder.append(taskIdFormat.format(processorContext.getTaskIndex())) + .append("_") + .append(processorContext.getTaskAttemptNumber()); + + // In MR, mapreduce.task.attempt.id is same as mapred.task.id. Go figure. + String taskAttemptIdStr = taskAttemptIdBuilder.toString(); + this.jobConf.set("mapred.task.id", taskAttemptIdStr); + this.jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr); + } + @Override public void run(Map inputs, Map outputs) throws Exception {