Index: conf/hive-default.xml.template =================================================================== --- conf/hive-default.xml.template (revision 1365916) +++ conf/hive-default.xml.template (working copy) @@ -758,6 +758,14 @@ + hive.enforce.sortmergebucketmapjoin + false + If the user asked for sort-merge bucketed map-side join, and it cannot be performed, + should the query fail or not ? + + + + hive.metastore.ds.connection.url.hook Name of the hook to use for retriving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1365916) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -465,6 +465,7 @@ HIVEENFORCEBUCKETING("hive.enforce.bucketing", false), HIVEENFORCESORTING("hive.enforce.sorting", false), HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"), + HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false), HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false), HIVEROWOFFSET("hive.exec.rowoffset", false), Index: ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out =================================================================== --- ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out (revision 0) +++ ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out (working copy) @@ -0,0 +1,144 @@ +PREHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) +INTO 1 BUCKETS STORED AS RCFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) +INTO 1 BUCKETS STORED AS RCFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_asc +PREHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) +INTO 1 BUCKETS STORED AS RCFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) +INTO 1 BUCKETS STORED AS RCFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc +PREHOOK: query: insert overwrite table table_asc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_asc +POSTHOOK: query: insert overwrite table table_asc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_asc +POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc +POSTHOOK: query: insert overwrite table table_desc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc +POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- If the user asked for sort merge join to be enforced (by setting +-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since +-- one of the tables is in ascending order and the other is in descending order, +-- and sort merge bucket mapjoin cannot be performed. In the default mode, the +-- query would succeed, although a regular map-join would be performed instead of +-- what the user asked. + +explain +select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: -- If the user asked for sort merge join to be enforced (by setting +-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since +-- one of the tables is in ascending order and the other is in descending order, +-- and sort merge bucket mapjoin cannot be performed. In the default mode, the +-- query would succeed, although a regular map-join would be performed instead of +-- what the user asked. + +explain +select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_asc) a) (TOK_TABREF (TOK_TABNAME table_desc) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-3 + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + HashTable Sink Operator + condition expressions: + 0 {key} {value} + 1 {key} {value} + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} + 1 {key} {value} + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0, _col1, _col4, _col5 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col4 + type: int + expr: _col5 + type: string + outputColumnNames: _col0, _col1, _col4, _col5 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: string + expr: _col4 + type: int + expr: _col5 + type: string + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +FAILED: SemanticException [Error 10132]: Sort merge bucketed join could not be performed. If you really want to perform the operation, either set hive.optimize.bucketmapjoin.sortedmerge=false, or set hive.enforce.sortmergebucketmapjoin=false. Index: ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q =================================================================== --- ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q (revision 0) +++ ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q (working copy) @@ -0,0 +1,28 @@ +create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc) +INTO 1 BUCKETS STORED AS RCFILE; +create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc) +INTO 1 BUCKETS STORED AS RCFILE; + +set hive.enforce.bucketing = true; +set hive.enforce.sorting = true; + +insert overwrite table table_asc select key, value from src; +insert overwrite table table_desc select key, value from src; +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- If the user asked for sort merge join to be enforced (by setting +-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since +-- one of the tables is in ascending order and the other is in descending order, +-- and sort merge bucket mapjoin cannot be performed. In the default mode, the +-- query would succeed, although a regular map-join would be performed instead of +-- what the user asked. + +explain +select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key; + +set hive.enforce.sortmergebucketmapjoin=true; + +explain +select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (revision 1365916) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (working copy) @@ -28,7 +28,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -104,7 +106,7 @@ } class SortedMergeBucketMapjoinProc implements NodeProcessor { - ParseContext pGraphContext; + private ParseContext pGraphContext; public SortedMergeBucketMapjoinProc(ParseContext pctx) { this.pGraphContext = pctx; @@ -113,23 +115,24 @@ public SortedMergeBucketMapjoinProc() { } - @Override - public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + // Return true or false based on whether the mapjoin was converted successfully to + // a sort-merge map join operator. + private boolean convertSMBJoin(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { if (nd instanceof SMBMapJoinOperator) { - return null; + return false; } MapJoinOperator mapJoinOp = (MapJoinOperator) nd; if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) { - return null; + return false; } boolean tableSorted = true; QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext() .get(mapJoinOp); if (joinCxt == null) { - return null; + return false; } String[] srcs = joinCxt.getBaseSrc(); int pos = 0; @@ -142,11 +145,26 @@ //this is a mapjoin but not suit for a sort merge bucket map join. check outer joins MapJoinProcessor.checkMapJoin(((MapJoinOperator) nd).getConf().getPosBigTable(), ((MapJoinOperator) nd).getConf().getConds()); - return null; + return false; } // convert a bucket map join operator to a sorted merge bucket map join // operator convertToSMBJoin(mapJoinOp, srcs); + return true; + } + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + boolean convert = convertSMBJoin(nd, stack, procCtx, nodeOutputs); + // Throw an error if the user asked for sort merge bucketed mapjoin to be enforced + // and sort merge bucketed mapjoin cannot be performed + if (!convert && + pGraphContext.getConf().getBoolVar( + HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) { + throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg()); + } + return null; } Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1365916) +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy) @@ -219,6 +219,11 @@ "partitioned table is not allowed. It may lead to wrong results for " + "older partitions"), + SORTMERGE_MAPJOIN_FAILED(10132, + "Sort merge bucketed join could not be performed. " + + "If you really want to perform the operation, either set " + + "hive.optimize.bucketmapjoin.sortedmerge=false, or set " + + "hive.enforce.sortmergebucketmapjoin=false."), SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "