diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java index add7d08c40..daafc2e522 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; @@ -46,6 +48,10 @@ void init(JobConf jconf, AbstractMapOperator mapOp, KeyValueReader reader) throws IOException { execContext = mapOp.getExecContext(); this.mapOp = mapOp; + if (reader instanceof KeyValueInputMerger) { + KeyValueInputMerger kvMerger = (KeyValueInputMerger) reader; + kvMerger.setIOCxt(execContext.getIoCxt()); + } this.reader = reader; } @@ -103,6 +109,12 @@ private void closeReader() { LOG.warn("Cannot close " + (reader == null ? 
null : reader.getClass())); return; } + if (reader instanceof KeyValueInputMerger) { + // cleanup + KeyValueInputMerger kvMerger = (KeyValueInputMerger) reader; + kvMerger.clean(); + } + LOG.info("Closing MRReader on error"); MRReader mrReader = (MRReader)reader; try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java index 698fa7f69e..f5591d1bb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java @@ -21,12 +21,18 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.IOContext; +import org.apache.hadoop.mapred.split.TezGroupedSplit; +import org.apache.tez.mapreduce.lib.MRReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -35,6 +41,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputSplit; import org.apache.tez.runtime.library.api.KeyValueReader; /** @@ -50,12 +57,16 @@ public static final Logger l4j = LoggerFactory.getLogger(KeyValueInputMerger.class); private PriorityQueue<KeyValueReader> pQueue = null; private KeyValueReader nextKVReader = null; + private KeyValueReader prevKVReader = null; private ObjectInspector[] inputObjInspectors = null; private Deserializer deserializer = null;
private List<StructField> structFields = null; private List<ObjectInspector> fieldOIs = null; private final Map<KeyValueReader, List<Object>> kvReaderStandardObjMap = new HashMap<KeyValueReader, List<Object>>(); + private final Map<KeyValueReader, Path> kvReaderPathMap = + new IdentityHashMap<>(); + private IOContext ioCxt = null; public KeyValueInputMerger(List<KeyValueReader> multiMRInputs, Deserializer deserializer, ObjectInspector[] inputObjInspectors, List<String> sortCols) throws Exception { @@ -76,11 +87,31 @@ public KeyValueInputMerger(List<KeyValueReader> multiMRInputs, Deserializer dese } l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size()); for (KeyValueReader input : multiMRInputs) { + TezGroupedSplit split = (TezGroupedSplit) ((MRReader) input).getSplit(); + List<InputSplit> splits = split.getGroupedSplits(); + // There maybe more than 1 splits in the group, however, they all have 1 unique path. + // Assert that. + Path path = ((HiveInputFormat.HiveInputSplit) splits.get(0)).getPath(); + Path pathFromMap = kvReaderPathMap.putIfAbsent(input, path); + if (pathFromMap != null) { + assert pathFromMap.equals(path); + } addToQueue(input); } } /** + * Set the IOContext reference so that input path can be changed. + * This is needed because there can be more than one inputs at play + * at a given time, however, there is one IOContext which needs + * correct input path. In other joins, even when there are multiple + * inputs, they are read sequentially which is not the case here.
+ */ + public void setIOCxt(IOContext ioCxt) { + this.ioCxt = ioCxt; + } + + /** * Add KeyValueReader to queue if it has more key-value * * @param kvReader @@ -106,7 +137,15 @@ public boolean next() throws IOException { //get the new nextKVReader with lowest key nextKVReader = pQueue.poll(); - return nextKVReader != null; + if (nextKVReader == null) + return false; + + if (nextKVReader != prevKVReader) { + prevKVReader = nextKVReader; + // update path in IOContext + ioCxt.setInputPath(kvReaderPathMap.get(nextKVReader)); + } + return true; } @Override @@ -120,6 +159,14 @@ public Object getCurrentValue() throws IOException { } /** + * Cleanup references + */ + public void clean() { + ioCxt = null; + prevKVReader = null; + } + + /** * Comparator that compares KeyValuesReader on their current key */ class KVReaderComparator implements Comparator<KeyValueReader> { diff --git a/ql/src/test/results/clientpositive/llap/llap_smb.q.out b/ql/src/test/results/clientpositive/llap/llap_smb.q.out index a75b3dadac..f5eec29172 100644 --- a/ql/src/test/results/clientpositive/llap/llap_smb.q.out +++ b/ql/src/test/results/clientpositive/llap/llap_smb.q.out @@ -321,8 +321,26 @@ POSTHOOK: Input: default@orc_a@y=2001/q=8 POSTHOOK: Input: default@orc_a@y=2001/q=9 POSTHOOK: Input: default@orc_b #### A masked pattern was here #### -2001 4 139630 -2001 7 52 +2000 2 6578 +2001 8 9438 +2000 3 6149 +2000 5 5720 +2000 9 8151 +2001 0 6721 +2001 1 7493 +2001 2 6578 +2001 4 7865 +2001 9 8151 +2000 1 7493 +2000 7 6149 +2000 8 9438 +2001 6 5577 +2001 7 6149 +2000 0 6721 +2000 4 7865 +2000 6 5577 +2001 3 6149 +2001 5 5720 PREHOOK: query: DROP TABLE orc_a PREHOOK: type: DROPTABLE PREHOOK: Input: default@orc_a