Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template	(revision 1367859)
+++ conf/hive-default.xml.template	(working copy)
@@ -477,10 +477,27 @@
 <property>
   <name>hive.mapred.mode</name>
   <value>nonstrict</value>
-  <description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
+  <description>
+    The mode in which the Hive operations are being performed.
+    In strict mode, some risky queries are not allowed to run. They include:
+      Cartesian product.
+      No partition being picked up for a query.
+      Comparing bigints and strings.
+      Comparing bigints and doubles.
+      ORDER BY without LIMIT.
+  </description>
 </property>
 
+<property>
+  <name>hive.enforce.bucketmapjoin</name>
+  <value>false</value>
+  <description>If the user asked for a bucketed map-side join and it cannot be performed,
+    should the query fail or not? For example, if the numbers of buckets in the tables
+    being joined are not multiples of each other, a bucketed map-side join cannot be
+    performed, and the query will fail if hive.enforce.bucketmapjoin is set to true.
+  </description>
+</property>
+
 <property>
   <name>hive.exec.script.maxerrsize</name>
   <value>100000</value>
   <description>Maximum number of bytes a script is allowed to emit to standard error (per map-reduce task). This prevents runaway scripts from filling logs partitions to capacity</description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1367859)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -465,6 +465,7 @@
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),
     HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+    HIVEENFORCEBUCKETMAPJOIN("hive.enforce.bucketmapjoin", false),
 
     HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
     HIVEROWOFFSET("hive.exec.rowoffset", false),
Index: ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out
===================================================================
--- ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out	(revision 0)
+++ ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out	(working copy)
@@ -0,0 +1,243 @@
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+  STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+  STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+PREHOOK: query: load data local inpath '../data/files/srcbucket20.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: query: load data local inpath '../data/files/srcbucket20.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket21.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket21.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+  STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+  STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket23.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket23.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: -- The numbers of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The numbers of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part) a) (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part_2) b) (and (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) ds) "2008-04-08")) (= (. (TOK_TABLE_OR_COL b) ds) "2008-04-08")))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a
+          TableScan
+            alias: a
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: int
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: int
+              tag: 0
+              value expressions:
+                    expr: key
+                    type: int
+                    expr: value
+                    type: string
+        b
+          TableScan
+            alias: b
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: int
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: int
+              tag: 1
+              value expressions:
+                    expr: value
+                    type: string
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 {VALUE._col1}
+          handleSkewJoin: false
+          outputColumnNames: _col0, _col1, _col6
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: int
+                  expr: _col1
+                  type: string
+                  expr: _col6
+                  type: string
+            outputColumnNames: _col0, _col1, _col2
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part) a) (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part_2) b) (and (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) ds) "2008-04-08")) (= (. (TOK_TABLE_OR_COL b) ds) "2008-04-08")))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value)))))
+
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-1 depends on stages: Stage-3
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-3
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        b
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        b
+          TableScan
+            alias: b
+            HashTable Sink Operator
+              condition expressions:
+                0 {key} {value}
+                1 {value}
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              Position of Big Table: 0
+
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a
+          TableScan
+            alias: a
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {value}
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              outputColumnNames: _col0, _col1, _col6
+              Position of Big Table: 0
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: int
+                      expr: _col1
+                      type: string
+                      expr: _col6
+                      type: string
+                outputColumnNames: _col0, _col1, _col6
+                Select Operator
+                  expressions:
+                        expr: _col0
+                        type: int
+                        expr: _col1
+                        type: string
+                        expr: _col6
+                        type: string
+                  outputColumnNames: _col0, _col1, _col2
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+FAILED: SemanticException [Error 10136]: Bucketed mapjoin cannot be performed. This can be due to multiple reasons: . Join columns don't match bucketed columns. . Numbers of buckets are not multiples of each other. If you really want to perform the operation, either remove the mapjoin hint from your query or set hive.enforce.bucketmapjoin to false.
Index: ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q
===================================================================
--- ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q	(revision 0)
+++ ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q	(working copy)
@@ -0,0 +1,42 @@
+CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+  STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+  partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+  STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt'
+  INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+-- The numbers of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
+
+set hive.optimize.bucketmapjoin = true;
+
+explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
+
+set hive.enforce.bucketmapjoin=true;
+
+explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java	(revision 1367859)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java	(working copy)
@@ -38,6 +38,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -80,7 +82,8 @@
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx = new BucketMapjoinOptProcCtx();
+    BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx =
+      new BucketMapjoinOptProcCtx(pctx.getConf());
 
     // process map joins with no reducers pattern
     opRules.put(new RuleRegExp("R1", "MAPJOIN%"), getBucketMapjoinProc(pctx));
@@ -142,21 +145,19 @@
       this.pGraphContext = pGraphContext;
     }
 
-    @Override
-    @SuppressWarnings("unchecked")
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+    private boolean convertBucketMapJoin(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
-
      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
      BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
+      HiveConf conf = context.getConf();
 
      if(context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
-        return null;
+        return false;
      }
 
      QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
      if(joinCxt == null) {
-        return null;
+        return false;
      }
 
      List<String> joinAliases = new ArrayList<String>();
@@ -200,7 +201,7 @@
        String alias = joinAliases.get(index);
        TableScanOperator tso = (TableScanOperator) topOps.get(alias);
        if (tso == null) {
-          return null;
+          return false;
        }
        Table tbl = topToTable.get(tso);
        if(tbl.isPartitioned()) {
@@ -230,7 +231,7 @@
          List<List<String>> files = new ArrayList<List<String>>();
          for (Partition p : partitions) {
            if (!checkBucketColumns(p.getBucketCols(), mjDecs, index)) {
-              return null;
+              return false;
            }
            List<String> fileNames = getOnePartitionBucketFileNames(p.getDataLocation());
            if (alias.equals(baseBigAlias)) {
@@ -248,7 +249,7 @@
          }
        } else {
          if (!checkBucketColumns(tbl.getBucketCols(), mjDecs, index)) {
-            return null;
+            return false;
          }
          List<String> fileNames = getOnePartitionBucketFileNames(tbl.getDataLocation());
          Integer num = new Integer(tbl.getNumBuckets());
@@ -268,7 +269,7 @@
      // the big table can be divided by no of buckets in small tables.
      for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
        if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
-          return null;
+          return false;
        }
      }
 
@@ -318,6 +319,26 @@
      if (bigTablePartitioned) {
        desc.setBigTablePartSpecToFileMapping(convert(bigTblPartsToBucketFileNames));
      }
+
+      return true;
+    }
+
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      boolean convert = convertBucketMapJoin(nd, stack, procCtx, nodeOutputs);
+      BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
+      HiveConf conf = context.getConf();
+
+      // Throw an error if the user asked for bucketed mapjoin to be enforced and
+      // bucketed mapjoin cannot be performed
+      if (!convert && conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
+        throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_NOT_POSSIBLE.getMsg());
+      }
+
      return null;
    }
 
@@ -433,14 +454,23 @@
   }
 
   class BucketMapjoinOptProcCtx implements NodeProcessorCtx {
+    private final HiveConf conf;
+
    // we only convert map joins that follows a root table scan in the same
    // mapper. That means there is no reducer between the root table scan and
    // mapjoin.
    Set<MapJoinOperator> listOfRejectedMapjoins = new HashSet<MapJoinOperator>();
 
+    public BucketMapjoinOptProcCtx(HiveConf conf) {
+      this.conf = conf;
+    }
+
+    public HiveConf getConf() {
+      return conf;
+    }
+
    public Set<MapJoinOperator> getListOfRejectedMapjoins() {
      return listOfRejectedMapjoins;
    }
-
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java	(revision 1367859)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java	(working copy)
@@ -224,6 +224,14 @@
   ALTER_VIEW_DISALLOWED_OP(10133, "Cannot use this form of ALTER on a view"),
   ALTER_TABLE_NON_NATIVE(10134, "ALTER TABLE cannot be used for a non-native table"),
 
+  BUCKET_MAPJOIN_NOT_POSSIBLE(10136,
+    "Bucketed mapjoin cannot be performed. " +
+    "This can be due to multiple reasons: " +
+    " . Join columns don't match bucketed columns. " +
+    " . Numbers of buckets are not multiples of each other. " +
+    "If you really want to perform the operation, either remove the " +
+    "mapjoin hint from your query or set hive.enforce.bucketmapjoin to false."),
+
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
       + "It may have crashed with an error."),
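
To make the new enforcement semantics concrete, here is a minimal standalone sketch; it is not part of the patch, the class name EnforceBucketMapJoinSketch and the helper checkEnforced are hypothetical, and it assumes the patched Hive jars are on the classpath. It exercises the same HiveConf flag and error path that the reworked BucketMapJoinOptimizer.process() uses.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class EnforceBucketMapJoinSketch {

  // Mirrors the new check in process(): if the bucketed-mapjoin conversion
  // failed and the user demanded it, fail the query instead of silently
  // falling back to a plain map-side join.
  static void checkEnforced(boolean converted, HiveConf conf) throws SemanticException {
    if (!converted && conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
      throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_NOT_POSSIBLE.getMsg());
    }
  }

  public static void main(String[] args) throws SemanticException {
    HiveConf conf = new HiveConf();

    // Default (hive.enforce.bucketmapjoin=false): a failed conversion is
    // tolerated and the plan degrades to a regular map-side join.
    checkEnforced(false, conf);

    // With enforcement on, the same failed conversion raises error 10136,
    // matching the FAILED line in bucket_mapjoin_mismatch1.q.out.
    conf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN, true);
    checkEnforced(false, conf); // throws SemanticException [Error 10136]
  }
}

From the CLI, the same path is exercised by issuing "set hive.enforce.bucketmapjoin=true;" before a hinted map join over tables whose bucket counts are not multiples of each other, exactly as the new negative test bucket_mapjoin_mismatch1.q does.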