diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 61d376a..91dcc03 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -305,7 +305,8 @@ minitez.query.files=bucket_map_join_tez1.q,\ tez_smb_main.q,\ tez_smb_1.q,\ vectorized_dynamic_partition_pruning.q,\ - tez_multi_union.q + tez_multi_union.q,\ + tez_join.q encrypted.query.files=encryption_join_unencrypted_tbl.q,\ encryption_insert_partition_static.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 4a1bd15..a987f80 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -111,7 +111,7 @@ } if (parentOp instanceof ReduceSinkOperator) { - ReduceSinkOperator rs = (ReduceSinkOperator)parentOp; + ReduceSinkOperator rs = (ReduceSinkOperator) parentOp; estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? rs.getConf().getNumReducers() : estimatedBuckets; } @@ -133,10 +133,10 @@ if (retval == null) { return retval; } else { - // only case is full outer join with SMB enabled which is not possible. Convert to regular - // join. - convertJoinSMBJoin(joinOp, context, 0, 0, false, false); - return null; + // only case is full outer join with SMB enabled which is not possible. Convert to regular + // join. + convertJoinSMBJoin(joinOp, context, 0, 0, false, false); + return null; } } @@ -160,8 +160,10 @@ } MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos); - // map join operator by default has no bucket cols - mapJoinOp.setOpTraits(new OpTraits(null, -1, null)); + // map join operator by default has no bucket cols and num of reduce sinks + // reduced by 1 + mapJoinOp + .setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks())); mapJoinOp.setStatistics(joinOp.getStatistics()); // propagate this change till the next RS for (Operator childOp : mapJoinOp.getChildOperators()) { @@ -176,7 +178,8 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to // map join either based on the size. Check if we can convert to SMB join. 
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) { + if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) + || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) { convertJoinSMBJoin(joinOp, context, 0, 0, false, false); return null; } @@ -221,7 +224,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat convertJoinSMBJoin(joinOp, context, pos, 0, false, false); } return null; -} + } // replaces the join operator with a new CommonJoinOperator, removes the // parent reduce sinks @@ -240,9 +243,9 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont new MapJoinDesc( MapJoinProcessor.getKeys(joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp).getSecond(), - null, joinDesc.getExprs(), null, null, - joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), - joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + null, joinDesc.getExprs(), null, null, + joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); mapJoinDesc.setNullSafes(joinDesc.getNullSafes()); mapJoinDesc.setFilterMap(joinDesc.getFilterMap()); mapJoinDesc.resetOrder(); @@ -251,9 +254,13 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema()); - OpTraits opTraits = - new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits() - .getSortCols()); + int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks(); + if (adjustParentsChildren) { + // converted to SMB meaning we eliminated one of the RS + numReduceSinks -= 1; + } + OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp + .getOpTraits().getSortCols(), numReduceSinks); mergeJoinOp.setOpTraits(opTraits); mergeJoinOp.setStatistics(joinOp.getStatistics()); @@ -289,8 +296,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont if (adjustParentsChildren) { mergeJoinOp.getConf().setGenJoinKeys(true); - List> newParentOpList = - new ArrayList>(); + List> newParentOpList = new ArrayList>(); for (Operator parentOp : mergeJoinOp.getParentOperators()) { for (Operator grandParentOp : parentOp.getParentOperators()) { grandParentOp.getChildOperators().remove(parentOp); @@ -328,7 +334,8 @@ private void setAllChildrenTraitsToNull(Operator current if (currentOp instanceof ReduceSinkOperator) { return; } - currentOp.setOpTraits(new OpTraits(null, -1, null)); + currentOp.setOpTraits(new OpTraits(null, -1, null, + currentOp.getOpTraits().getNumReduceSinks())); for (Operator childOp : currentOp.getChildOperators()) { if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) { break; @@ -351,7 +358,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon // we can set the traits for this join operator OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), - tezBucketJoinProcCtx.getNumBuckets(), null); + tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); @@ -377,8 +384,7 @@ private boolean 
checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont ReduceSinkOperator bigTableRS = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); - int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits() - .getNumBuckets(); + int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits().getNumBuckets(); // the sort and bucket cols have to match on both sides for this // transformation of the join operation @@ -425,13 +431,12 @@ private void setNumberOfBucketsOnChildren(Operator curre } /* - * If the parent reduce sink of the big table side has the same emit key cols - * as its parent, we can create a bucket map join eliminating the reduce sink. + * If the parent reduce sink of the big table side has the same emit key cols as its parent, we + * can create a bucket map join eliminating the reduce sink. */ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, int bigTablePosition, - TezBucketJoinProcCtx tezBucketJoinProcCtx) - throws SemanticException { + TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // bail on mux-operator because mux operator masks the emit keys of the // constituent reduce sinks if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) { @@ -453,8 +458,8 @@ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, } /* - * this is the case when the big table is a sub-query and is probably - * already bucketed by the join column in say a group by operation + * this is the case when the big table is a sub-query and is probably already bucketed by the + * join column in say a group by operation */ boolean isSubQuery = false; if (numBuckets < 0) { @@ -487,7 +492,8 @@ private boolean checkColEquality(List> grandParentColNames, // all columns need to be at least a subset of the parentOfParent's bucket cols ExprNodeDesc exprNodeDesc = colExprMap.get(colName); if (exprNodeDesc instanceof ExprNodeColumnDesc) { - if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) { + if (((ExprNodeColumnDesc) exprNodeDesc).getColumn() + .equals(listBucketCols.get(colCount))) { colCount++; } else { break; @@ -557,14 +563,13 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c Statistics currInputStat = parentOp.getStatistics(); if (currInputStat == null) { - LOG.warn("Couldn't get statistics from: "+parentOp); + LOG.warn("Couldn't get statistics from: " + parentOp); return -1; } long inputSize = currInputStat.getDataSize(); - if ((bigInputStat == null) || - ((bigInputStat != null) && - (inputSize > bigInputStat.getDataSize()))) { + if ((bigInputStat == null) + || ((bigInputStat != null) && (inputSize > bigInputStat.getDataSize()))) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table @@ -634,11 +639,11 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo } } - //can safely convert the join to a map join. + // can safely convert the join to a map join. 
MapJoinOperator mapJoinOp = MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp, - joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), - joinOp.getConf().getMapAliases(), bigTablePosition, true); + joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), + joinOp.getConf().getMapAliases(), bigTablePosition, true); Operator parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); @@ -662,7 +667,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo parentBigTableOp.getParentOperators().get(0)); } parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp); - for (Operator op : mapJoinOp.getParentOperators()) { + for (Operator op : mapJoinOp.getParentOperators()) { if (!(op.getChildOperators().contains(mapJoinOp))) { op.getChildOperators().add(mapJoinOp); } @@ -676,7 +681,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo private boolean hasDynamicPartitionBroadcast(Operator parent) { boolean hasDynamicPartitionPruning = false; - for (Operator op: parent.getChildOperators()) { + for (Operator op : parent.getChildOperators()) { while (op != null) { if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { // found dynamic partition pruning operator diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java index 4e49260..cb673d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java @@ -82,7 +82,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } /* - * Reduce sink operator is the de-facto operator + * Reduce sink operator is the de-facto operator * for determining keyCols (emit keys of a map phase) */ public static class ReduceSinkRule implements NodeProcessor { @@ -106,24 +106,25 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, List> listBucketCols = new ArrayList>(); listBucketCols.add(bucketCols); int numBuckets = -1; + int numReduceSinks = 1; OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits(); if (parentOpTraits != null) { numBuckets = parentOpTraits.getNumBuckets(); + numReduceSinks += parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks); rs.setOpTraits(opTraits); return null; } } /* - * Table scan has the table object and pruned partitions that has information such as - * bucketing, sorting, etc. that is used later for optimization. + * Table scan has the table object and pruned partitions that has information + * such as bucketing, sorting, etc. that is used later for optimization. 
*/ public static class TableScanRule implements NodeProcessor { - public boolean checkBucketedTable(Table tbl, - ParseContext pGraphContext, + public boolean checkBucketedTable(Table tbl, ParseContext pGraphContext, PrunedPartitionList prunedParts) throws SemanticException { if (tbl.isPartitioned()) { @@ -131,9 +132,11 @@ public boolean checkBucketedTable(Table tbl, // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number) if (!partitions.isEmpty()) { for (Partition p : partitions) { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), pGraphContext); - // The number of files for the table should be same as number of buckets. + List fileNames = + AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), + pGraphContext); + // The number of files for the table should be same as number of + // buckets. int bucketCount = p.getBucketCount(); if (fileNames.size() != 0 && fileNames.size() != bucketCount) { @@ -143,8 +146,9 @@ public boolean checkBucketedTable(Table tbl, } } else { - List fileNames = - AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), pGraphContext); + List fileNames = + AbstractBucketJoinProc.getBucketFilePathsOfPartition(tbl.getDataLocation(), + pGraphContext); Integer num = new Integer(tbl.getNumBuckets()); // The number of files for the table should be same as number of buckets. @@ -183,7 +187,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } sortedColsList.add(sortCols); } - OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList); + // num reduce sinks hardcoded to 0 because TS has no parents + OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0); ts.setOpTraits(opTraits); return null; } @@ -208,8 +213,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } List> listBucketCols = new ArrayList>(); + int numReduceSinks = 0; + OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits(); + if (parentOpTraits != null) { + numReduceSinks = parentOpTraits.getNumReduceSinks(); + } listBucketCols.add(gbyKeys); - OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols); + OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks); gbyOp.setOpTraits(opTraits); return null; } @@ -217,8 +227,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public static class SelectRule implements NodeProcessor { - public List> getConvertedColNames(List> parentColNames, - SelectOperator selOp) { + public List> getConvertedColNames( + List> parentColNames, SelectOperator selOp) { List> listBucketCols = new ArrayList>(); if (selOp.getColumnExprMap() != null) { if (parentColNames != null) { @@ -244,8 +254,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { - SelectOperator selOp = (SelectOperator)nd; - List> parentBucketColNames = + SelectOperator selOp = (SelectOperator) nd; + List> parentBucketColNames = selOp.getParentOperators().get(0).getOpTraits().getBucketColNames(); List> listBucketCols = null; @@ -254,18 +264,21 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (parentBucketColNames != null) { listBucketCols = getConvertedColNames(parentBucketColNames, selOp); } - List> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits() - .getSortCols(); + List> parentSortColNames = + selOp.getParentOperators().get(0).getOpTraits().getSortCols(); if (parentSortColNames != null) { listSortCols = getConvertedColNames(parentSortColNames, selOp); } } int numBuckets = -1; - if (selOp.getParentOperators().get(0).getOpTraits() != null) { - numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets(); + int numReduceSinks = 0; + OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits(); + if (parentOpTraits != null) { + numBuckets = parentOpTraits.getNumBuckets(); + numReduceSinks = parentOpTraits.getNumReduceSinks(); } - OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols); + OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks); selOp.setOpTraits(opTraits); return null; } @@ -276,26 +289,31 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - JoinOperator joinOp = (JoinOperator)nd; + JoinOperator joinOp = (JoinOperator) nd; List> bucketColsList = new ArrayList>(); List> sortColsList = new ArrayList>(); byte pos = 0; + int numReduceSinks = 0; // will be set to the larger of the parents for (Operator parentOp : joinOp.getParentOperators()) { if (!(parentOp instanceof ReduceSinkOperator)) { // can be mux operator break; } - ReduceSinkOperator rsOp = (ReduceSinkOperator)parentOp; + ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; if (rsOp.getOpTraits() == null) { ReduceSinkRule rsRule = new ReduceSinkRule(); rsRule.process(rsOp, stack, procCtx, nodeOutputs); } - bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos)); - sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos)); + OpTraits parentOpTraits = rsOp.getOpTraits(); + bucketColsList.add(getOutputColNames(joinOp, parentOpTraits.getBucketColNames(), pos)); + sortColsList.add(getOutputColNames(joinOp, parentOpTraits.getSortCols(), pos)); + if (parentOpTraits.getNumReduceSinks() > numReduceSinks) { + numReduceSinks = parentOpTraits.getNumReduceSinks(); + } pos++; } - joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList)); + joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks)); return null; } @@ -311,7 +329,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, for (String colName : colNames) { for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) { if (exprNode instanceof ExprNodeColumnDesc) { - if(((ExprNodeColumnDesc)(exprNode)).getColumn().equals(colName)) { + if (((ExprNodeColumnDesc) (exprNode)).getColumn().equals(colName)) { for (Entry entry : joinOp.getColumnExprMap().entrySet()) { if (entry.getValue().isSame(exprNode)) { bucketColNames.add(entry.getKey()); @@ -338,20 +356,27 @@ public Object process(Node nd, Stack stack, 
NodeProcessorCtx procCtx, } /* - * When we have operators that have multiple parents, it is not - * clear which parent's traits we need to propagate forward. + * When we have operators that have multiple parents, it is not clear which + * parent's traits we need to propagate forward. */ public static class MultiParentRule implements NodeProcessor { @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - OpTraits opTraits = new OpTraits(null, -1, null); @SuppressWarnings("unchecked") - Operator operator = (Operator)nd; + Operator operator = (Operator) nd; + + int numReduceSinks = 0; + for (Operator parentOp : operator.getParentOperators()) { + if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) { + numReduceSinks = parentOp.getOpTraits().getNumReduceSinks(); + } + } + OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks); operator.setOpTraits(opTraits); return null; - } + } } public static NodeProcessor getTableScanRule() { @@ -361,7 +386,7 @@ public static NodeProcessor getTableScanRule() { public static NodeProcessor getReduceSinkRule() { return new ReduceSinkRule(); } - + public static NodeProcessor getSelectRule() { return new SelectRule(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 39d1f18..a455175 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -103,7 +103,7 @@ } // we can set the traits for this join operator - OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null); + OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks()); mapJoinOp.setOpTraits(opTraits); mapJoinOp.setStatistics(joinOp.getStatistics()); setNumberOfBucketsOnChildren(mapJoinOp); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java index c2b3664..a687a3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java @@ -25,11 +25,14 @@ List> bucketColNames; List> sortColNames; int numBuckets; + int numReduceSinks; - public OpTraits(List> bucketColNames, int numBuckets, List> sortColNames) { + public OpTraits(List> bucketColNames, int numBuckets, + List> sortColNames, int numReduceSinks) { this.bucketColNames = bucketColNames; this.numBuckets = numBuckets; this.sortColNames = sortColNames; + this.numReduceSinks = numReduceSinks; } public List> getBucketColNames() { @@ -55,4 +58,12 @@ public void setSortColNames(List> sortColNames) { public List> getSortCols() { return sortColNames; } + + public void setNumReduceSinks(int numReduceSinks) { + this.numReduceSinks = numReduceSinks; + } + + public int getNumReduceSinks() { + return this.numReduceSinks; + } } diff --git ql/src/test/queries/clientpositive/tez_join.q ql/src/test/queries/clientpositive/tez_join.q new file mode 100644 index 0000000..d35ec83 --- /dev/null +++ ql/src/test/queries/clientpositive/tez_join.q @@ -0,0 +1,43 @@ +set hive.auto.convert.sortmerge.join = true; + +create table t1( +id string, +od string); + +create table t2( +id string, +od string); + +explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select 
t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; + +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; + +explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; + +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id; diff --git ql/src/test/results/clientpositive/tez/tez_join.q.out ql/src/test/results/clientpositive/tez/tez_join.q.out new file mode 100644 index 0000000..a051dc7 --- /dev/null +++ ql/src/test/results/clientpositive/tez/tez_join.q.out @@ -0,0 +1,320 @@ +PREHOOK: query: create table t1( +id string, +od string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t1 +POSTHOOK: query: create table t1( +id string, +od string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t1 +PREHOOK: query: create table t2( +id string, +od string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t2 +POSTHOOK: query: create table t2( +id string, +od string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t2 +PREHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Map 4 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: id (type: string), od (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce 
Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t2 +#### A masked pattern was here #### +POSTHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, t1.od from t1 order by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od from t2 order by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t2 +#### A masked pattern was here #### +PREHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE) + Reducer 5 <- Map 4 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator 
Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Group By Operator + aggregations: count() + keys: id (type: string), od (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: _col2 (type: bigint) + Map 4 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: id is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Group By Operator + aggregations: count() + keys: id (type: string), od (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + value expressions: _col2 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (_col0 = _col1) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Reduce Output Operator + key 
expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@t1 +PREHOOK: Input: default@t2 +#### A masked pattern was here #### +POSTHOOK: query: select vt1.id from +(select rt1.id from +(select t1.id, t1.od, count(*) from t1 group by t1.id, t1.od) rt1) vt1 +join +(select rt2.id from +(select t2.id, t2.od, count(*) from t2 group by t2.id, t2.od) rt2) vt2 +where vt1.id=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1 +POSTHOOK: Input: default@t2 +#### A masked pattern was here ####
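
The self-contained sketch below is not part of the patch and is not Hive code; the class NumReduceSinksSketch, the Op/Kind types, and the method names are hypothetical. It only illustrates the reduce-sink counting this change introduces and how the count gates sort-merge-join conversion, using the rules taken from the diff above: TableScanRule hardcodes 0, ReduceSinkRule adds 1 to its parent's count, the select/group-by/join/multi-parent rules carry the larger parent value forward, and ConvertJoinMapJoin now falls back to a regular join when the join operator already sees two or more reduce sinks.

import java.util.ArrayList;
import java.util.List;

public class NumReduceSinksSketch {

  enum Kind { TABLE_SCAN, SELECT, GROUP_BY, REDUCE_SINK, JOIN }

  static class Op {
    final Kind kind;
    final List<Op> parents = new ArrayList<>();
    Op(Kind kind, Op... parentOps) {
      this.kind = kind;
      for (Op p : parentOps) {
        parents.add(p);
      }
    }
  }

  // Propagation rules mirroring OpTraitsRulesProcFactory after this patch:
  // a TableScan starts at 0, a ReduceSink adds 1 to its parent's count, and
  // Select/GroupBy/Join (and other multi-parent operators) carry forward the
  // largest value seen among their parents.
  static int numReduceSinks(Op op) {
    int fromParents = 0;
    for (Op parent : op.parents) {
      fromParents = Math.max(fromParents, numReduceSinks(parent));
    }
    switch (op.kind) {
      case TABLE_SCAN:
        return 0;
      case REDUCE_SINK:
        return fromParents + 1;
      default:
        return fromParents;
    }
  }

  // Gate corresponding to the new check in checkAndConvertSMBJoin: once a join
  // already has two or more reduce sinks on a branch, skip the SMB conversion
  // and keep a regular join.
  static boolean mayTrySmbConversion(Op joinOp, boolean autoSortMergeJoinEnabled) {
    return autoSortMergeJoinEnabled && numReduceSinks(joinOp) < 2;
  }

  public static void main(String[] args) {
    // Shape of the first tez_join.q query: each side is an ORDER BY subquery
    // (one reduce sink) followed by the reduce sink feeding the join.
    Op side1 = new Op(Kind.REDUCE_SINK,
        new Op(Kind.SELECT, new Op(Kind.REDUCE_SINK,
            new Op(Kind.SELECT, new Op(Kind.TABLE_SCAN)))));
    Op side2 = new Op(Kind.REDUCE_SINK,
        new Op(Kind.SELECT, new Op(Kind.REDUCE_SINK,
            new Op(Kind.SELECT, new Op(Kind.TABLE_SCAN)))));
    Op join = new Op(Kind.JOIN, side1, side2);

    System.out.println("numReduceSinks at join: " + numReduceSinks(join));      // prints 2
    System.out.println("attempt SMB conversion: " + mayTrySmbConversion(join, true)); // prints false
  }
}

With two reduce sinks feeding the join, the conversion is skipped, which is consistent with the plans above where the join stays a reduce-side Merge Join Operator fed by SIMPLE_EDGEs.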