diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7cee344..9d93c28 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1426,9 +1426,14 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVECONVERTJOINMAXENTRIESHASHTABLE("hive.auto.convert.join.hashtable.max.entries", 40000000L,
         "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" +
-        "However, if it is on, and the predicated number of entries in hashtable for a given join \n" +
+        "However, if it is on, and the predicted number of entries in hashtable for a given join \n" +
         "input is larger than this number, the join will not be converted to a mapjoin. \n" +
         "The value \"-1\" means no limit."),
+    HIVECONVERTJOINMAXSHUFFLESIZE("hive.auto.convert.join.shuffle.max.size", 10000000L,
+        "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take effect. \n" +
+        "However, if it is on, and the predicted size of the larger input for a given join is greater \n" +
+        "than this number, the join will not be converted to a dynamically partitioned hash join. \n" +
+        "The value \"-1\" means no limit."),
     HIVEHASHTABLEKEYCOUNTADJUSTMENT("hive.hashtable.key.count.adjustment", 1.0f,
         "Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate" +
         " of the number of keys is divided by this value. If the value is 0, statistics are not used" +
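The HiveConf hunk above pairs the new hive.auto.convert.join.shuffle.max.size with the existing hive.auto.convert.join.hashtable.max.entries: the first caps the bytes that may be shuffled for a dynamically partitioned hash join (DPHJ), the second caps the distinct-key estimate for a broadcast hash table. A minimal sketch of how the two thresholds are intended to combine, using the defaults above; the class and method names are illustrative, not part of the patch (the real decision is made in ConvertJoinMapJoin below):

// Illustrative sketch only -- names are hypothetical; defaults mirror HiveConf above.
class DphjThresholdSketch {
  static final long MAX_HASHTABLE_ENTRIES = 40000000L; // hive.auto.convert.join.hashtable.max.entries
  static final long MAX_SHUFFLE_SIZE = 10000000L;      // hive.auto.convert.join.shuffle.max.size

  /** Prefer a dynamically partitioned hash join over a broadcast map join? */
  static boolean preferDphj(long estimatedDistinctKeys, long largerInputBytes) {
    // Condition 1: the hash table would hold too many distinct keys to broadcast.
    boolean hashTableTooBig =
        MAX_HASHTABLE_ENTRIES >= 1 && estimatedDistinctKeys > MAX_HASHTABLE_ENTRIES;
    // Condition 2: the larger input is still cheap enough to shuffle.
    // A value below 1 disables the check, matching the "-1 means no limit" semantics.
    boolean shuffleAffordable =
        MAX_SHUFFLE_SIZE < 1 || largerInputBytes <= MAX_SHUFFLE_SIZE;
    return hashTableTooBig && shuffleAffordable;
  }
}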
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 21d0053..9968959 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -579,12 +579,13 @@ private boolean checkColEquality(List<Set<String>> grandParentColNames,
    *          for Dynamic Hash Join conversion consideration
    * @param skipJoinTypeChecks whether to skip join type checking
    * @param maxSize size threshold for Map Join conversion
-   * @param checkHashTableEntries whether to check threshold for distinct keys in hash table for Map Join
+   * @param checkDynamicPartitionedHashJoin whether to check threshold to convert to dynamically
+   *          partitioned hash join
    * @return returns big table position or -1 if it cannot be determined
    * @throws SemanticException
    */
   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkHashTableEntries)
+      int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkDynamicPartitionedHashJoin)
       throws SemanticException {
     if (!skipJoinTypeChecks) {
       /*
@@ -693,18 +694,26 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // We are replacing the current big table with a new one, thus
         // we need to count the current one as a map table then.
         totalSize += bigInputStat.getDataSize();
-        // Check if number of distinct keys is larger than given max
-        // number of entries for HashMap. If it is, we do not convert.
-        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
+        // Check if we meet the two conditions to convert to a dynamically partitioned
+        // hash join:
+        // - number of distinct keys is greater than given max number of entries
+        //   for HashMap, and
+        // - size of data to shuffle (larger table) is less than given max size.
+        if (checkDynamicPartitionedHashJoin && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)
+            && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
           return -1;
         }
       } else if (!selectedBigTable) {
         // This is not the first table and we are not using it as big table,
         // in fact, we're adding this table as a map table
         totalSize += inputSize;
-        // Check if number of distinct keys is larger than given max
-        // number of entries for HashMap. If it is, we do not convert.
-        if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
+        // Check if we meet the two conditions to convert to a dynamically partitioned
+        // hash join:
+        // - number of distinct keys is greater than given max number of entries
+        //   for HashMap, and
+        // - size of data to shuffle (larger table) is less than given max size.
+        if (checkDynamicPartitionedHashJoin && !checkNumberOfEntriesForHashTable(joinOp, pos, context)
+            && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
           return -1;
         }
       }
@@ -1094,6 +1103,24 @@ private boolean checkNumberOfEntriesForHashTable(JoinOperator joinOp, int positi
     return true;
   }
 
+  /* Returns true if it passes the test, false otherwise. */
+  private boolean checkShuffleSizeForLargeTable(JoinOperator joinOp, int position,
+          OptimizeTezProcContext context) {
+    long max = HiveConf.getLongVar(context.parseContext.getConf(),
+            HiveConf.ConfVars.HIVECONVERTJOINMAXSHUFFLESIZE);
+    if (max < 1) {
+      // Max is disabled, we can safely return true
+      return true;
+    }
+    // Evaluate: the shuffled input for this position is the ReduceSink feeding the join.
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) joinOp.getParentOperators().get(position);
+    Statistics inputStats = rsOp.getStatistics();
+    long inputSize = inputStats.getDataSize();
+    LOG.debug("Estimated size for input {}: {}; Max size for dynamically partitioned hash join conversion: {}",
+        position, inputSize, max);
+    return inputSize <= max;
+  }
+
   private static long estimateNDV(long numRows, List<ColStatistics> columnStats) {
     // If there is a single column, return the number of distinct values
     if (columnStats.size() == 1) {
diff --git ql/src/test/queries/clientpositive/join_max_hashtable.q ql/src/test/queries/clientpositive/join_max_hashtable.q
index 9c30a0d..8d0ccb7 100644
--- ql/src/test/queries/clientpositive/join_max_hashtable.q
+++ ql/src/test/queries/clientpositive/join_max_hashtable.q
@@ -1,6 +1,7 @@
 set hive.auto.convert.join=true;
 set hive.optimize.dynamic.partition.hashjoin=true;
 set hive.auto.convert.join.hashtable.max.entries=500;
+set hive.auto.convert.join.shuffle.max.size=100000;
 
 -- CONVERT
 EXPLAIN
@@ -35,3 +36,15 @@ FROM src x JOIN src y ON (x.key = y.key);
 EXPLAIN
 SELECT x.key, x.value
 FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
+
+set hive.auto.convert.join.shuffle.max.size=80000;
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key);
+
+-- CONVERT
+EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value);
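The updated test exercises both sides of the new threshold. Per the statistics in the q.out below, the larger join input is 89000 bytes (key and value projected) and the smaller is 43500 bytes (key only). A worked reading of the settings above, under the assumption that the multi-key join's distinct-key estimate exceeds hashtable.max.entries=500 while the single-key join's does not:

- shuffle.max.size=100000: 89000 <= 100000 passes checkShuffleSizeForLargeTable, so a join that fails the entries check may still be planned as a dynamically partitioned hash join.
- shuffle.max.size=80000: 89000 > 80000 fails the shuffle check, so both EXPLAINs below fall back to broadcast map joins (note the BROADCAST_EDGE in each plan).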
diff --git ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
index 6520fd3..83a7831 100644
--- ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
+++ ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out
@@ -498,3 +498,157 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT x.key, x.value
+FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: x
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string), _col1 (type: string)
+                          1 _col0 (type: string), _col1 (type: string)
+                        outputColumnNames: _col0, _col1
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 2 Data size: 356 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 2 Data size: 356 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: y
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string), _col1 (type: string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
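Both plans above confirm the fallback path: with the 80000-byte cap, Map 2 is broadcast rather than both sides being shuffled. A rough sketch of the call site, assuming a caller shaped like the existing conversion logic in ConvertJoinMapJoin (not shown in this diff); only getMapJoinConversionPos appears in the patch, and the two helpers are hypothetical stand-ins for the existing conversion paths:

// Illustrative call-site sketch; helper names other than getMapJoinConversionPos
// are hypothetical.
void chooseJoinAlgorithm(JoinOperator joinOp, OptimizeTezProcContext context,
    int numBuckets, long maxJoinMemory) throws SemanticException {
  int bigTablePos = getMapJoinConversionPos(joinOp, context, numBuckets,
      false /* skipJoinTypeChecks */, maxJoinMemory,
      true /* checkDynamicPartitionedHashJoin */);
  if (bigTablePos >= 0) {
    // Small sides fit the broadcast hash-table budget: plan a regular map join.
    convertToMapJoin(joinOp, context, bigTablePos);      // hypothetical helper
  } else {
    // -1: either stats were unusable, or the DPHJ conditions held (entries
    // check failed AND the larger input fit under shuffle.max.size).
    fallBackToDynamicPartitionedHashJoin(joinOp, context); // hypothetical helper
  }
}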