diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java index 1674d4b..12a43c0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; @@ -47,6 +48,8 @@ public HiveMapFunction(byte[] buffer) { call(Iterator> it) throws Exception { if (jobConf == null) { jobConf = KryoSerializer.deserializeJobConf(this.buffer); + // set mapred.task.partition on the executor side. + jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId()); } SparkRecordHandler mapRecordHandler;