Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1365910)
+++ conf/hive-default.xml.template (working copy)
@@ -477,10 +477,27 @@
 <property>
   <name>hive.mapred.mode</name>
   <value>nonstrict</value>
-  <description>The mode in which the hive operations are being performed. In strict mode, some risky queries are not allowed to run</description>
+  <description>The mode in which the hive operations are being performed.
+      In strict mode, some risky queries are not allowed to run. They include:
+        Cartesian Product.
+        No partition being picked up for a query.
+        Comparing bigints and strings.
+        Comparing bigints and doubles.
+        Orderby without limit.
+  </description>
 </property>
 
+<property>
+  <name>hive.enforce.bucketmapjoin</name>
+  <value>false</value>
+  <description>If the user asked for bucketed map-side join, and it cannot be performed,
+    should the query fail or not? For example, if the numbers of buckets in the tables
+    being joined are not multiples of each other, bucketed map-side join cannot be
+    performed, and the query will fail if hive.enforce.bucketmapjoin is set to true.
+  </description>
+</property>
+
 <property>
   <name>hive.exec.script.maxerrsize</name>
   <value>100000</value>
   <description>Maximum number of bytes a script is allowed to emit to standard error (per map-reduce task). This prevents runaway scripts from filling logs partitions to capacity</description>
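
A quick illustration of the new knob's semantics (not part of the patch): the property defaults to false, so a bucket-count mismatch silently falls back to a regular join unless the user opts in. HiveConf and the getBoolean/setBoolean accessors it inherits from Configuration are real APIs; the standalone harness is hypothetical.

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical harness, not part of the patch: demonstrates the default and
// the effect of opting in to enforcement.
public class EnforceBucketMapJoinDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Default comes from the template entry above.
    System.out.println(conf.getBoolean("hive.enforce.bucketmapjoin", false)); // false
    // Equivalent of running "set hive.enforce.bucketmapjoin=true;" in the CLI.
    conf.setBoolean("hive.enforce.bucketmapjoin", true);
    System.out.println(conf.getBoolean("hive.enforce.bucketmapjoin", false)); // true
  }
}
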
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1365910)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -465,6 +465,7 @@
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+ HIVEENFORCEBUCKETMAPJOIN("hive.enforce.bucketmapjoin", false),
HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
HIVEROWOFFSET("hive.exec.rowoffset", false),
Index: ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out
===================================================================
--- ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out (revision 0)
+++ ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out (working copy)
@@ -0,0 +1,243 @@
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+ STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+ STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+PREHOOK: query: load data local inpath '../data/files/srcbucket20.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: query: load data local inpath '../data/files/srcbucket20.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket21.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket21.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+ STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+ STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: load data local inpath '../data/files/srcbucket23.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../data/files/srcbucket23.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: -- The number of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+PREHOOK: type: QUERY
+POSTHOOK: query: -- The number of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part) a) (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part_2) b) (and (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) ds) "2008-04-08")) (= (. (TOK_TABLE_OR_COL b) ds) "2008-04-08")))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 0
+ value expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ b
+ TableScan
+ alias: b
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: int
+ tag: 1
+ value expressions:
+ expr: value
+ type: string
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col0} {VALUE._col1}
+ 1 {VALUE._col1}
+ handleSkewJoin: false
+ outputColumnNames: _col0, _col1, _col6
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col6
+ type: string
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08"
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part) a) (TOK_TABREF (TOK_TABNAME srcbucket_mapjoin_part_2) b) (and (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) ds) "2008-04-08")) (= (. (TOK_TABLE_OR_COL b) ds) "2008-04-08")))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) value)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) value)))))
+
+STAGE DEPENDENCIES:
+ Stage-3 is a root stage
+ Stage-1 depends on stages: Stage-3
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-3
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ b
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ b
+ TableScan
+ alias: b
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key} {value}
+ 1 {value}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ a
+ TableScan
+ alias: a
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1 {value}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1, _col6
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col6
+ type: string
+ outputColumnNames: _col0, _col1, _col6
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col6
+ type: string
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+FAILED: SemanticException [Error 10131]: The number of buckets in one table is not a multiple of the number of buckets in the other. Bucketed mapjoin cannot be performed. If you really want to perform the operation, either remove the mapjoin hint from your query or set hive.enforce.bucketmapjoin to false.
Index: ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q
===================================================================
--- ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q (revision 0)
+++ ql/src/test/queries/clientnegative/bucket_mapjoin_mismatch1.q (working copy)
@@ -0,0 +1,42 @@
+CREATE TABLE srcbucket_mapjoin_part (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 3 BUCKETS
+ STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string)
+ partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS
+ STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt'
+ INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+-- The number of buckets in the 2 tables above (being joined later) don't match.
+-- Throw an error if the user requested a bucketed mapjoin to be enforced.
+-- In the default case (hive.enforce.bucketmapjoin=false), the query succeeds
+-- even though mapjoin is not being performed
+
+explain
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
+
+set hive.optimize.bucketmapjoin = true;
+
+explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
+
+set hive.enforce.bucketmapjoin=true;
+
+explain
+select /*+mapjoin(b)*/ a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and a.ds="2008-04-08" and b.ds="2008-04-08";
+
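
Why this pair of tables trips the check: they are bucketed 3 and 2 ways, and neither count divides the other, so big-table buckets cannot be paired with whole small-table buckets. A minimal sketch of the compatibility rule follows (hypothetical helper; the optimizer's real check is checkBucketNumberAgainstBigTable in the hunk below).

// Hypothetical helper mirroring the rule the optimizer applies: bucket
// counts are compatible only when one is a multiple of the other.
public class BucketCompatibility {
  static boolean compatible(int bigTableBuckets, int smallTableBuckets) {
    return bigTableBuckets % smallTableBuckets == 0
        || smallTableBuckets % bigTableBuckets == 0;
  }

  public static void main(String[] args) {
    System.out.println(compatible(3, 2)); // false: this test fails with error 10131
    System.out.println(compatible(4, 2)); // true: a 4-bucket table would pass
  }
}
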
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 1365910)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (working copy)
@@ -37,6 +37,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -79,7 +81,8 @@
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx = new BucketMapjoinOptProcCtx();
+ BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx =
+ new BucketMapjoinOptProcCtx(pctx.getConf());
// process map joins with no reducers pattern
opRules.put(new RuleRegExp("R1", "MAPJOIN%"), getBucketMapjoinProc(pctx));
@@ -148,6 +151,7 @@
MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
+ HiveConf conf = context.getConf();
if(context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
return null;
@@ -265,6 +269,11 @@
// the big table can be divided by no of buckets in small tables.
for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
+ // Throw an error if the user asked for bucketed mapjoin to be enforced and
+ // bucketed mapjoin cannot be performed
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
+ throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_BUCKET_MISMATCH.getMsg());
+ }
return null;
}
}
@@ -418,14 +427,23 @@
}
class BucketMapjoinOptProcCtx implements NodeProcessorCtx {
+ private final HiveConf conf;
+
// we only convert map joins that follows a root table scan in the same
// mapper. That means there is no reducer between the root table scan and
// mapjoin.
Set<MapJoinOperator> listOfRejectedMapjoins = new HashSet<MapJoinOperator>();
+ public BucketMapjoinOptProcCtx(HiveConf conf) {
+ this.conf = conf;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
public Set<MapJoinOperator> getListOfRejectedMapjoins() {
return listOfRejectedMapjoins;
}
-
}
}
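
A condensed view of the control flow the hunks above add. HiveConf, ErrorMsg, and SemanticException are the real classes touched by the patch; the wrapper method is hypothetical scaffolding.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Hypothetical condensation of the change: when the bucket counts rule out a
// bucketed mapjoin, fail loudly if enforcement was requested; otherwise the
// transform returns null and the plain mapjoin plan is kept (as in the second
// EXPLAIN of the test above).
class EnforcementSketch {
  static void enforceIfRequested(HiveConf conf, boolean bucketMapJoinPossible)
      throws SemanticException {
    if (!bucketMapJoinPossible
        && conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
      throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_BUCKET_MISMATCH.getMsg());
    }
  }
}
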
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1365910)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy)
@@ -219,6 +219,11 @@
"partitioned table is not allowed. It may lead to wrong results for " +
"older partitions"),
+ BUCKET_MAPJOIN_BUCKET_MISMATCH(10131,
+ "The number of buckets in one table is not a multiple of the number " +
+ "of buckets in the other. Bucketed mapjoin cannot be performed. " +
+ "If you really want to perform the operation, either remove the " +
+ "mapjoin hint from your query or set hive.enforce.bucketmapjoin to false."),
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
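
For completeness, how the new enum entry surfaces in the test output: the driver prefixes the SemanticException with the canonical error code, yielding the "FAILED: SemanticException [Error 10131]: ..." line in the .q.out above. A small sketch (hypothetical harness; getErrorCode() and getMsg() are assumed to be the usual ErrorMsg accessors):

import org.apache.hadoop.hive.ql.ErrorMsg;

// Hypothetical harness: prints the pieces that make up the expected
// "FAILED: SemanticException [Error 10131]: ..." line in the .q.out above.
public class ErrorCodeDemo {
  public static void main(String[] args) {
    ErrorMsg err = ErrorMsg.BUCKET_MAPJOIN_BUCKET_MISMATCH;
    System.out.println(err.getErrorCode()); // 10131
    System.out.println(err.getMsg());       // the message registered above
  }
}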