diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index b35f075..387f47d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;

     JoinOperator joinOp = (JoinOperator) nd;
+    long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);

     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -110,7 +111,7 @@
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
     if (mapJoinConversionPos < 0) {
       Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
@@ -134,7 +135,7 @@
       // check if we can convert to map join no bucket scaling.
       LOG.info("Convert to non-bucketed map join");
       if (numBuckets != 1) {
-        mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+        mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize);
       }
       if (mapJoinConversionPos < 0) {
         // we are just converting to a common merge join operator. The shuffle
@@ -359,7 +360,7 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
       // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
       @SuppressWarnings({"rawtypes","unchecked"})
       Set<ReduceSinkOperator> set =
-          OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(),
+          OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
              ReduceSinkOperator.class);
       if (size < 0) {
         size = set.size();
@@ -505,44 +506,42 @@ private boolean checkColEquality(List<List<String>> grandParentColNames,
   }

   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets) throws SemanticException {
-    /*
-     * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
-     * an outer join down the join tree that requires filterTag. We disable this conversion to map
-     * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
-     * new operation to be able to support this. This seems like a corner case enough to special
-     * case this for now.
-     */
-    if (joinOp.getConf().getConds().length > 1) {
-      boolean hasOuter = false;
-      for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
-        switch (joinCondDesc.getType()) {
-        case JoinDesc.INNER_JOIN:
-        case JoinDesc.LEFT_SEMI_JOIN:
-        case JoinDesc.UNIQUE_JOIN:
-          hasOuter = false;
-          break;
-
-        case JoinDesc.FULL_OUTER_JOIN:
-        case JoinDesc.LEFT_OUTER_JOIN:
-        case JoinDesc.RIGHT_OUTER_JOIN:
-          hasOuter = true;
-          break;
-
-        default:
-          throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+      int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException {
+    if (!skipJoinTypeChecks) {
+      /*
+       * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
+       * an outer join down the join tree that requires filterTag. We disable this conversion to map
+       * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
+       * new operation to be able to support this. This seems like a corner case enough to special
+       * case this for now.
+       */
+      if (joinOp.getConf().getConds().length > 1) {
+        boolean hasOuter = false;
+        for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+          switch (joinCondDesc.getType()) {
+          case JoinDesc.INNER_JOIN:
+          case JoinDesc.LEFT_SEMI_JOIN:
+          case JoinDesc.UNIQUE_JOIN:
+            hasOuter = false;
+            break;
+
+          case JoinDesc.FULL_OUTER_JOIN:
+          case JoinDesc.LEFT_OUTER_JOIN:
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            hasOuter = true;
+            break;
+
+          default:
+            throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+          }
+        }
+        if (hasOuter) {
+          return -1;
         }
-      }
-      if (hasOuter) {
-        return -1;
       }
     }
     Set<Integer> bigTableCandidateSet =
         MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
-
-    long maxSize = context.conf.getLongVar(
-        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
     int bigTablePosition = -1;
     // big input cumulative row count
     long bigInputCumulativeCardinality = -1L;
@@ -576,7 +575,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // on size and there's another one that's bigger.
         return -1;
       }
-
+
       if (inputSize/buckets > maxSize) {
         if (!bigTableCandidateSet.contains(pos)) {
           // can't use the current table as the big table, but it's too
@@ -826,7 +825,9 @@ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, Optim
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
+        context.conf.getLongVar(
+        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -869,9 +870,14 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex
       }
     }

+    int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
+        true, Long.MAX_VALUE);
+    if (pos < 0) {
+      LOG.info("Could not get a valid join position. Defaulting to position 0");
+      pos = 0;
+    }
     // we are just converting to a common merge join operator. The shuffle
     // join in map-reduce case.
-    int pos = 0; // it doesn't matter which position we use in this case.
LOG.info("Fallback to common merge join operator"); convertJoinSMBJoin(joinOp, context, pos, 0, false); } diff --git ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out index 3d1f22f..0e04e64 100644 --- ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out +++ ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out @@ -2673,7 +2673,7 @@ STAGE PLANS: 0 p_partkey (type: int) 1 _col0 (type: int) outputColumnNames: _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20 - Position of Big Table: 0 + Position of Big Table: 1 Statistics: Num rows: 28 Data size: 17646 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col12 (type: int), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: int), _col18 (type: string), _col19 (type: double), _col20 (type: string)