Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1365916)
+++ conf/hive-default.xml.template (working copy)
@@ -758,6 +758,14 @@
+ hive.enforce.sortmergebucketmapjoin
+ false
+ If the user asked for sort-merge bucketed map-side join, and it cannot be performed,
+ should the query fail or not ?
+
+
+
+
hive.metastore.ds.connection.url.hook
Name of the hook to use for retriving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1365916)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -465,6 +465,7 @@
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+ HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false),
HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
HIVEROWOFFSET("hive.exec.rowoffset", false),
Index: ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out
===================================================================
--- ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out (revision 0)
+++ ql/src/test/results/clientnegative/sortmerge_mapjoin_mismatch_1.q.out (working copy)
@@ -0,0 +1,144 @@
+PREHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc)
+INTO 1 BUCKETS STORED AS RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc)
+INTO 1 BUCKETS STORED AS RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@table_asc
+PREHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc)
+INTO 1 BUCKETS STORED AS RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc)
+INTO 1 BUCKETS STORED AS RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@table_desc
+PREHOOK: query: insert overwrite table table_asc select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@table_asc
+POSTHOOK: query: insert overwrite table table_asc select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@table_asc
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table table_desc select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@table_desc
+POSTHOOK: query: insert overwrite table table_desc select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@table_desc
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: table_asc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_asc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: table_desc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_asc) a) (TOK_TABREF (TOK_TABNAME table_desc) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-3 is a root stage
+ Stage-1 depends on stages: Stage-3
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-3
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a
+ TableScan
+ alias: a
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key} {value}
+ 1 {key} {value}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ Position of Big Table: 1
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ b
+ TableScan
+ alias: b
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key} {value}
+ 1 {key} {value}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col0, _col1, _col4, _col5
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col4
+ type: int
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1, _col4, _col5
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: string
+ expr: _col4
+ type: int
+ expr: _col5
+ type: string
+ outputColumnNames: _col0, _col1, _col2, _col3
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+FAILED: SemanticException [Error 10132]: Sort merge bucketed join could not be performed. If you really want to perform the operation, either set hive.optimize.bucketmapjoin.sortedmerge=false, or set hive.enforce.sortmergebucketmapjoin=false.
Index: ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q
===================================================================
--- ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q (revision 0)
+++ ql/src/test/queries/clientnegative/sortmerge_mapjoin_mismatch_1.q (working copy)
@@ -0,0 +1,28 @@
+create table table_asc(key int, value string) CLUSTERED BY (key) SORTED BY (key asc)
+INTO 1 BUCKETS STORED AS RCFILE;
+create table table_desc(key int, value string) CLUSTERED BY (key) SORTED BY (key desc)
+INTO 1 BUCKETS STORED AS RCFILE;
+
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+
+insert overwrite table table_asc select key, value from src;
+insert overwrite table table_desc select key, value from src;
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+-- If the user asked for sort merge join to be enforced (by setting
+-- hive.enforce.sortmergebucketmapjoin to true), an error should be thrown, since
+-- one of the tables is in ascending order and the other is in descending order,
+-- and sort merge bucket mapjoin cannot be performed. In the default mode, the
+-- query would succeed, although a regular map-join would be performed instead of
+-- what the user asked.
+
+explain
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key;
+
+set hive.enforce.sortmergebucketmapjoin=true;
+
+explain
+select /*+mapjoin(a)*/ * from table_asc a join table_desc b on a.key = b.key;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (revision 1365916)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (working copy)
@@ -28,7 +28,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +106,7 @@
}
class SortedMergeBucketMapjoinProc implements NodeProcessor {
- ParseContext pGraphContext;
+ private ParseContext pGraphContext;
public SortedMergeBucketMapjoinProc(ParseContext pctx) {
this.pGraphContext = pctx;
@@ -113,23 +115,24 @@
public SortedMergeBucketMapjoinProc() {
}
- @Override
- public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
+ // Return true or false based on whether the mapjoin was converted successfully to
+ // a sort-merge map join operator.
+ private boolean convertSMBJoin(Node nd, Stack stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
if (nd instanceof SMBMapJoinOperator) {
- return null;
+ return false;
}
MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
|| mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
- return null;
+ return false;
}
boolean tableSorted = true;
QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
.get(mapJoinOp);
if (joinCxt == null) {
- return null;
+ return false;
}
String[] srcs = joinCxt.getBaseSrc();
int pos = 0;
@@ -142,11 +145,26 @@
//this is a mapjoin but not suit for a sort merge bucket map join. check outer joins
MapJoinProcessor.checkMapJoin(((MapJoinOperator) nd).getConf().getPosBigTable(),
((MapJoinOperator) nd).getConf().getConds());
- return null;
+ return false;
}
// convert a bucket map join operator to a sorted merge bucket map join
// operator
convertToSMBJoin(mapJoinOp, srcs);
+ return true;
+ }
+
+ @Override
+ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ boolean convert = convertSMBJoin(nd, stack, procCtx, nodeOutputs);
+ // Throw an error if the user asked for sort merge bucketed mapjoin to be enforced
+ // and sort merge bucketed mapjoin cannot be performed
+ if (!convert &&
+ pGraphContext.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
+ throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
+ }
+
return null;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (revision 1365916)
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (working copy)
@@ -219,6 +219,11 @@
"partitioned table is not allowed. It may lead to wrong results for " +
"older partitions"),
+ SORTMERGE_MAPJOIN_FAILED(10132,
+ "Sort merge bucketed join could not be performed. " +
+ "If you really want to perform the operation, either set " +
+ "hive.optimize.bucketmapjoin.sortedmerge=false, or set " +
+ "hive.enforce.sortmergebucketmapjoin=false."),
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "