diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 9fcb9d9..4b62406 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -349,6 +349,8 @@ " A subpartition value is specified without specififying the parent partition's value"), TABLES_INCOMPATIBLE_SCHEMAS(10235, "Tables have incompatible schemas and their partitions " + " cannot be exchanged."), + INVALID_TABLES_MAPJOIN_HINT(10236, "Table(s) specified in map-join hint is(are) " + + "not valid for this map-join"), SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index cc9de54..c1d4ddd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; @@ -470,8 +472,16 @@ protected boolean canConvertJoinToBucketMapJoin( BigTableSelectorForAutoSMJ bigTableMatcher = (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null); + JoinDesc joinDesc = joinOp.getConf(); + JoinCondDesc[] joinCondns = joinDesc.getConds(); + ArrayList joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns); + if (joinCandidates == null) { + // This is a full outer join. 
This can never be a map-join + // of any type. So return false. + return false; + } int bigTablePosition = - bigTableMatcher.getBigTablePosition(pGraphContext, joinOp); + bigTableMatcher.getBigTablePosition(pGraphContext, joinOp, joinCandidates); if (bigTablePosition < 0) { // contains aliases from sub-query return false; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java index 5320143..d55e90d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java @@ -45,7 +45,8 @@ private static final Log LOG = LogFactory .getLog(AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.class.getName()); - public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp) + public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp, + ArrayList bigTableCandidates) throws SemanticException { int bigTablePos = 0; long maxSize = -1; @@ -57,6 +58,11 @@ public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp) getListTopOps(joinOp, topOps); int currentPos = 0; for (TableScanOperator topOp : topOps) { + if (!bigTableCandidates.contains(currentPos)) { + currentPos++; + continue; + } + if (topOp == null) { return -1; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java index db5ff0f..96e347d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; + import org.apache.hadoop.hive.ql.exec.JoinOperator; import 
org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -28,6 +30,7 @@ * decide the big table based on size or position of the tables. */ public interface BigTableSelectorForAutoSMJ { - public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp) + public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp, + ArrayList joinCandidates) throws SemanticException; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java index db3c9e7..9c6793c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; + import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.parse.ParseContext; @@ -26,7 +28,8 @@ * sort merge join. The leftmost table is chosen as the join table. 
*/ public class LeftmostBigTableSelectorForAutoSMJ implements BigTableSelectorForAutoSMJ { - public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp) { + public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp, + ArrayList bigTableCandidates) { return 0; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index d83fb66..c373225 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -327,7 +327,9 @@ public static MapJoinOperator convertMapJoin( Byte[] tagOrder = desc.getTagOrder(); if (!noCheckOuterJoin) { - checkMapJoin(mapJoinPos, condns); + if (!checkMapJoin(mapJoinPos, condns)) { + throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + } } RowResolver oldOutputRS = opParseCtxMap.get(op).getRowResolver(); @@ -621,12 +623,12 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o * @param condns * @return list of big table candidates */ - public static HashSet getBigTableCandidates(JoinCondDesc[] condns) { - HashSet bigTableCandidates = new HashSet(); + public static ArrayList getBigTableCandidates(JoinCondDesc[] condns) { + ArrayList bigTableCandidates = new ArrayList(); boolean seenOuterJoin = false; - Set seenPostitions = new HashSet(); - Set leftPosListOfLastRightOuterJoin = new HashSet(); + ArrayList seenPostitions = new ArrayList(); + ArrayList leftPosListOfLastRightOuterJoin = new ArrayList(); // is the outer join that we saw most recently is a right outer join? 
boolean lastSeenRightOuterJoin = false; @@ -676,13 +678,17 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o return bigTableCandidates; } - public static void checkMapJoin(int mapJoinPos, JoinCondDesc[] condns) throws SemanticException { - HashSet bigTableCandidates = MapJoinProcessor.getBigTableCandidates(condns); + public static boolean checkMapJoin(int mapJoinPos, JoinCondDesc[] condns) throws SemanticException { + ArrayList bigTableCandidates = MapJoinProcessor.getBigTableCandidates(condns); + + if (bigTableCandidates == null) { + return false; + } - if (bigTableCandidates == null || !bigTableCandidates.contains(mapJoinPos)) { - throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg()); + if (!bigTableCandidates.contains(mapJoinPos)) { + throw new SemanticException(ErrorMsg.INVALID_TABLES_MAPJOIN_HINT.getMsg()); } - return; + return true; } private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws SemanticException { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java index b882f87..70fb204 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java @@ -38,7 +38,8 @@ */ public class TableSizeBasedBigTableSelectorForAutoSMJ extends SizeBasedBigTableSelectorForAutoSMJ implements BigTableSelectorForAutoSMJ { - public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp) + public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp, + ArrayList bigTableCandidates) throws SemanticException { int bigTablePos = 0; long maxSize = -1; @@ -52,6 +53,10 @@ public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp) if (topOp == null) { return -1; } + if 
(!bigTableCandidates.contains(currentPos)) { + currentPos++; + continue; + } Table table = parseCtx.getTopToTable().get(topOp); long currentSize = 0; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 8139f4a..e717ea0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -23,7 +23,6 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; @@ -445,7 +444,7 @@ private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configura long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currWork, pathToAliases, aliasToSize); - HashSet bigTableCandidates = MapJoinProcessor.getBigTableCandidates(joinDesc + List bigTableCandidates = MapJoinProcessor.getBigTableCandidates(joinDesc .getConds()); // no table could be the big table; there is no need to convert diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 5592ab1..4f7534a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -285,7 +285,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf(); Byte[] order = originalSMBJoinDesc.getTagOrder(); int numAliases = order.length; - Set bigTableCandidates = + List bigTableCandidates = MapJoinProcessor.getBigTableCandidates(originalSMBJoinDesc.getConds()); // no table could be the big table; there is 
no need to convert diff --git ql/src/test/queries/clientpositive/auto_sortmerge_join_15.q ql/src/test/queries/clientpositive/auto_sortmerge_join_15.q new file mode 100644 index 0000000..c7bcae6 --- /dev/null +++ ql/src/test/queries/clientpositive/auto_sortmerge_join_15.q @@ -0,0 +1,23 @@ +set hive.enforce.bucketing = true; +set hive.enforce.sorting = true; +set hive.exec.reducers.max = 1; + +CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS; +CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS; + +insert overwrite table tbl1 select * from src where key < 20; +insert overwrite table tbl2 select * from src where key < 10; + +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; +set hive.auto.convert.sortmerge.join.to.mapjoin=true; +set hive.auto.convert.sortmerge.join=true; +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.auto.convert.join=true; + +explain +select count(*) FROM tbl1 a LEFT OUTER JOIN tbl2 b ON a.key = b.key; + +explain +select count(*) FROM tbl1 a RIGHT OUTER JOIN tbl2 b ON a.key = b.key; diff --git ql/src/test/results/clientnegative/join2.q.out ql/src/test/results/clientnegative/join2.q.out index b53b3a1..d65573f 100644 --- ql/src/test/results/clientnegative/join2.q.out +++ ql/src/test/results/clientnegative/join2.q.out @@ -1 +1 @@ -FAILED: SemanticException [Error 10057]: MAPJOIN cannot be performed with OUTER JOIN +FAILED: SemanticException [Error 10236]: Table(s) specified in map-join hint is(are) not valid for this map-join diff --git ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out index 7a5b8c1..f596d5a 100644 --- ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out +++ ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out @@ -34,4 +34,4 @@ POSTHOOK: Lineage: smb_bucket4_1.key EXPRESSION [(src)src.FieldSchema(name:key, 
POSTHOOK: Lineage: smb_bucket4_1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] POSTHOOK: Lineage: smb_bucket4_2.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] POSTHOOK: Lineage: smb_bucket4_2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] -FAILED: SemanticException [Error 10057]: MAPJOIN cannot be performed with OUTER JOIN +FAILED: SemanticException [Error 10236]: Table(s) specified in map-join hint is(are) not valid for this map-join diff --git ql/src/test/results/clientpositive/auto_sortmerge_join_15.q.out ql/src/test/results/clientpositive/auto_sortmerge_join_15.q.out new file mode 100644 index 0000000..6a74033 --- /dev/null +++ ql/src/test/results/clientpositive/auto_sortmerge_join_15.q.out @@ -0,0 +1,176 @@ +PREHOOK: query: CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tbl1 +PREHOOK: query: CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tbl2 +PREHOOK: query: insert overwrite table tbl1 select * from src where key < 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tbl1 +POSTHOOK: query: insert overwrite table tbl1 select * from src where key < 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tbl1 +POSTHOOK: Lineage: tbl1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert 
overwrite table tbl2 select * from src where key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tbl2 +POSTHOOK: query: insert overwrite table tbl2 select * from src where key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tbl2 +POSTHOOK: Lineage: tbl1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: explain +select count(*) FROM tbl1 a LEFT OUTER JOIN tbl2 b ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) FROM tbl1 a LEFT OUTER JOIN tbl2 b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Lineage: tbl1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME tbl1) a) (TOK_TABREF (TOK_TABNAME tbl2) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Sorted Merge Bucket Map Join Operator + condition map: + Left Outer Join0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: explain +select count(*) FROM tbl1 a RIGHT OUTER JOIN tbl2 b ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) FROM tbl1 a RIGHT OUTER JOIN tbl2 b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Lineage: tbl1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_RIGHTOUTERJOIN (TOK_TABREF 
(TOK_TABNAME tbl1) a) (TOK_TABREF (TOK_TABNAME tbl2) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + Sorted Merge Bucket Map Join Operator + condition map: + Right Outer Join0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + +