diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 21ac7ab..c311c8e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -31,6 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -101,6 +102,8 @@
   /* this is only used in the error code path */
   private List<VectorExpressionWriter>[] valueStringWriters;
 
+  private ExecMapperContext execContext = null;
+
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
     super.init(job, output, reporter);
@@ -195,6 +198,11 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
       throw new RuntimeException(e);
     }
 
+    execContext = new ExecMapperContext(job);
+    execContext.setJc(jc);
+    execContext.setLocalWork(gWork.getMapRedLocalWork());
+    reducer.setExecContext(execContext);
+
     reducer.setReporter(rp);
     OperatorUtils.setChildrenCollector(
         Arrays.<Operator<? extends OperatorDesc>>asList(reducer), output);
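
For context, a minimal sketch of the wiring this patch adds to init(), pulled out as a standalone helper. It is illustrative only, not part of the patch: ExecContextWiring and the parameter names are hypothetical, standing in for the handler's existing fields (job, jc, gWork.getMapRedLocalWork(), reducer).

// Illustrative sketch only; mirrors the wiring added in init() above.
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.mapred.JobConf;

final class ExecContextWiring {
  private ExecContextWiring() {
  }

  /** Builds an ExecMapperContext from the job conf and attaches it to the reducer. */
  static void attachExecContext(JobConf job, JobConf jc,
      MapredLocalWork localWork, Operator<?> reducer) {
    ExecMapperContext execContext = new ExecMapperContext(job);
    execContext.setJc(jc);               // job conf that downstream operators read
    execContext.setLocalWork(localWork); // map-join local work from the plan; may be null
    reducer.setExecContext(execContext); // propagated down through the operator tree
  }
}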