diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 7befaee..4d6e197 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
 
@@ -45,10 +46,12 @@ public HiveMapFunction(byte[] buffer) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
     }
 
-    SparkMapRecordHandler mapper = new SparkMapRecordHandler();
-    mapper.configure(jobConf);
+    SparkMapRecordHandler mapRecordHandler = new SparkMapRecordHandler();
+    HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
+    //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
+    mapRecordHandler.init(jobConf, result, Reporter.NULL);
 
-    return new HiveMapFunctionResultList(jobConf, it, mapper);
+    return result;
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
index 8e05011..2510324 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
@@ -44,7 +44,7 @@ public HiveMapFunctionResultList(Configuration conf,
 
   @Override
   protected void processNextRecord(Tuple2 inputRecord) throws IOException {
-    recordHandler.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
+    recordHandler.process(inputRecord._2());
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 52324c9..e44ca23 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -77,7 +77,7 @@
 
   private final ExecMapperContext execContext = new ExecMapperContext();
 
-  public void configure(JobConf job) {
+  public void init(JobConf job, OutputCollector output, Reporter reporter) {
     // Allocate the bean at the beginning -
    memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -128,6 +128,12 @@ public void configure(JobConf job) {
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
 
+      oc = output;
+      rp = reporter;
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+      mo.setReporter(rp);
+      MapredContext.get().setReporter(reporter);
+
       if (localWork == null) {
         return;
       }
@@ -152,15 +158,7 @@ public void configure(JobConf job) {
     }
   }
 
-  public void map(Object key, Object value, OutputCollector output,
-      Reporter reporter) throws IOException {
-    if (oc == null) {
-      oc = output;
-      rp = reporter;
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
-      mo.setReporter(rp);
-      MapredContext.get().setReporter(reporter);
-    }
+  public void process(Object value) throws IOException {
     // reset the execContext for each new row
     execContext.resetRow();