diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 4a1bd15..afc20e5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -60,15 +60,16 @@ import org.apache.hadoop.util.ReflectionUtils; /** - * ConvertJoinMapJoin is an optimization that replaces a common join - * (aka shuffle join) with a map join (aka broadcast or fragment replicate - * join when possible. Map joins have restrictions on which joins can be - * converted (e.g.: full outer joins cannot be handled as map joins) as well - * as memory restrictions (one side of the join has to fit into memory). + * ConvertJoinMapJoin is an optimization that replaces a common join (aka + * shuffle join) with a map join (aka broadcast or fragment replicate join when + * possible. Map joins have restrictions on which joins can be converted (e.g.: + * full outer joins cannot be handled as map joins) as well as memory + * restrictions (one side of the join has to fit into memory). */ public class ConvertJoinMapJoin implements NodeProcessor { - static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); + static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class + .getName()); @Override /* @@ -76,19 +77,20 @@ * since we need to walk the tree at any time when we modify the operator, we * might as well do it here. */ - public Object - process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; JoinOperator joinOp = (JoinOperator) nd; - TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); + TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx( + context.conf); if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. - Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx); + Object retval = checkAndConvertSMBJoin(context, joinOp, + tezBucketJoinProcCtx); if (retval == null) { return retval; } else { @@ -103,17 +105,19 @@ // reducers from the parent operators. int numBuckets = -1; int estimatedBuckets = -1; - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - for (OperatorparentOp : joinOp.getParentOperators()) { + if (context.conf + .getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { + for (Operator parentOp : joinOp + .getParentOperators()) { if (parentOp.getOpTraits().getNumBuckets() > 0) { - numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? - parentOp.getOpTraits().getNumBuckets() : numBuckets; + numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? parentOp + .getOpTraits().getNumBuckets() : numBuckets; } if (parentOp instanceof ReduceSinkOperator) { - ReduceSinkOperator rs = (ReduceSinkOperator)parentOp; - estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? - rs.getConf().getNumReducers() : estimatedBuckets; + ReduceSinkOperator rs = (ReduceSinkOperator) parentOp; + estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
rs + .getConf().getNumReducers() : estimatedBuckets; } } @@ -127,22 +131,27 @@ numBuckets = 1; } LOG.info("Estimated number of buckets " + numBuckets); - int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets); + int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, + numBuckets); if (mapJoinConversionPos < 0) { - Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx); + Object retval = checkAndConvertSMBJoin(context, joinOp, + tezBucketJoinProcCtx); if (retval == null) { return retval; } else { - // only case is full outer join with SMB enabled which is not possible. Convert to regular - // join. - convertJoinSMBJoin(joinOp, context, 0, 0, false, false); - return null; + // only case is full outer join with SMB enabled which is not possible. + // Convert to regular + // join. + convertJoinSMBJoin(joinOp, context, 0, 0, false, false); + return null; } } if (numBuckets > 1) { - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { - if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + if (context.conf + .getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) { + if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, + tezBucketJoinProcCtx)) { return null; } } @@ -159,12 +168,16 @@ return null; } - MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); - // map join operator by default has no bucket cols - mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, + mapJoinConversionPos); + // map join operator by default has no bucket cols and num of reduce sinks + // reduced by 1 + mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits() + .getNumReduceSinks() - 1)); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS - for (Operator childOp : mapJoinOp.getChildOperators()) { + for (Operator childOp : mapJoinOp + .getChildOperators()) { setAllChildrenTraitsToNull(childOp); } @@ -176,7 +189,8 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. - if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { + if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) + || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) { convertJoinSMBJoin(joinOp, context, 0, 0, false, false); return null; } @@ -211,9 +225,11 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat return null; } - if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) { + if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, + tezBucketJoinProcCtx)) { convertJoinSMBJoin(joinOp, context, mapJoinConversionPos, - tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true); + tezBucketJoinProcCtx.getNumBuckets(), + tezBucketJoinProcCtx.isSubQuery(), true); } else { // we are just converting to a common merge join operator. The shuffle // join in map-reduce case. 
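
Reviewer note (not part of the patch): the guard added above in checkAndConvertSMBJoin skips SMB conversion once the join's subtree already contains two or more reduce sinks, using the numReduceSinks trait this patch introduces in OpTraits and propagates in OpTraitsRulesProcFactory (a table scan starts at 0, each ReduceSinkOperator adds one to its parent's count, and a join takes the larger of its parents' counts). A minimal standalone sketch of that counting and the guard is below; the names SketchOp and canConsiderSmbJoin are illustrative only and do not exist in Hive.

```java
import java.util.ArrayList;
import java.util.List;

class SketchOp {
  final String name;
  final boolean isReduceSink;
  final List<SketchOp> parents = new ArrayList<SketchOp>();
  final int numReduceSinks;

  SketchOp(String name, boolean isReduceSink, SketchOp... parentOps) {
    this.name = name;
    this.isReduceSink = isReduceSink;
    int fromParents = 0;
    for (SketchOp p : parentOps) {
      parents.add(p);
      // A multi-parent operator (e.g. a join) takes the larger parent count.
      fromParents = Math.max(fromParents, p.numReduceSinks);
    }
    // A leaf (table scan) starts at 0; a reduce sink adds one to its parent's count.
    this.numReduceSinks = fromParents + (isReduceSink ? 1 : 0);
  }

  // Mirrors the new guard in checkAndConvertSMBJoin(): with two or more
  // reduce sinks below the join, fall back to a regular (shuffle) merge join.
  boolean canConsiderSmbJoin() {
    return numReduceSinks < 2;
  }

  public static void main(String[] args) {
    // Shape of the first query in tez_join.q added by this patch: each side
    // needs one shuffle for row_number() and one more to feed the join.
    SketchOp ts1 = new SketchOp("TS_t1", false);
    SketchOp rsWin1 = new SketchOp("RS_window_t1", true, ts1);
    SketchOp rsJoin1 = new SketchOp("RS_join_t1", true, rsWin1);
    SketchOp ts2 = new SketchOp("TS_t2", false);
    SketchOp rsWin2 = new SketchOp("RS_window_t2", true, ts2);
    SketchOp rsJoin2 = new SketchOp("RS_join_t2", true, rsWin2);
    SketchOp join = new SketchOp("JOIN", false, rsJoin1, rsJoin2);

    System.out.println(join.name + " numReduceSinks = " + join.numReduceSinks); // 2
    System.out.println("SMB candidate? " + join.canConsiderSmbJoin());          // false
  }
}
```

This matches the expected plans in tez_join.q.out further down: the join stays a shuffle-fed Merge Join Operator rather than being converted to an SMB join.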
@@ -251,9 +267,14 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema()); + int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks(); + if (adjustParentsChildren) { + // converted to SMB meaning we eliminated one of the RS + numReduceSinks -= 1; + } OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() - .getSortCols()); + .getSortCols(), numReduceSinks); mergeJoinOp.setOpTraits(opTraits); mergeJoinOp.setStatistics(joinOp.getStatistics()); @@ -269,13 +290,15 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont childOp.getParentOperators().add(pos, mergeJoinOp); } - List> childOperators = mergeJoinOp.getChildOperators(); + List> childOperators = mergeJoinOp + .getChildOperators(); if (childOperators == null) { childOperators = new ArrayList>(); mergeJoinOp.setChildOperators(childOperators); } - List> parentOperators = mergeJoinOp.getParentOperators(); + List> parentOperators = mergeJoinOp + .getParentOperators(); if (parentOperators == null) { parentOperators = new ArrayList>(); mergeJoinOp.setParentOperators(parentOperators); @@ -289,10 +312,11 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont if (adjustParentsChildren) { mergeJoinOp.getConf().setGenJoinKeys(true); - List> newParentOpList = - new ArrayList>(); - for (Operator parentOp : mergeJoinOp.getParentOperators()) { - for (Operator grandParentOp : parentOp.getParentOperators()) { + List> newParentOpList = new ArrayList>(); + for (Operator parentOp : mergeJoinOp + .getParentOperators()) { + for (Operator grandParentOp : parentOp + .getParentOperators()) { grandParentOp.getChildOperators().remove(parentOp); grandParentOp.getChildOperators().add(mergeJoinOp); newParentOpList.add(grandParentOp); @@ -300,8 +324,8 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont } mergeJoinOp.getParentOperators().clear(); mergeJoinOp.getParentOperators().addAll(newParentOpList); - List> parentOps = - new ArrayList>(mergeJoinOp.getParentOperators()); + List> parentOps = new ArrayList>( + mergeJoinOp.getParentOperators()); for (Operator parentOp : parentOps) { int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp); if (parentIndex == mapJoinConversionPos) { @@ -310,8 +334,10 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont // insert the dummy store operator here DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(); - dummyStoreOp.setParentOperators(new ArrayList>()); - dummyStoreOp.setChildOperators(new ArrayList>()); + dummyStoreOp + .setParentOperators(new ArrayList>()); + dummyStoreOp + .setChildOperators(new ArrayList>()); dummyStoreOp.getChildOperators().add(mergeJoinOp); int index = parentOp.getChildOperators().indexOf(mergeJoinOp); parentOp.getChildOperators().remove(index); @@ -324,43 +350,54 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); } - private void setAllChildrenTraitsToNull(Operator currentOp) { + private void setAllChildrenTraitsToNull( + Operator currentOp) { if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1, null)); - for (Operator childOp : 
currentOp.getChildOperators()) { - if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { + currentOp.setOpTraits(new OpTraits(null, -1, null, currentOp.getOpTraits() + .getNumReduceSinks() - 1)); + for (Operator childOp : currentOp + .getChildOperators()) { + if ((childOp instanceof ReduceSinkOperator) + || (childOp instanceof GroupByOperator)) { break; } setAllChildrenTraitsToNull(childOp); } } - private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + private boolean convertJoinBucketMapJoin(JoinOperator joinOp, + OptimizeTezProcContext context, int bigTablePosition, + TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { - if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) { + if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, + tezBucketJoinProcCtx)) { LOG.info("Check conversion to bucket map join failed."); return false; } - MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition); + MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, + bigTablePosition); MapJoinDesc joinDesc = mapJoinOp.getConf(); joinDesc.setBucketMapJoin(true); // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets(), null); + tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits() + .getNumReduceSinks() - 1); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); - // Once the conversion is done, we can set the partitioner to bucket cols on the small table + // Once the conversion is done, we can set the partitioner to bucket cols on + // the small table Map bigTableBucketNumMapping = new HashMap(); - bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets()); + bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), + tezBucketJoinProcCtx.getNumBuckets()); joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping); - LOG.info("Setting legacy map join to " + (!tezBucketJoinProcCtx.isSubQuery())); + LOG.info("Setting legacy map join to " + + (!tezBucketJoinProcCtx.isSubQuery())); joinDesc.setCustomBucketMapJoin(!tezBucketJoinProcCtx.isSubQuery()); return true; @@ -372,32 +409,35 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon * can convert the join to an SMB. Otherwise retain the bucket map join as it * is still more efficient than a regular join. 
*/ - private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, - int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { + private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, + OptimizeTezProcContext context, int bigTablePosition, + TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { - ReduceSinkOperator bigTableRS = - (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + ReduceSinkOperator bigTableRS = (ReduceSinkOperator) joinOp + .getParentOperators().get(bigTablePosition); int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits() - .getNumBuckets(); + .getNumBuckets(); // the sort and bucket cols have to match on both sides for this // transformation of the join operation - for (Operator parentOp : joinOp.getParentOperators()) { + for (Operator parentOp : joinOp + .getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // could be mux/demux operators. Currently not supported LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time."); return false; } ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; - if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp - .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits() + .getSortCols(), rsOp.getOpTraits().getSortCols(), + rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { LOG.info("We cannot convert to SMB because the sort column names do not match."); return false; } - if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp - .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) - == false) { + if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits() + .getBucketColNames(), rsOp.getOpTraits().getBucketColNames(), + rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { LOG.info("We cannot convert to SMB because bucket column names do not match."); return false; } @@ -414,10 +454,12 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont return true; } - private void setNumberOfBucketsOnChildren(Operator currentOp) { + private void setNumberOfBucketsOnChildren( + Operator currentOp) { int numBuckets = currentOp.getOpTraits().getNumBuckets(); - for (Operatorop : currentOp.getChildOperators()) { - if (!(op instanceof ReduceSinkOperator) && !(op instanceof GroupByOperator)) { + for (Operator op : currentOp.getChildOperators()) { + if (!(op instanceof ReduceSinkOperator) + && !(op instanceof GroupByOperator)) { op.getOpTraits().setNumBuckets(numBuckets); setNumberOfBucketsOnChildren(op); } @@ -430,24 +472,26 @@ private void setNumberOfBucketsOnChildren(Operator curre */ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, int bigTablePosition, - TezBucketJoinProcCtx tezBucketJoinProcCtx) - throws SemanticException { + TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // bail on mux-operator because mux operator masks the emit keys of the // constituent reduce sinks if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) { - LOG.info("Operator is " + joinOp.getParentOperators().get(0).getName() + - ". Cannot convert to bucket map join"); + LOG.info("Operator is " + joinOp.getParentOperators().get(0).getName() + + ". 
Cannot convert to bucket map join"); return false; } - ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); + ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators() + .get(bigTablePosition); List> parentColNames = rs.getOpTraits().getBucketColNames(); - Operator parentOfParent = rs.getParentOperators().get(0); - List> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames(); + Operator parentOfParent = rs.getParentOperators() + .get(0); + List> grandParentColNames = parentOfParent.getOpTraits() + .getBucketColNames(); int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); // all keys matched. - if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), - tezBucketJoinProcCtx) == false) { + if (checkColEquality(grandParentColNames, parentColNames, + rs.getColumnExprMap(), tezBucketJoinProcCtx) == false) { LOG.info("No info available to check for bucket map join. Cannot convert"); return false; } @@ -504,14 +548,15 @@ private boolean checkColEquality(List> grandParentColNames, return false; } - public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, - int buckets) throws SemanticException { + public int getMapJoinConversionPos(JoinOperator joinOp, + OptimizeTezProcContext context, int buckets) throws SemanticException { /* - * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is - * an outer join down the join tree that requires filterTag. We disable this conversion to map - * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a - * new operation to be able to support this. This seems like a corner case enough to special - * case this for now. + * HIVE-9038: Join tests fail in tez when we have more than 1 join on the + * same key and there is an outer join down the join tree that requires + * filterTag. We disable this conversion to map join here now. We need to + * emulate the behavior of HashTableSinkOperator as in MR or create a new + * operation to be able to support this. This seems like a corner case + * enough to special case this for now. */ if (joinOp.getConf().getConds().length > 1) { boolean hasOuter = false; @@ -530,18 +575,19 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c break; default: - throw new SemanticException("Unknown join type " + joinCondDesc.getType()); + throw new SemanticException("Unknown join type " + + joinCondDesc.getType()); } } if (hasOuter) { return -1; } } - Set bigTableCandidateSet = - MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds()); + Set bigTableCandidateSet = MapJoinProcessor + .getBigTableCandidates(joinOp.getConf().getConds()); - long maxSize = context.conf.getLongVar( - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + long maxSize = context.conf + .getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); int bigTablePosition = -1; @@ -553,18 +599,18 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c // max. This table is either the the big table or we cannot convert. 
boolean bigTableFound = false; - for (Operator parentOp : joinOp.getParentOperators()) { + for (Operator parentOp : joinOp + .getParentOperators()) { Statistics currInputStat = parentOp.getStatistics(); if (currInputStat == null) { - LOG.warn("Couldn't get statistics from: "+parentOp); + LOG.warn("Couldn't get statistics from: " + parentOp); return -1; } long inputSize = currInputStat.getDataSize(); - if ((bigInputStat == null) || - ((bigInputStat != null) && - (inputSize > bigInputStat.getDataSize()))) { + if ((bigInputStat == null) + || ((bigInputStat != null) && (inputSize > bigInputStat.getDataSize()))) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table @@ -572,7 +618,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c return -1; } - if (inputSize/buckets > maxSize) { + if (inputSize / buckets > maxSize) { if (!bigTableCandidateSet.contains(pos)) { // can't use the current table as the big table, but it's too // big for the map side. @@ -588,7 +634,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c totalSize += bigInputStat.getDataSize(); } - if (totalSize/buckets > maxSize) { + if (totalSize / buckets > maxSize) { // sum of small tables size in this join exceeds configured limit // hence cannot convert. return -1; @@ -600,7 +646,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c } } else { totalSize += currInputStat.getDataSize(); - if (totalSize/buckets > maxSize) { + if (totalSize / buckets > maxSize) { // cannot hold all map tables in memory. Cannot convert. return -1; } @@ -657,11 +703,13 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo } } mapJoinOp.getParentOperators().remove(bigTablePosition); - if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) { + if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp + .getParentOperators().get(0)))) { mapJoinOp.getParentOperators().add(bigTablePosition, parentBigTableOp.getParentOperators().get(0)); } - parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); + parentBigTableOp.getParentOperators().get(0) + .removeChild(parentBigTableOp); for (Operator op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { op.getChildOperators().add(mapJoinOp); @@ -676,15 +724,17 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo private boolean hasDynamicPartitionBroadcast(Operator parent) { boolean hasDynamicPartitionPruning = false; - for (Operator op: parent.getChildOperators()) { + for (Operator op : parent.getChildOperators()) { while (op != null) { - if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { + if (op instanceof AppMasterEventOperator + && op.getConf() instanceof DynamicPruningEventDesc) { // found dynamic partition pruning operator hasDynamicPartitionPruning = true; break; } if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) { - // crossing reduce sink or file sink means the pruning isn't for this parent. + // crossing reduce sink or file sink means the pruning isn't for this + // parent. 
break; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 4e49260..05d649a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -74,7 +74,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @SuppressWarnings("unchecked") - Operator op = (Operator)nd; + Operator op = (Operator) nd; op.setOpTraits(op.getParentOperators().get(0).getOpTraits()); return null; } @@ -82,8 +82,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } /* - * Reduce sink operator is the de-facto operator - * for determining keyCols (emit keys of a map phase) + * Reduce sink operator is the de-facto operator for determining keyCols (emit + * keys of a map phase) */ public static class ReduceSinkRule implements NodeProcessor { @@ -91,11 +91,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - ReduceSinkOperator rs = (ReduceSinkOperator)nd; + ReduceSinkOperator rs = (ReduceSinkOperator) nd; List bucketCols = new ArrayList(); if (rs.getColumnExprMap() != null) { for (ExprNodeDesc exprDesc : rs.getConf().getKeyCols()) { - for (Entry entry : rs.getColumnExprMap().entrySet()) { + for (Entry entry : rs.getColumnExprMap() + .entrySet()) { if (exprDesc.isSame(entry.getValue())) { bucketCols.add(entry.getKey()); } @@ -106,34 +107,40 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> listBucketCols = new ArrayList>(); listBucketCols.add(bucketCols); int numBuckets = -1; - OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits(); + int numReduceSinks = 1; + OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf() + .getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); + numReduceSinks += parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, + listBucketCols, numReduceSinks); rs.setOpTraits(opTraits); return null; } } /* - * Table scan has the table object and pruned partitions that has information such as - * bucketing, sorting, etc. that is used later for optimization. + * Table scan has the table object and pruned partitions that has information + * such as bucketing, sorting, etc. that is used later for optimization. 
*/ public static class TableScanRule implements NodeProcessor { - public boolean checkBucketedTable(Table tbl, - ParseContext pGraphContext, + public boolean checkBucketedTable(Table tbl, ParseContext pGraphContext, PrunedPartitionList prunedParts) throws SemanticException { if (tbl.isPartitioned()) { List partitions = prunedParts.getNotDeniedPartns(); - // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number) + // construct a mapping of (Partition->bucket file names) and (Partition + // -> bucket number) if (!partitions.isEmpty()) { for (Partition p : partitions) { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext); - // The number of files for the table should be same as number of buckets. + List fileNames = AbstractBucketJoinProc + .getBucketFilePathsOfPartition(p.getDataLocation(), + pGraphContext); + // The number of files for the table should be same as number of + // buckets. int bucketCount = p.getBucketCount(); if (fileNames.size() != 0 && fileNames.size() != bucketCount) { @@ -143,11 +150,12 @@ public boolean checkBucketedTable(Table tbl, } } else { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext); + List fileNames = AbstractBucketJoinProc + .getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext); Integer num = new Integer(tbl.getNumBuckets()); - // The number of files for the table should be same as number of buckets. + // The number of files for the table should be same as number of + // buckets. if (fileNames.size() != 0 && fileNames.size() != num) { return false; } @@ -183,7 +191,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } sortedColsList.add(sortCols); } - OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList); + // num reduce sinks hardcoded to 0 because TS has no parents + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, + sortedColsList, 0); ts.setOpTraits(opTraits); return null; } @@ -197,10 +207,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { - GroupByOperator gbyOp = (GroupByOperator)nd; + GroupByOperator gbyOp = (GroupByOperator) nd; List gbyKeys = new ArrayList(); for (ExprNodeDesc exprDesc : gbyOp.getConf().getKeys()) { - for (Entry entry : gbyOp.getColumnExprMap().entrySet()) { + for (Entry entry : gbyOp.getColumnExprMap() + .entrySet()) { if (exprDesc.isSame(entry.getValue())) { gbyKeys.add(entry.getKey()); } @@ -208,8 +219,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } List> listBucketCols = new ArrayList>(); + int numReduceSinks = 0; + OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits(); + if (parentOpTraits != null) { + numReduceSinks = parentOpTraits.getNumReduceSinks(); + } listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, + numReduceSinks); gbyOp.setOpTraits(opTraits); return null; } @@ -217,17 +234,19 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public static class SelectRule implements NodeProcessor { - public List> getConvertedColNames(List> parentColNames, - SelectOperator selOp) { + public List> getConvertedColNames( + List> parentColNames, SelectOperator selOp) { List> listBucketCols = new ArrayList>(); if (selOp.getColumnExprMap() != null) { if (parentColNames != null) { for (List colNames : parentColNames) { List bucketColNames = new ArrayList(); for (String colName : colNames) { - for (Entry entry : selOp.getColumnExprMap().entrySet()) { + for (Entry entry : selOp.getColumnExprMap() + .entrySet()) { if (entry.getValue() instanceof ExprNodeColumnDesc) { - if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) { + if (((ExprNodeColumnDesc) (entry.getValue())).getColumn() + .equals(colName)) { bucketColNames.add(entry.getKey()); } } @@ -244,9 +263,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { - SelectOperator selOp = (SelectOperator)nd; - List> parentBucketColNames = - selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); + SelectOperator selOp = (SelectOperator) nd; + List> parentBucketColNames = selOp.getParentOperators() + .get(0).getOpTraits().getBucketColNames(); List> listBucketCols = null; List> listSortCols = null; @@ -254,18 +273,22 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (parentBucketColNames != null) { listBucketCols = getConvertedColNames(parentBucketColNames, selOp); } - List> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits() - .getSortCols(); + List> parentSortColNames = selOp.getParentOperators() + .get(0).getOpTraits().getSortCols(); if (parentSortColNames != null) { listSortCols = getConvertedColNames(parentSortColNames, selOp); } } int numBuckets = -1; - if (selOp.getParentOperators().get(0).getOpTraits() != null) { - numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets(); + int numReduceSinks = 0; + OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits(); + if (parentOpTraits != null) { + numBuckets = parentOpTraits.getNumBuckets(); + numReduceSinks = parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, + listSortCols, numReduceSinks); selOp.setOpTraits(opTraits); return null; } @@ -276,31 +299,40 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - JoinOperator joinOp = (JoinOperator)nd; + JoinOperator joinOp = (JoinOperator) nd; List> bucketColsList = new ArrayList>(); List> sortColsList = new ArrayList>(); byte pos = 0; - for (Operator parentOp : joinOp.getParentOperators()) { + int numReduceSinks = 0; // will be set to the larger of the parents + for (Operator parentOp : joinOp + .getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // can be mux operator break; } - ReduceSinkOperator rsOp = (ReduceSinkOperator)parentOp; + ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; if (rsOp.getOpTraits() == null) { ReduceSinkRule rsRule = new ReduceSinkRule(); rsRule.process(rsOp, stack, procCtx, nodeOutputs); } - bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos)); - sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos)); + OpTraits parentOpTraits = rsOp.getOpTraits(); + bucketColsList.add(getOutputColNames(joinOp, + parentOpTraits.getBucketColNames(), pos)); + sortColsList.add(getOutputColNames(joinOp, + parentOpTraits.getSortCols(), pos)); + if (parentOpTraits.getNumReduceSinks() > numReduceSinks) { + numReduceSinks = parentOpTraits.getNumReduceSinks(); + } pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, + numReduceSinks)); return null; } - private List getOutputColNames(JoinOperator joinOp, List> parentColNames, - byte pos) { + private List getOutputColNames(JoinOperator joinOp, + List> parentColNames, byte pos) { if (parentColNames != null) { List bucketColNames = new ArrayList(); @@ -311,8 +343,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, for (String colName : colNames) { for (ExprNodeDesc exprNode : 
joinOp.getConf().getExprs().get(pos)) { if (exprNode instanceof ExprNodeColumnDesc) { - if(((ExprNodeColumnDesc)(exprNode)).getColumn().equals(colName)) { - for (Entry entry : joinOp.getColumnExprMap().entrySet()) { + if (((ExprNodeColumnDesc) (exprNode)).getColumn().equals(colName)) { + for (Entry entry : joinOp + .getColumnExprMap().entrySet()) { if (entry.getValue().isSame(exprNode)) { bucketColNames.add(entry.getKey()); // we have found the colName @@ -338,20 +371,20 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } /* - * When we have operators that have multiple parents, it is not - * clear which parent's traits we need to propagate forward. + * When we have operators that have multiple parents, it is not clear which + * parent's traits we need to propagate forward. */ public static class MultiParentRule implements NodeProcessor { @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - OpTraits opTraits = new OpTraits(null, -1, null); + OpTraits opTraits = new OpTraits(null, -1, null, -1); @SuppressWarnings("unchecked") - Operator operator = (Operator)nd; + Operator operator = (Operator) nd; operator.setOpTraits(opTraits); return null; - } + } } public static NodeProcessor getTableScanRule() { @@ -361,7 +394,7 @@ public static NodeProcessor getTableScanRule() { public static NodeProcessor getReduceSinkRule() { return new ReduceSinkRule(); } - + public static NodeProcessor getSelectRule() { return new SelectRule(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java index c2b3664..a687a3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java @@ -25,11 +25,14 @@ List> bucketColNames; List> sortColNames; int numBuckets; + int numReduceSinks; - public OpTraits(List> bucketColNames, int numBuckets, List> sortColNames) { + public OpTraits(List> bucketColNames, int numBuckets, + List> sortColNames, int numReduceSinks) { this.bucketColNames = bucketColNames; this.numBuckets = numBuckets; this.sortColNames = sortColNames; + this.numReduceSinks = numReduceSinks; } public List> getBucketColNames() { @@ -55,4 +58,12 @@ public void setSortColNames(List> sortColNames) { public List> getSortCols() { return sortColNames; } + + public void setNumReduceSinks(int numReduceSinks) { + this.numReduceSinks = numReduceSinks; + } + + public int getNumReduceSinks() { + return this.numReduceSinks; + } } diff --git ql/src/test/queries/clientpositive/tez_join.q ql/src/test/queries/clientpositive/tez_join.q new file mode 100644 index 0000000..8318725 --- /dev/null +++ ql/src/test/queries/clientpositive/tez_join.q @@ -0,0 +1,48 @@ +set hive.auto.convert.sortmerge.join = true; + +create table t1( +id string, +od string); + +create table t2( +id string, +od string); + +explain +select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 +where rt2.row_no=1) vt2 +where vt1.id=vt2.id; + +select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 
+where rt2.row_no=1) vt2 +where vt1.id=vt2.id; + +explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; + +explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; diff --git ql/src/test/results/clientpositive/tez/tez_join.q.out ql/src/test/results/clientpositive/tez/tez_join.q.out new file mode 100644 index 0000000..e1b64be --- /dev/null +++ ql/src/test/results/clientpositive/tez/tez_join.q.out @@ -0,0 +1,434 @@ +PREHOOK: query: create table t1( +id string, +od string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table t1( +id string, +od string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: create table t2( +id string, +od string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t2 +POSTHOOK: query: create table t2( +id string, +od string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t2 +PREHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 +where rt2.row_no=1) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 +where rt2.row_no=1) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: id (type: string), od (type: string) + sort order: +- + Map-reduce partition columns: id (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: id (type: string), od (type: string) + Map 4 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: id (type: string), od (type: string) + sort order: +- + Map-reduce partition columns: id (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: id (type: string), od (type: string) + Reducer 2 + Reduce Operator Tree: + Extract + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + PTF Operator + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((_wcol0 = 1) and _col0 is not null) (type: boolean) 
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} + 1 {KEY.reducesinkkey0} + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Extract + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + PTF Operator + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((_wcol0 = 1) and _col0 is not null) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 +where rt2.row_no=1) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t2 +#### A masked pattern was here #### +POSTHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, row_number() over (partition by id order by od desc) as row_no from t1) rt1 +where rt1.row_no=1) vt1 +join +(select rt2.id from +(select t2.id, row_number() over (partition by id order by od desc) as row_no from t2) rt2 +where rt2.row_no=1) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t2 +#### A masked pattern was here #### +PREHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 
+where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Map 4 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} + 1 {KEY.reducesinkkey0} + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) 
vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: id, od + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Group By Operator + aggregations: count() + keys: id (type: string), od (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: _col2 (type: bigint) + Map 4 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: id, od + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Group By Operator + aggregations: count() + keys: id (type: string), od (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: _col2 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {KEY.reducesinkkey0} + 1 {KEY.reducesinkkey0} + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 
Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +
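
Reviewer note (not part of the patch): the plans above keep the Merge Join Operator fed by SIMPLE_EDGE shuffles because every subquery branch (windowing, order by, or group by) contributes its own reduce sink before the join. For reference, a minimal usage sketch of the extended OpTraits constructor and the new numReduceSinks accessors added by this patch; the class name OpTraitsUsageSketch and the sample values are illustrative, and the existing getNumBuckets() accessor is assumed from the surrounding code.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.plan.OpTraits;

public class OpTraitsUsageSketch {
  public static void main(String[] args) {
    // Bucket/sort column names are per-parent lists of column names.
    List<List<String>> cols = Arrays.asList(Arrays.asList("id"));
    // Four-argument constructor introduced by this patch:
    // bucket columns, number of buckets, sort columns, number of reduce sinks.
    OpTraits traits = new OpTraits(cols, 2, cols, 1);
    System.out.println(traits.getNumBuckets());     // 2
    System.out.println(traits.getNumReduceSinks()); // 1
    traits.setNumReduceSinks(2);
    System.out.println(traits.getNumReduceSinks()); // 2
  }
}
```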