diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
index add7d08c40..d440aeaa8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
@@ -46,6 +48,10 @@ void init(JobConf jconf, AbstractMapOperator mapOp, KeyValueReader reader)
       throws IOException {
     execContext = mapOp.getExecContext();
     this.mapOp = mapOp;
+    if (reader instanceof KeyValueInputMerger) {
+      KeyValueInputMerger kvMerger = (KeyValueInputMerger) reader;
+      kvMerger.setIOCxt(execContext.getIoCxt());
+    }
     this.reader = reader;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
index 698fa7f69e..b3030b2308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
@@ -25,8 +25,13 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
+import org.apache.tez.mapreduce.lib.MRReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -35,6 +40,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -50,12 +56,16 @@
   public static final Logger l4j = LoggerFactory.getLogger(KeyValueInputMerger.class);
   private PriorityQueue pQueue = null;
   private KeyValueReader nextKVReader = null;
+  private KeyValueReader prevKVReader = null;
   private ObjectInspector[] inputObjInspectors = null;
   private Deserializer deserializer = null;
   private List structFields = null;
   private List fieldOIs = null;
   private final Map> kvReaderStandardObjMap =
       new HashMap>();
+  private final Map kvReaderPathMap =
+      new HashMap<>();
+  private IOContext ioCxt = null;
 
   public KeyValueInputMerger(List multiMRInputs, Deserializer deserializer,
       ObjectInspector[] inputObjInspectors, List sortCols) throws Exception {
@@ -76,11 +86,22 @@ public KeyValueInputMerger(List multiMRInputs, Deserializer dese
     }
     l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
     for (KeyValueReader input : multiMRInputs) {
+      TezGroupedSplit split = (TezGroupedSplit) ((MRReader) input).getSplit();
+      List splits = split.getGroupedSplits();
+      Path path = ((HiveInputFormat.HiveInputSplit) splits.get(0)).getPath();
+      kvReaderPathMap.put(input, path);
       addToQueue(input);
     }
   }
 
   /**
+   * Sets the IOContext whose input path next() updates; next() uses it without a null check.
+   */
+  public void setIOCxt(IOContext ioCxt) {
+    this.ioCxt = ioCxt;
+  }
+
+  /**
    * Add KeyValueReader to queue if it has more key-value
    *
    * @param kvReader
@@ -106,7 +127,15 @@ public boolean next() throws IOException {
 
     //get the new nextKVReader with lowest key
     nextKVReader = pQueue.poll();
-    return nextKVReader != null;
+    if (nextKVReader == null)
+      return false;
+
+    if (nextKVReader != prevKVReader) {
+      prevKVReader = nextKVReader;
+      // new reader selected: update IOContext with the path mapped to it
+      ioCxt.setInputPath(kvReaderPathMap.get(nextKVReader));
+    }
+    return true;
   }
 
   @Override