Index: eclipse-templates/.classpath
===================================================================
--- eclipse-templates/.classpath (revision 1146922)
+++ eclipse-templates/.classpath (working copy)
@@ -10,7 +10,7 @@
-
+
Index: contrib/build.xml
===================================================================
--- contrib/build.xml (revision 1146922)
+++ contrib/build.xml (working copy)
@@ -76,6 +76,7 @@
logFile="${test.log.dir}/testcontribparsegen.log"
logDirectory="${test.log.dir}/contribpositive"/>
+
+      Map Operator Tree:
+        b 
+          TableScan
+            alias: b
+            Sorted Merge Bucket Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {userid} {pageid} {postid} {type} {ds}
+                1 {userid} {pageid} {postid} {type} {ds}
+              handleSkewJoin: false
+              keys:
+                0 [Column[userid], Column[pageid], Column[postid], Column[type]]
+                1 [Column[userid], Column[pageid], Column[postid], Column[type]]
+              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col7, _col8, _col9, _col10, _col11
+              Position of Big Table: 1
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: int
+                      expr: _col1
+                      type: int
+                      expr: _col2
+                      type: int
+                      expr: _col3
+                      type: string
+                      expr: _col4
+                      type: string
+                      expr: _col7
+                      type: int
+                      expr: _col8
+                      type: int
+                      expr: _col9
+                      type: int
+                      expr: _col10
+                      type: string
+                      expr: _col11
+                      type: string
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col7, _col8, _col9, _col10, _col11
+                Select Operator
+                  expressions:
+                        expr: _col0
+                        type: int
+                        expr: _col1
+                        type: int
+                        expr: _col2
+                        type: int
+                        expr: _col3
+                        type: string
+                        expr: _col4
+                        type: string
+                        expr: _col7
+                        type: int
+                        expr: _col8
+                        type: int
+                        expr: _col9
+                        type: int
+                        expr: _col10
+                        type: string
+                        expr: _col11
+                        type: string
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
Index: ql/src/test/queries/clientpositive/smb_mapjoin_10.q
===================================================================
--- ql/src/test/queries/clientpositive/smb_mapjoin_10.q (revision 0)
+++ ql/src/test/queries/clientpositive/smb_mapjoin_10.q (revision 0)
@@ -0,0 +1,18 @@
+
+create table tmp_smb_bucket_10(userid int, pageid int, postid int, type string) partitioned by (ds string) CLUSTERED BY (userid) SORTED BY (pageid, postid, type, userid) INTO 2 BUCKETS STORED AS RCFILE;
+
+alter table tmp_smb_bucket_10 add partition (ds = '1');
+alter table tmp_smb_bucket_10 add partition (ds = '2');
+
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+explain
+select /*+mapjoin(a)*/ * from tmp_smb_bucket_10 a join tmp_smb_bucket_10 b
+on (a.ds = '1' and b.ds = '2' and
+ a.userid = b.userid and
+ a.pageid = b.pageid and
+ a.postid = b.postid and
+ a.type = b.type);
+
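
Note on the new test: tmp_smb_bucket_10 is bucketed on userid alone, while the join is on (userid, pageid, postid, type), so the join keys are a strict superset of the bucket column. A minimal sketch of that comparison in Java (the column names come from the test; the class and variable names are illustrative and not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical illustration of why smb_mapjoin_10.q now qualifies for a
    // sorted merge bucket map join: the join keys are a superset of the single
    // bucket column, so any two rows that can join always share a bucket.
    public class SmbMapJoin10Check {
      public static void main(String[] args) {
        List<String> joinCols = Arrays.asList("userid", "pageid", "postid", "type");
        List<String> bucketColumns = Arrays.asList("userid");   // CLUSTERED BY (userid)

        // Old rule: join columns had to equal the bucket columns exactly.
        boolean oldRule = joinCols.size() == bucketColumns.size()
            && bucketColumns.containsAll(joinCols);

        // New rule (mirrors the change in BucketMapJoinOptimizer.java below):
        // the join columns only need to contain every bucket column.
        boolean newRule = !joinCols.isEmpty() && joinCols.containsAll(bucketColumns);

        System.out.println("old rule allows bucket map join: " + oldRule);  // false
        System.out.println("new rule allows bucket map join: " + newRule);  // true
      }
    }

Under the old exact-match rule this query would have fallen back to a plain join; with the relaxed rule the expected output above contains the Sorted Merge Bucket Map Join Operator.
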
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (revision 1146922)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (working copy)
@@ -27,9 +27,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -143,6 +143,7 @@
@Override
public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
+
MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
@@ -256,12 +257,12 @@
Iterator iter = prunedParts.getConfirmedPartns()
.iterator();
if (iter.hasNext()) {
- part = iter.next();
+ part = iter.next();
}
if (part == null) {
iter = prunedParts.getUnknownPartns().iterator();
if (iter.hasNext()) {
- part = iter.next();
+ part = iter.next();
}
}
assert part != null;
@@ -467,18 +468,13 @@
}
}
- // to see if the join columns from a table is exactly this same as its
- // bucket columns
- if (joinCols.size() == 0 || joinCols.size() != bucketColumns.size()) {
+ // Check whether the join columns contain all of the bucket columns.
+ // If a table is bucketized on column B but the join keys are A and B, rows
+ // from different buckets can never match, so the bucket map join is still valid.
+ if (joinCols.size() == 0 || !joinCols.containsAll(bucketColumns)) {
return false;
}
- for (String col : joinCols) {
- if (!bucketColumns.contains(col)) {
- return false;
- }
- }
-
return true;
}
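
For clarity, here is the relaxed predicate restated as a standalone helper (a sketch; the class and method names are illustrative and not part of the patch). It also shows a case the new rule still rejects, namely join keys that cover only a subset of the bucket columns:

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the relaxed check: a bucket map join remains correct when the
    // join columns are a superset of the bucket columns, because two rows with
    // equal join keys necessarily have equal bucket-column values and therefore
    // fall into corresponding buckets.
    public class BucketColumnCheckSketch {

      static boolean joinKeysCoverBucketColumns(List<String> joinCols,
                                                List<String> bucketColumns) {
        // Same shape as the patched condition in BucketMapJoinOptimizer.java.
        return !joinCols.isEmpty() && joinCols.containsAll(bucketColumns);
      }

      public static void main(String[] args) {
        List<String> bucketCols = Arrays.asList("userid", "pageid");

        // Superset of the bucket columns: accepted under the new rule.
        System.out.println(joinKeysCoverBucketColumns(
            Arrays.asList("userid", "pageid", "postid"), bucketCols)); // true

        // Only a subset of the bucket columns: still rejected, since rows with
        // equal userid can land in different buckets once pageid differs.
        System.out.println(joinKeysCoverBucketColumns(
            Arrays.asList("userid"), bucketCols)); // false
      }
    }
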