diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index a51c352..a86c4b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -70,6 +70,7 @@ protected String operatorId; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; + private transient Map dummyOpsMap = null; private static AtomicInteger seqId; @@ -1353,9 +1354,13 @@ public OperatorType getType() { public Map getTagToOperatorTree() { if ((parentOperators == null) || (parentOperators.size() == 0)) { - return null; + return dummyOpsMap; } - Map dummyOps = parentOperators.get(0).getTagToOperatorTree(); - return dummyOps; + dummyOpsMap = parentOperators.get(0).getTagToOperatorTree(); + return dummyOpsMap; + } + + public void setTagToOperatorTree(Map dummyOps) { + this.dummyOpsMap = dummyOps; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index f2ba3c5..2f1811c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -208,6 +208,8 @@ public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; public static final String HIVE_ADDED_JARS = "hive.added.jars"; + public static String MAPNAME = "Map"; + public static String REDUCENAME = "Reduce"; /** * ReduceField: @@ -239,6 +241,7 @@ private Utilities() { private static ThreadLocal> gWorkMap = new ThreadLocal>() { + @Override protected Map initialValue() { return new HashMap(); } @@ -304,12 +307,14 @@ public static ReduceWork getReduceWork(Configuration conf) { public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, boolean useCache) { for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) { - setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache); + String prefix = getMergeInputName(baseWork); + baseWork.setName(prefix); + setBaseWork(conf, baseWork, mrScratchDir, prefix + MERGE_PLAN_NAME, useCache); String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); if (prefixes == null) { - prefixes = baseWork.getName(); + prefixes = new String(prefix); } else { - prefixes = prefixes + "," + baseWork.getName(); + prefixes = prefixes + "," + prefix; } conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes); } @@ -429,7 +434,11 @@ private static BaseWork getBaseWork(Configuration conf, String name) { + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } } else if (name.contains(MERGE_PLAN_NAME)) { - gWork = deserializePlan(in, MapWork.class, conf); + if (name.contains(MAPNAME)) { + gWork = deserializePlan(in, MapWork.class, conf); + } else { + gWork = deserializePlan(in, ReduceWork.class, conf); + } } gWorkMap.get().put(path, gWork); } else if (LOG.isDebugEnabled()) { @@ -3792,4 +3801,8 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep public static boolean isDefaultNameNode(HiveConf conf) { return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname); } + + public static String getMergeInputName(BaseWork work) { + return work.getName(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 
d4d1b83..09d9b38 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -120,7 +120,7 @@ public Object call() { key = queryId + prefix; cacheKeys.add(key); - mergeWorkList.add( + mergeWorkList.add( (MapWork) cache.retrieve(key, new Callable() { @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 63c63b8..b38ee3c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -19,15 +19,18 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.ObjectCache; @@ -64,78 +67,136 @@ public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); - private ReduceWork redWork; + private ReduceWork reduceWork; + + List mergeWorkList = null; + private boolean foundCachedMergeWork = false; + private final Map connectOps = + new TreeMap(); + private final Map tagToReducerMap = new HashMap(); private Operator reducer; private ReduceRecordSource[] sources; - private final byte position = 0; + private byte bigTablePosition = 0; private boolean abort; - @Override - void init(final JobConf jconf, ProcessorContext processorContext, - MRTaskReporter mrReporter, Map inputs, - Map outputs) throws Exception { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); - super.init(jconf, processorContext, mrReporter, inputs, outputs); + public ReduceRecordProcessor(final JobConf jconf) throws Exception { ObjectCache cache = ObjectCacheFactory.getCache(jconf); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); cacheKey = queryId + REDUCE_PLAN_KEY; - redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { + reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { @Override public Object call() { return Utilities.getReduceWork(jconf); + } + }); + + Utilities.setReduceWork(jconf, reduceWork); + + String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); + if (prefixes != null) { + mergeWorkList = new ArrayList(); + for (String prefix : prefixes.split(",")) { + cacheKey = queryId + prefix; + ReduceWork mergeReduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable() { + @Override + public Object call() throws Exception { + return Utilities.getMergeWork(jconf); + } + }); + if (mergeReduceWork != null) { + l4j.info("Found merge work in cache"); + foundCachedMergeWork = true; + mergeWorkList.add(mergeReduceWork); + continue; + } + if (foundCachedMergeWork) { + throw new Exception( + "Should find all work in cache else operator pipeline will be in non-deterministic state"); } - }); - Utilities.setReduceWork(jconf, redWork); - reducer = redWork.getReducer(); - reducer.getParentOperators().clear(); - reducer.setParentOperators(null); // clear out any parents as reducer is the root + if 
((prefix != null) && (prefix.isEmpty() == false)) { + mergeReduceWork = (ReduceWork) Utilities.getMergeWork(jconf, prefix); + mergeWorkList.add(mergeReduceWork); + } + } + } + } - int numTags = redWork.getTagToValueDesc().size(); + @Override + void init(JobConf jconf, ProcessorContext processorContext, + MRTaskReporter mrReporter, Map inputs, + Map outputs) throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); + super.init(jconf, processorContext, mrReporter, inputs, outputs); - ObjectInspector[] ois = new ObjectInspector[numTags]; - sources = new ReduceRecordSource[numTags]; + connectOps.clear(); + ReduceWork redWork = reduceWork; + tagToReducerMap.put(redWork.getTag(), redWork); + if (mergeWorkList != null) { + for (ReduceWork mergeReduceWork : mergeWorkList) { + reducer = mergeReduceWork.getReducer(); + DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer); + connectOps.put(mergeReduceWork.getTag(), dummyStoreOp); + tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork); + } - for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { - TableDesc keyTableDesc = redWork.getKeyDesc(); - TableDesc valueTableDesc = redWork.getTagToValueDesc().get(tag); - - // make the reader ready for prime time - Input input = inputs.get(redWork.getTagToInput().get(tag)); - input.start(); - processorContext.waitForAnyInputReady(Collections.singleton(input)); - KeyValuesReader reader = (KeyValuesReader) input.getReader(); - - // now we can setup the record source - sources[tag] = new ReduceRecordSource(); - sources[tag].init(jconf, reducer, redWork.getVectorMode(), keyTableDesc, valueTableDesc, - reader, tag == position, (byte) tag, - redWork.getAllScratchColumnVectorTypeMaps()); - ois[tag] = sources[tag].getObjectInspector(); + bigTablePosition = (byte) reduceWork.getTag(); + redWork.getReducer().setTagToOperatorTree(connectOps); } + ObjectInspector[] mainWorkOIs = null; MapredContext.init(false, new JobConf(jconf)); ((TezContext) MapredContext.get()).setInputs(inputs); ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); - ((TezContext) MapredContext.get()).setRecordSources(sources); + int numTags = reduceWork.getTagToValueDesc().size(); + reducer = reduceWork.getReducer(); + if (numTags > 1) { + sources = new ReduceRecordSource[numTags]; + mainWorkOIs = new ObjectInspector[numTags]; + initializeMultipleSources(reduceWork, numTags, mainWorkOIs, sources); + ((TezContext) MapredContext.get()).setRecordSources(sources); + reducer.initialize(jconf, mainWorkOIs); + } else { + numTags = tagToReducerMap.keySet().size(); + sources = new ReduceRecordSource[numTags]; + mainWorkOIs = new ObjectInspector[numTags]; + for (int i : tagToReducerMap.keySet()) { + redWork = tagToReducerMap.get(i); + reducer = redWork.getReducer(); + initializeSourceForTag(redWork, i, mainWorkOIs, sources, + redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0)); + reducer.initializeLocalWork(jconf); + } + reducer = reduceWork.getReducer(); + ((TezContext) MapredContext.get()).setRecordSources(sources); + reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] }); + for (int i : tagToReducerMap.keySet()) { + if (i == bigTablePosition) { + continue; + } + redWork = tagToReducerMap.get(i); + reducer = redWork.getReducer(); + reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] }); + } + } + reducer = reduceWork.getReducer(); // initialize reduce operator tree try { l4j.info(reducer.dump(0)); - 
reducer.initialize(jconf, ois); // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the // dummy parent operators as well. List dummyOps = redWork.getDummyOps(); if (dummyOps != null) { - for (Operator dummyOp : dummyOps){ + for (Operator dummyOp : dummyOps) { dummyOp.initialize(jconf, null); } } @@ -165,12 +226,41 @@ public Object call() { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); } + private void initializeMultipleSources(ReduceWork redWork, int numTags, ObjectInspector[] ois, + ReduceRecordSource[] sources) throws Exception { + for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) { + if (redWork.getTagToValueDesc().get(tag) == null) { + continue; + } + initializeSourceForTag(redWork, tag, ois, sources, redWork.getTagToValueDesc().get(tag), + redWork.getTagToInput().get(tag)); + } + } + + private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, + ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName) + throws Exception { + reducer = redWork.getReducer(); + reducer.getParentOperators().clear(); + reducer.setParentOperators(null); // clear out any parents as reducer is the root + + TableDesc keyTableDesc = redWork.getKeyDesc(); + KeyValuesReader reader = (KeyValuesReader) inputs.get(inputName).getReader(); + + sources[tag] = new ReduceRecordSource(); + sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc, + valueTableDesc, reader, tag == bigTablePosition, (byte) tag, + redWork.getAllScratchColumnVectorTypeMaps()); + ois[tag] = sources[tag].getObjectInspector(); + } + @Override void run() throws Exception { List shuffleInputs = getShuffleInputs(inputs); if (shuffleInputs != null) { l4j.info("Waiting for ShuffleInputs to become ready"); - processorContext.waitForAllInputsReady(new ArrayList(shuffleInputs)); + processorContext + .waitForAllInputsReady(new ArrayList(shuffleInputs)); } for (Entry outputEntry : outputs.entrySet()) { @@ -180,19 +270,21 @@ void run() throws Exception { } // run the operator pipeline - while (sources[position].pushRecord()) {} + while (sources[bigTablePosition].pushRecord()) { + } } /** * Get the inputs that should be streamed through reduce plan. + * * @param inputs * @return */ private List getShuffleInputs(Map inputs) { - //the reduce plan inputs have tags, add all inputs that have tags - Map tagToinput = redWork.getTagToInput(); + // the reduce plan inputs have tags, add all inputs that have tags + Map tagToinput = reduceWork.getTagToInput(); ArrayList shuffleInputs = new ArrayList(); - for(String inpStr : tagToinput.values()){ + for (String inpStr : tagToinput.values()) { if (inputs.get(inpStr) == null) { throw new AssertionError("Cound not find input: " + inpStr); } @@ -213,13 +305,18 @@ void close(){ } reducer.close(abort); + if (mergeWorkList != null) { + for (ReduceWork redWork : mergeWorkList) { + redWork.getReducer().close(abort); + } + } // Need to close the dummyOps as well. The operator pipeline // is not considered "closed/done" unless all operators are // done. For broadcast joins that includes the dummy parents. 
- List dummyOps = redWork.getDummyOps(); + List dummyOps = reduceWork.getDummyOps(); if (dummyOps != null) { - for (Operator dummyOp : dummyOps){ + for (Operator dummyOp : dummyOps) { dummyOp.close(abort); } } @@ -230,8 +327,8 @@ void close(){ if (!abort) { // signal new failure to map-reduce l4j.error("Hit error while closing operators - failing tree"); - throw new RuntimeException("Hive Runtime Error while closing operators: " - + e.getMessage(), e); + throw new RuntimeException( + "Hive Runtime Error while closing operators: " + e.getMessage(), e); } } finally { Utilities.clearWorkMap(); @@ -239,4 +336,17 @@ void close(){ } } + private DummyStoreOperator getJoinParentOp( + Operator mergeReduceOp) { + for (Operator childOp : mergeReduceOp + .getChildOperators()) { + if ((childOp.getChildOperators() == null) + || (childOp.getChildOperators().isEmpty())) { + return (DummyStoreOperator) childOp; + } else { + return getJoinParentOp(childOp); + } + } + return null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 1a43b72..fcb959c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -68,7 +68,7 @@ private boolean abort = false; - private static Deserializer inputKeyDeserializer; + private Deserializer inputKeyDeserializer; // Input value serde needs to be an array to support different SerDe // for different tags diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 95bf8c9..f073deb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -131,7 +131,7 @@ public void run(Map inputs, Map out if (isMap) { rproc = new MapRecordProcessor(jobConf); } else { - rproc = new ReduceRecordProcessor(); + rproc = new ReduceRecordProcessor(jobConf); } initializeAndRunProcessor(inputs, outputs); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index fb553f1..3c5e198 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -178,8 +178,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. 
- if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) - || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) { + if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { convertJoinSMBJoin(joinOp, context, 0, 0, false); return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java index 5b16e5b..5b73866 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java @@ -70,12 +70,11 @@ context.opMergeJoinWorkMap.put(mergeJoinOp, mergeWork); } + mergeWork.addMergedWork(null, parentWork, context.leafOperatorToFollowingWork); mergeWork.setMergeJoinOperator(mergeJoinOp); - mergeWork.addMergedWork(null, parentWork); tezWork.setVertexType(mergeWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); for (BaseWork grandParentWork : tezWork.getParents(parentWork)) { - parentWork.setName(grandParentWork.getName()); TezEdgeProperty edgeProp = tezWork.getEdgeProperty(grandParentWork, parentWork); tezWork.disconnect(grandParentWork, parentWork); tezWork.connect(grandParentWork, mergeWork, edgeProp); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 5f0c0ef..6592eec 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -175,12 +175,20 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, parentRS.getConf().setReducerTraits(EnumSet.of(FIXED)); numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0]; - Operator rootOp = OperatorUtils.findSingleOperatorUpstream(mapJoinOp.getParentOperators() - .get(joinConf.getPosBigTable()), TableScanOperator.class); + Operator rootOp = + OperatorUtils.findSingleOperatorUpstream( + mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()), + ReduceSinkOperator.class); + if (rootOp == null) { + rootOp = + OperatorUtils.findSingleOperatorUpstream( + mapJoinOp.getParentOperators().get(joinConf.getPosBigTable()), + TableScanOperator.class); + } - if (rootOp instanceof TableScanOperator) { // we will run in mapper + if (rootOp instanceof TableScanOperator) { // we will run in mapper edgeType = EdgeType.CUSTOM_EDGE; - } else { // we will run in reducer + } else { // we will run in reducer edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index e67d98b..0990894 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -166,7 +166,7 @@ public Object process(Node nd, Stack stack, } // connect the work correctly.
work.addSortCols(root.getOpTraits().getSortCols().get(0)); - mergeJoinWork.addMergedWork(work, null); + mergeJoinWork.addMergedWork(work, null, context.leafOperatorToFollowingWork); Operator parentOp = getParentFromStack(context.currentMergeJoinOperator, stack); int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp); @@ -268,6 +268,7 @@ public Object process(Node nd, Stack stack, if (LOG.isDebugEnabled()) { LOG.debug("Removing " + parent + " as parent from " + root); } + context.leafOperatorToFollowingWork.remove(parent); context.leafOperatorToFollowingWork.put(parent, work); root.removeParent(parent); } @@ -326,7 +327,7 @@ public Object process(Node nd, Stack stack, MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork; CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); work.setTag(mergeJoinOp.getTagForOperator(operator)); - mergeJoinWork.addMergedWork(null, work); + mergeJoinWork.addMergedWork(null, work, context.leafOperatorToFollowingWork); tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES); for (BaseWork parentWork : tezWork.getParents(work)) { TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work); @@ -399,7 +400,7 @@ public Object process(Node nd, Stack stack, return null; } - private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) + private int getFollowingWorkIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) throws SemanticException { int index = 0; for (BaseWork baseWork : tezWork.getChildren(unionWork)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java index b2369fa..1c61186 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ -21,13 +21,17 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.mapred.JobConf; +import com.google.common.collect.BiMap; + public class MergeJoinWork extends BaseWork { private CommonMergeJoinOperator mergeJoinOp = null; @@ -65,7 +69,8 @@ public void setMergeJoinOperator(CommonMergeJoinOperator mergeJoinOp) { this.mergeJoinOp = mergeJoinOp; } - public void addMergedWork(BaseWork work, BaseWork connectWork) { + public void addMergedWork(BaseWork work, BaseWork connectWork, + Map, BaseWork> leafOperatorToFollowingWork) { if (work != null) { if ((bigTableWork != null) && (bigTableWork != work)) { assert false; @@ -76,6 +81,26 @@ public void addMergedWork(BaseWork work, BaseWork connectWork) { if (connectWork != null) { this.mergeWorkList.add(connectWork); + if ((connectWork instanceof ReduceWork) && (bigTableWork != null)) { + setReduceSinkOutputName(connectWork, leafOperatorToFollowingWork, bigTableWork.getName()); + } + } + + if (work != null) { + for (BaseWork mergeWork : mergeWorkList) { + if (mergeWork instanceof ReduceWork) { + setReduceSinkOutputName(mergeWork, leafOperatorToFollowingWork, work.getName()); + } + } + } + } + + private void setReduceSinkOutputName(BaseWork mergeWork, + Map, BaseWork> leafOperatorToFollowingWork, String name) { + for (Entry, BaseWork> entry : 
leafOperatorToFollowingWork.entrySet()) { + if (entry.getValue() == mergeWork) { + ((ReduceSinkOperator) entry.getKey()).getConf().setOutputName(name); + } } } diff --git ql/src/test/queries/clientpositive/tez_smb_main.q ql/src/test/queries/clientpositive/tez_smb_main.q index 4f178f7..e49b5cb 100644 --- ql/src/test/queries/clientpositive/tez_smb_main.q +++ ql/src/test/queries/clientpositive/tez_smb_main.q @@ -82,3 +82,43 @@ UNION ALL select s2.key as key, s2.value as value from tab s2 ) a join tab_part b on (a.key = b.key); +select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key +UNION ALL +select s2.key as key, s2.value as value from tab s2 +) a join tab_part b on (a.key = b.key); + +explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + +set hive.auto.convert.join=false; + +explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; + +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id; diff --git ql/src/test/results/clientpositive/tez/tez_join.q.out ql/src/test/results/clientpositive/tez/tez_join.q.out index a051dc7..af5070a 100644 --- ql/src/test/results/clientpositive/tez/tez_join.q.out +++ ql/src/test/results/clientpositive/tez/tez_join.q.out @@ -48,9 +48,7 @@ STAGE PLANS: Stage: Stage-1 Tez Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) - Reducer 5 <- Map 4 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -69,7 +67,7 @@ STAGE PLANS: key expressions: _col0 (type: string), _col1 (type: string) sort order: ++ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Map 4 + Map 3 Map Operator Tree: TableScan alias: t2 @@ -91,46 +89,33 @@ STAGE PLANS: expressions: KEY.reducesinkkey0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reducer 3 - Reduce Operator Tree: - Merge Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: string) - 1 _col0 (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Filter Operator - predicate: (_col0 = _col1) (type: boolean) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - File 
Output Operator - compressed: false - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Reducer 5 Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator @@ -186,9 +171,7 @@ STAGE PLANS: Stage: Stage-1 Tez Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE) - Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) - Reducer 5 <- Map 4 (SIMPLE_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) #### A masked pattern was here #### Vertices: Map 1 @@ -211,7 +194,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: string), _col1 (type: string) Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE value expressions: _col2 (type: bigint) - Map 4 + Map 3 Map Operator Tree: TableScan alias: t2 @@ -243,36 +226,6 @@ STAGE PLANS: expressions: _col0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Reducer 3 - Reduce Operator Tree: - Merge Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: string) - 1 _col0 (type: string) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Filter Operator - predicate: (_col0 = _col1) (type: boolean) - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - Select Operator - expressions: _col0 (type: string) - outputColumnNames: _col0 - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Reducer 5 Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) @@ -284,11 +237,28 @@ STAGE PLANS: expressions: _col0 (type: string) outputColumnNames: _col0 Statistics: Num rows: 0 
Data size: 0 Basic stats: NONE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator diff --git ql/src/test/results/clientpositive/tez/tez_smb_main.q.out ql/src/test/results/clientpositive/tez/tez_smb_main.q.out index f0e4b34..7879c03 100644 --- ql/src/test/results/clientpositive/tez/tez_smb_main.q.out +++ ql/src/test/results/clientpositive/tez/tez_smb_main.q.out @@ -1194,3 +1194,319 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key +UNION ALL +select s2.key as key, s2.value as value from tab s2 +) a join tab_part b on (a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from (select s1.key as key, s1.value as value from tab s1 join tab s3 on s1.key=s3.key +UNION ALL +select s2.key as key, s2.value as value from tab s2 +) a join tab_part b on (a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +1646 +PREHOOK: query: explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Reducer 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: 
string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Map 3 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reducer 4 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col1 + input vertices: + 0 Reducer 2 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### 
+POSTHOOK: query: select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +480 +PREHOOK: query: explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 4 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) + Reducer 5 <- Reducer 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Map 3 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 4 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE 
Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select rt1.id from +(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +480