diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java index 0d6bcd7..d7dbe45 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -195,6 +195,26 @@ public void process(Object row, int tag) throws HiveException { } } + if (!nextKeyGroup && (tag == posBigTable) + && (candidateStorage[tag].rowCount() == joinEmitInterval)) { + boolean canEmit = true; + for (int i = 0; i < foundNextKeyGroup.length; i++) { + if (i == posBigTable) { + continue; + } + if (foundNextKeyGroup[i] == false) { + canEmit = false; + } + } + // we can save ourselves from spilling once we have join emit interval worth of rows. + if (canEmit) { + LOG.info("We are emitting rows since we hit the join emit interval of " + joinEmitInterval); + joinOneGroup(false); + candidateStorage[tag].clearRows(); + storage[tag].clearRows(); + } + } + reportProgress(); numMapRowsRead++; @@ -218,11 +238,15 @@ public void process(Object row, int tag) throws HiveException { } private List joinOneGroup() throws HiveException { + return joinOneGroup(true); + } + + private List joinOneGroup(boolean clear) throws HiveException { int[] smallestPos = findSmallestKey(); List listOfNeedFetchNext = null; if (smallestPos != null) { - listOfNeedFetchNext = joinObject(smallestPos); - if (listOfNeedFetchNext.size() > 0) { + listOfNeedFetchNext = joinObject(smallestPos, clear); + if ((listOfNeedFetchNext.size() > 0) && clear) { // listOfNeedFetchNext contains all tables that we have joined data in their // candidateStorage, and we need to clear candidate storage and promote their // nextGroupStorage to candidateStorage and fetch data until we reach a @@ -239,7 +263,7 @@ public void process(Object row, int tag) throws HiveException { return listOfNeedFetchNext; } - private List joinObject(int[] smallestPos) throws HiveException { + private List joinObject(int[] smallestPos, boolean clear) throws HiveException { List needFetchList = new ArrayList(); byte index = (byte) (smallestPos.length - 1); for (; index >= 0; index--) { @@ -248,7 +272,9 @@ public void process(Object row, int tag) throws HiveException { continue; } storage[index] = candidateStorage[index]; - needFetchList.add(index); + if (clear) { + needFetchList.add(index); + } if (smallestPos[index] < 0) { break; } @@ -257,9 +283,11 @@ public void process(Object row, int tag) throws HiveException { putDummyOrEmpty(index); } checkAndGenObject(); - for (Byte pos : needFetchList) { - this.candidateStorage[pos].clearRows(); - this.keyWritables[pos] = null; + if (clear) { + for (Byte pos : needFetchList) { + this.candidateStorage[pos].clearRows(); + this.keyWritables[pos] = null; + } } return needFetchList; } @@ -419,7 +447,6 @@ private void promoteNextGroupToCandidate(Byte t) throws HiveException { this.nextGroupStorage[t] = oldRowContainer; } - @SuppressWarnings("rawtypes") private boolean processKey(byte alias, List key) throws HiveException { List keyWritable = keyWritables[alias]; if (keyWritable == null) { diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q index 266a81b..580672f 100644 --- ql/src/test/queries/clientpositive/tez_smb_1.q +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -1,4 +1,5 @@ set hive.auto.convert.join=true; +set hive.join.emit.interval=2; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;