diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 024849e..4c1156f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -34,9 +34,11 @@ import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -47,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; @@ -634,8 +637,8 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo joinOp.getConf().getMapAliases(), bigTablePosition, true, removeReduceSink); mapJoinOp.getConf().setHybridHashJoin(HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)); - List joinExprs = mapJoinOp.getConf().getKeys().values().iterator().next(); - if (joinExprs.size() == 0) { // In case of cross join, we disable hybrid grace hash join + + if (!qualifyHybridHashJoin(mapJoinOp.getConf(), context)) { mapJoinOp.getConf().setHybridHashJoin(false); } @@ -809,4 +812,53 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex LOG.info("Fallback to common merge join operator"); convertJoinSMBJoin(joinOp, context, pos, 0, false); } + + /** + * Check if the Map Join operator qualifies as Hybrid Grace Hash Join + * @param joinDesc the Map Join operator descriptor + * @param context Tez context + * @return true if it qualifies, false if not + */ + private boolean qualifyHybridHashJoin(MapJoinDesc joinDesc, OptimizeTezProcContext context) { + // Case 1: For cross join, it doesn't qualify + List joinExprs = joinDesc.getKeys().values().iterator().next(); + if (joinExprs.size() == 0) { + return false; + } + + // Case 2: When the estimated memory usage for the small table(s) is quite small (smaller than + // half noconditionaltask size), it doesn't qualify + if (joinDesc.getMemoryNeeded() < + context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD) * 0.5) { + return false; + } + + // Case 3: When Number of Distinct Values is smaller than minimum number of partitions to create + // in Hybrid case, it doesn't qualify + if (joinDesc.getStatistics() != null) { + Map> keyLists = joinDesc.getKeys(); + for (byte tag = 0; tag < joinDesc.getTagLength(); tag++) { + if (tag == joinDesc.getPosBigTable()) { + continue; + } + List keys = keyLists.get(tag); // join exprs between a small tbl and big tbl + for (ExprNodeDesc joinExpr : keys) { + List columns = joinExpr.getCols(); // a join expr may reference multiple columns + // For a join expression with multiple columns, use the one with biggest NDV as lower bound + long biggestNDV = 0; + for (String column : columns) { + long ndv = joinDesc.getStatistics().getColumnStatisticsFromColName(column) + .getCountDistint(); + biggestNDV = biggestNDV < ndv ? ndv : biggestNDV; + } + if (biggestNDV != 0 && biggestNDV < + context.conf.getLongVar(HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS)) { + return false; + } + } + } + } + + return true; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index da1d9eb..676a81b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1706,8 +1706,7 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi // With the fast hash table implementation, we currently do not support // Hybrid Grace Hash Join. - if (HiveConf.getBoolVar(hiveConf, - HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN)) { + if (desc.isHybridHashJoin()) { specialize = false; } }