diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
index 7df626b..4f1b7d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
@@ -70,10 +70,14 @@ private void setupMRLegacyConfigs() {
       taskAttemptIdBuilder.append("r_");
     }
 
-    // Spark task attempt id is increased by Spark context instead of task, which may introduce
-    // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
+    // Hive requires this TaskAttemptId to be unique. MR's TaskAttemptId has the
+    // form "attempt_timestamp_jobNum_m/r_taskNum_attemptNum"; the Spark
+    // counterpart should be "attempt_timestamp_stageNum_m/r_partitionId_attemptNum".
+    // When there are multiple attempts for a task, Hive relies on the partitionId
+    // to decide whether the data is duplicated when collecting the final outputs
+    // (see org.apache.hadoop.hive.ql.exec.Utilities.removeTempOrDuplicateFiles).
     taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
-      .append("_0");
+      .append("_").append(TaskContext.get().attemptNumber());
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     jobConf.set("mapred.task.id", taskAttemptIdStr);
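
For reference, below is a minimal, self-contained sketch of the attempt-id format this patch produces. It is an illustration, not the patched class: the stage, partition, and attempt values are hard-coded stand-ins for what TaskContext.get().stageId(), partitionId(), and attemptNumber() would return inside a running Spark task, and the 4/6-digit zero padding is assumed to mirror the MR-style id formatting used by the jobIdFormat/taskIdFormat fields in HivePairFlatMapFunction.

import java.text.NumberFormat;

public class TaskAttemptIdSketch {
  public static void main(String[] args) {
    // Assumed padding: 4 digits for the stage (job) number, 6 for the task
    // number, matching MR-style ids such as attempt_1700000000000_0003_m_000017_1.
    NumberFormat stageIdFormat = NumberFormat.getInstance();
    stageIdFormat.setGroupingUsed(false);
    stageIdFormat.setMinimumIntegerDigits(4);

    NumberFormat taskIdFormat = NumberFormat.getInstance();
    taskIdFormat.setGroupingUsed(false);
    taskIdFormat.setMinimumIntegerDigits(6);

    long timestamp = System.currentTimeMillis();
    int stageId = 3;        // stand-in for TaskContext.get().stageId()
    int partitionId = 17;   // stand-in for TaskContext.get().partitionId()
    int attemptNumber = 1;  // stand-in for TaskContext.get().attemptNumber()
    boolean isMap = true;

    // "attempt_timestamp_stageNum_m/r_partitionId_attemptNum"
    String taskAttemptId = "attempt_" + timestamp
        + "_" + stageIdFormat.format(stageId)
        + "_" + (isMap ? "m" : "r")
        + "_" + taskIdFormat.format(partitionId)
        + "_" + attemptNumber;

    System.out.println(taskAttemptId);
  }
}

With the attempt number appended instead of the old hard-coded "_0", two attempts of the same partition now produce distinct ids (..._000017_0 vs ..._000017_1) that still share the ..._000017 task prefix, which is what lets Utilities.removeTempOrDuplicateFiles keep exactly one attempt's output per partition.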