diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java index cb0a5e7..057ed48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -83,6 +85,7 @@ transient List> originalParents = new ArrayList>(); + transient List fetchInputAtClose; public CommonMergeJoinOperator() { super(); @@ -93,6 +96,7 @@ public CommonMergeJoinOperator() { public Collection> initializeOp(Configuration hconf) throws HiveException { Collection> result = super.initializeOp(hconf); firstFetchHappened = false; + fetchInputAtClose = getFetchInputAtCloseList(); int maxAlias = 0; for (byte pos = 0; pos < order.length; pos++) { @@ -145,6 +149,35 @@ public CommonMergeJoinOperator() { return result; } + /* + * In case of outer joins, we need to push records through even if one of the sides is done + * sending records. For example, in the case of a full outer join, the right side needs to send in data + * for the join even after the left side has completed sending all the records on its side. This + * list can be computed once at initialize time; at close, these tags will still forward records until + * they have no more to send.
+ */ + private List getFetchInputAtCloseList() { /* collects the getLeft()/getRight() values of every outer-join condition in conf.getConds() */ + List retval = new ArrayList(); + for (JoinCondDesc joinCondDesc : conf.getConds()) { + switch (joinCondDesc.getType()) { + case JoinDesc.FULL_OUTER_JOIN: /* full outer: record both the left and the right side of this condition */ + retval.add(joinCondDesc.getLeft()); + retval.add(joinCondDesc.getRight()); + break; + case JoinDesc.LEFT_OUTER_JOIN: /* left outer: record only the left side */ + retval.add(joinCondDesc.getLeft()); + break; + case JoinDesc.RIGHT_OUTER_JOIN: /* right outer: record only the right side */ + retval.add(joinCondDesc.getRight()); + break; + default: /* any other join type contributes nothing to the list */ + break; + } + } + + return retval; /* NOTE(review): no de-duplication here - a side referenced by several conditions is added once per condition */ + } + @Override public void endGroup() throws HiveException { // we do not want the end group to cause a checkAndGenObject @@ -405,7 +438,23 @@ private void joinFinalLeftData() throws HiveException { while (!allFetchDone) { List ret = joinOneGroup(); - if (ret == null || ret.size() == 0) { + boolean nothingToDo = false; + for (int i = 0; i < fetchDone.length; i++) { + if (i == posBigTable) { + continue; + } + if (fetchInputAtClose.contains(i)) { + if (fetchDone[i] == false) { + fetchNextGroup((byte) i); + } + } else { + if (ret == null || ret.size() == 0) { + nothingToDo = true; + break; + } + } + } + if (nothingToDo) { break; } reportProgress(); diff --git a/ql/src/test/queries/clientpositive/mergejoin.q b/ql/src/test/queries/clientpositive/mergejoin.q index 257337a..cf642c0 100644 --- a/ql/src/test/queries/clientpositive/mergejoin.q +++ b/ql/src/test/queries/clientpositive/mergejoin.q @@ -105,3 +105,6 @@ join (select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 where vt1.id=vt2.id; +set mapred.reduce.tasks=18; +select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key; +select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key; diff --git a/ql/src/test/results/clientpositive/mergejoin.q.out b/ql/src/test/results/clientpositive/mergejoin.q.out index af3d7df..a2028b6 100644 ---
a/ql/src/test/results/clientpositive/mergejoin.q.out +++ b/ql/src/test/results/clientpositive/mergejoin.q.out @@ -2565,3 +2565,38 @@ POSTHOOK: Input: default@tab_part POSTHOOK: Input: default@tab_part@ds=2008-04-08 #### A masked pattern was here #### 480 +PREHOOK: query: select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +0 val_0 2008-04-08 NULL NULL NULL +0 val_0 2008-04-08 NULL NULL NULL +0 val_0 2008-04-08 NULL NULL NULL +NULL NULL NULL 98 val_98 2008-04-08 +NULL NULL NULL 98 val_98 2008-04-08 +PREHOOK: query: select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +NULL NULL NULL 98 val_98 2008-04-08 +NULL NULL NULL 98 val_98 2008-04-08 diff --git 
a/ql/src/test/results/clientpositive/tez/mergejoin.q.out b/ql/src/test/results/clientpositive/tez/mergejoin.q.out index 48cd2a1..f1faa33 100644 --- a/ql/src/test/results/clientpositive/tez/mergejoin.q.out +++ b/ql/src/test/results/clientpositive/tez/mergejoin.q.out @@ -2526,3 +2526,38 @@ POSTHOOK: Input: default@tab_part POSTHOOK: Input: default@tab_part@ds=2008-04-08 #### A masked pattern was here #### 480 +PREHOOK: query: select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select * from (select * from tab where tab.key = 0)a full outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +0 val_0 2008-04-08 NULL NULL NULL +0 val_0 2008-04-08 NULL NULL NULL +0 val_0 2008-04-08 NULL NULL NULL +NULL NULL NULL 98 val_98 2008-04-08 +NULL NULL NULL 98 val_98 2008-04-08 +PREHOOK: query: select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select * from (select * from tab where tab.key = 0)a right outer join (select * from tab_part where tab_part.key = 98)b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: 
default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +NULL NULL NULL 98 val_98 2008-04-08 +NULL NULL NULL 98 val_98 2008-04-08