Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (revision 1636677) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (working copy) @@ -134,6 +134,19 @@ sources = ((TezContext) MapredContext.get()).getRecordSources(); } + @Override + public void endGroup() throws HiveException { + // overridden so that the end of a group does not cause a checkAndGenObject; only defaultEndGroup() is run + defaultEndGroup(); + } + + @Override + public void startGroup() throws HiveException { + // overridden so that the start of a group does not clear the storage; only defaultStartGroup() is run + defaultStartGroup(); + } + + + /* * (non-Javadoc) * @@ -275,7 +288,7 @@ if (foundNextKeyGroup[t]) { // first promote the next group to be the current group if we reached a // new group in the previous fetch - if ((this.nextKeyWritables[t] != null) || (this.fetchDone[t] == false)) { + if (this.nextKeyWritables[t] != null) { promoteNextGroupToCandidate(t); } else { this.keyWritables[t] = null; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (revision 1636677) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (working copy) @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -73,10 +74,10 @@ // for different tags private SerDe inputValueDeserializer; - TableDesc keyTableDesc; - TableDesc valueTableDesc; + private TableDesc keyTableDesc; + private TableDesc valueTableDesc; - ObjectInspector rowObjectInspector; + private ObjectInspector rowObjectInspector; private Operator reducer; private Object keyObject = null; @@ -84,8 +85,6 @@ private boolean vectorized = false; - List row = new 
ArrayList(Utilities.reduceFieldNameList.size()); - private DataOutputBuffer keyBuffer; private DataOutputBuffer valueBuffer; private VectorizedRowBatchCtx batchContext; @@ -111,7 +110,7 @@ private Iterable valueWritables; - private final boolean grouped = true; + private final GroupIterator groupIterator = new GroupIterator(); void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag, @@ -207,7 +206,7 @@ @Override public final boolean isGrouped() { - return grouped; + return vectorized; } @Override @@ -214,6 +213,12 @@ public boolean pushRecord() throws HiveException { BytesWritable keyWritable; + if (!vectorized && groupIterator.hasNext()) { + // if we have records left in the group we push one of those + groupIterator.next(); + return true; + } + try { if (!reader.next()) { return false; @@ -245,11 +250,13 @@ reducer.setGroupKeyObject(keyObject); } - /* this.keyObject passed via reference */ if(vectorized) { processVectors(valueWritables, tag); } else { - processKeyValues(valueWritables, tag); + groupIterator.initialize(valueWritables, keyObject, tag); + if (groupIterator.hasNext()) { + groupIterator.next(); // push first record of group + } } return true; } catch (Throwable e) { @@ -279,16 +286,29 @@ } } - /** - * @param values - * @return true if it is not done and can take more inputs - */ - private void processKeyValues(Iterable values, byte tag) throws HiveException { - List passDownKey = null; - for (Object value : values) { + private class GroupIterator { + private final List row = new ArrayList(Utilities.reduceFieldNameList.size()); + private List passDownKey = null; + private Iterator values; + private byte tag; + private Object keyObject; + + public void initialize(Iterable values, Object keyObject, byte tag) { + this.passDownKey = null; + this.values = values.iterator(); + this.tag = tag; + this.keyObject = keyObject; + } + + public boolean 
hasNext() { + return values != null && values.hasNext(); + } + + public void next() throws HiveException { + row.clear(); + Object value = values.next(); BytesWritable valueWritable = (BytesWritable) value; - row.clear(); if (passDownKey == null) { row.add(this.keyObject); } else { @@ -387,7 +407,6 @@ } catch (Exception e) { if (!abort) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e); }