diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 024849e..cb6aa4c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -632,11 +633,10 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
         joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
         joinOp.getConf().getMapAliases(), bigTablePosition, true, removeReduceSink);
-    mapJoinOp.getConf().setHybridHashJoin(HiveConf.getBoolVar(context.conf,
-        HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
-    List<ExprNodeDesc> joinExprs = mapJoinOp.getConf().getKeys().values().iterator().next();
-    if (joinExprs.size() == 0) { // In case of cross join, we disable hybrid grace hash join
-      mapJoinOp.getConf().setHybridHashJoin(false);
+
+    if (HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN) &&
+        qualifyHybridHashJoin(mapJoinOp, context)) {
+      mapJoinOp.getConf().setHybridHashJoin(true);
     }
 
     Operator<? extends OperatorDesc> parentBigTableOp =
@@ -809,4 +809,87 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }
+
+  /**
+   * Check whether the map join operator qualifies to use Hybrid Grace Hash Join.
+   * @param mapJoinOp the map join operator
+   * @param context the Tez optimizer context
+   * @return true if it qualifies, false otherwise
+   */
+  private boolean qualifyHybridHashJoin(MapJoinOperator mapJoinOp, OptimizeTezProcContext context) {
+    MapJoinDesc joinDesc = mapJoinOp.getConf();
+
+    // Case 1: A cross join doesn't qualify
+    List<ExprNodeDesc> joinExprs = joinDesc.getKeys().values().iterator().next();
+    if (joinExprs.size() == 0) {
+      LOG.info("Map Join doesn't qualify for Hybrid Grace Hash Join, since it's a cross join.");
+      return false;
+    }
+
+    // Case 2: If the estimated data size of the small table(s) is less than half of the
+    // noconditionaltask size, it doesn't qualify
+    long smallTableParentsDataSizes = 0;
+    long bigTableParentDataSize = 0;
+    for (Operator<? extends OperatorDesc> parentOp : mapJoinOp.getParentOperators()) {
+      long dataSize = parentOp.getStatistics().getDataSize();
+      if (dataSize > bigTableParentDataSize) {
+        bigTableParentDataSize = dataSize;
+      }
+      smallTableParentsDataSizes += dataSize;
+    }
+    smallTableParentsDataSizes -= bigTableParentDataSize;
+
+    if (smallTableParentsDataSizes <
+        context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD) * 0.5) {
+      LOG.info("Map Join doesn't qualify for Hybrid Grace Hash Join, since the small table is too small.");
+      return false;
+    }
+
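+    // Illustration for the check above (example numbers only, not from this patch): with
+    // hive.auto.convert.join.noconditionaltask.size set to 20,000,000 bytes, small tables whose
+    // combined data size estimate stays below 10,000,000 bytes keep using the regular in-memory
+    // hash table instead of Hybrid Grace Hash Join.
+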
+    // Case 3: If the number of distinct values (NDV) of the join key is smaller than the minimum
+    // number of partitions Hybrid Grace Hash Join would create, it doesn't qualify
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS) &&
+        joinDesc.getStatistics() != null && joinDesc.getStatistics().getColumnStats() != null) {
+
+      Map<Byte, List<ExprNodeDesc>> keyLists = joinDesc.getKeys();
+      for (byte tag = 0; tag < joinDesc.getTagLength(); tag++) {
+        if (tag == joinDesc.getPosBigTable()) {
+          continue;
+        }
+        List<ExprNodeDesc> keys = keyLists.get(tag); // join exprs between a small table and the big table
+        for (ExprNodeDesc joinExpr : keys) {
+          List<String> columns = joinExpr.getCols(); // a join expr may reference multiple columns
+          // For a join expression with multiple columns, use the column with the biggest NDV as a lower bound
+          long biggestNDV = 0;
+          for (String column : columns) {
+            ColStatistics colStats = joinDesc.getStatistics().getColumnStatisticsFromColName(column);
+            if (colStats == null) {
+              return true; // we don't have enough info to rule out Hybrid Grace Hash Join
+            }
+            biggestNDV = Math.max(biggestNDV, colStats.getCountDistint());
+          }
+          if (biggestNDV != 0 && biggestNDV <
+              context.conf.getLongVar(HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS)) {
+            LOG.info("Map Join doesn't qualify for Hybrid Grace Hash Join, since the join key NDV is " +
+                "less than the minimum number of partitions.");
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
 }
diff --git ql/src/test/queries/clientpositive/hybridgrace_hashjoin_3.q ql/src/test/queries/clientpositive/hybridgrace_hashjoin_3.q
new file mode 100644
index 0000000..84f6e34
--- /dev/null
+++ ql/src/test/queries/clientpositive/hybridgrace_hashjoin_3.q
@@ -0,0 +1,76 @@
+set hive.explain.user=false;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=2000000;
+set hive.mapjoin.hybridgrace.hashtable=true;
+set hive.stats.fetch.column.stats=true;
+
+-- This test verifies that under certain circumstances Hybrid Grace Hash Join will be disabled,
+-- even if the flag is set to true.
+
+-- Case 1. Hybrid join is disabled because the small table's data size is too small
+
+explain
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
+
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
+
+set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.stats.fetch.column.stats=true;
+
+-- Case 2. Hybrid join is not disabled, due to the lack of column stats for the join key
+
+explain
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
+
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
+
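+-- (Expectation for Case 2: lowering the threshold to 20000 bytes keeps the small table above
+-- half of it, and without column statistics for the join key the NDV check cannot rule
+-- Hybrid Grace Hash Join out, so it stays enabled.)
+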
+-- Case 3. Hybrid join is disabled since column stats indicate the NDV is smaller than the # of partitions
+
+CREATE TABLE t1 (key int);
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' OVERWRITE INTO TABLE t1;
+
+CREATE TABLE t3 (key int);
+LOAD DATA LOCAL INPATH '../../data/files/T3.txt' OVERWRITE INTO TABLE t3;
+
+analyze table t1 compute statistics;
+analyze table t1 compute statistics for columns;
+analyze table t3 compute statistics;
+analyze table t3 compute statistics for columns;
+
+set hive.auto.convert.join.noconditionaltask.size=20;
+
+explain
+select count(*)
+from t1 join t3
+on t1.key = t3.key;
+
+select count(*)
+from t1 join t3
+on t1.key = t3.key;
+
+drop table t1;
+drop table t3;
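+
+-- (Expectation for Case 3: t1 and t3 hold only a handful of distinct keys, so the join key NDV
+-- should fall below the minimum number of Hybrid Grace Hash Join partitions and the plan should
+-- fall back to the regular in-memory hash table.)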