diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index c77e081..4d2b040 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; @@ -240,7 +241,7 @@ private void initializeMapRecordSources() throws Exception { KeyValueReader reader = legacyMRInput.getReader(); position = mapOp.getConf().getTag(); sources[position] = new MapRecordSource(); - sources[position].init(jconf, mapOp, reader); + sources[position].init(jconf, mapOp, reader, execContext); for (MapOperator mapOp : mergeMapOpList) { int tag = mapOp.getConf().getTag(); sources[tag] = new MapRecordSource(); @@ -250,7 +251,7 @@ private void initializeMapRecordSources() throws Exception { l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); List kvReaderList = new ArrayList(kvReaders); reader = new KeyValueInputMerger(kvReaderList); - sources[tag].init(jconf, mapOp, reader); + sources[tag].init(jconf, mapOp, reader, execContext); } ((TezContext) MapredContext.get()).setRecordSources(sources); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java index 0419568..6b92593 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java @@ -44,8 +44,8 @@ private KeyValueReader reader = null; private final boolean grouped = false; - void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException { - execContext = new ExecMapperContext(jconf); + void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader, ExecMapperContext execContext) throws IOException { + this.execContext = execContext; this.mapOp = mapOp; this.reader = reader; }