diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
index 7df626b..c5967ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
@@ -70,10 +70,13 @@ private void setupMRLegacyConfigs() {
       taskAttemptIdBuilder.append("r_");
     }
 
-    // Spark task attempt id is increased by Spark context instead of task, which may introduce
-    // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
+    // Hive relies on this unique id as the FileSinkOperator's output file name. Retries of the
+    // same partition have the same partitionId with a different attemptNumber, but a different
+    // taskId, whereas MR keeps the same taskId across attempts. Hive uses the taskId to detect
+    // duplicate files (see org.apache.hadoop.hive.ql.exec.Utilities.removeTempOrDuplicateFiles)
+    // when collecting the final outputs. Thus, use partitionId instead of taskAttemptId here.
     taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
-      .append("_0");
+      .append("_").append(TaskContext.get().attemptNumber());
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     jobConf.set("mapred.task.id", taskAttemptIdStr);
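
Illustrative sketch (not part of the patch): the following standalone Java program shows the mapred.task.id strings this builder produces after the change. The 6-digit zero-padded NumberFormat and the "attempt_<timestamp>_<stageId>_r_" prefix value are assumptions based on how setupMRLegacyConfigs builds the earlier portion of the id; mapredTaskId is a hypothetical helper for demonstration only.

import java.text.NumberFormat;

public class TaskAttemptIdSketch {
  // Mirrors the zero-padded taskIdFormat assumed from HivePairFlatMapFunction.
  private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
  static {
    TASK_ID_FORMAT.setGroupingUsed(false);
    TASK_ID_FORMAT.setMinimumIntegerDigits(6);
  }

  // prefix stands in for the "attempt_<timestamp>_<stageId>_r_" part built
  // earlier in the method.
  static String mapredTaskId(String prefix, int partitionId, int attemptNumber) {
    return new StringBuilder(prefix)
        .append(TASK_ID_FORMAT.format(partitionId)) // stable across retries of the partition
        .append("_").append(attemptNumber)          // differs per retry / speculative attempt
        .toString();
  }

  public static void main(String[] args) {
    String prefix = "attempt_1551001234567_0001_r_";
    System.out.println(mapredTaskId(prefix, 5, 0)); // attempt_1551001234567_0001_r_000005_0
    System.out.println(mapredTaskId(prefix, 5, 1)); // attempt_1551001234567_0001_r_000005_1
    // Both attempts share taskId 000005, so duplicate-file cleanup that keys on
    // the taskId portion of the name can recognize the two outputs as duplicates,
    // while the attempt suffix keeps their file names from colliding.
  }
}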