diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 8b3489f..c3a66b2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -115,8 +115,6 @@
   long nextSz = 0;
   transient Byte lastAlias = null;
 
-  transient boolean handleSkewJoin = false;
-
   transient boolean hasLeftSemiJoin = false;
 
   protected transient int countAfterReport;
@@ -146,7 +144,6 @@ public CommonJoinOperator(CommonJoinOperator<T> clone) {
     this.dummyObjVectors = clone.dummyObjVectors;
     this.forwardCache = clone.forwardCache;
     this.groupKeyObject = clone.groupKeyObject;
-    this.handleSkewJoin = clone.handleSkewJoin;
     this.hconf = clone.hconf;
     this.id = clone.id;
     this.inputObjInspectors = clone.inputObjInspectors;
@@ -186,7 +183,6 @@ public CommonJoinOperator(CommonJoinOperator<T> clone) {
   @Override
   @SuppressWarnings("unchecked")
   protected void initializeOp(Configuration hconf) throws HiveException {
-    this.handleSkewJoin = conf.getHandleSkewJoin();
     this.hconf = hconf;
 
     heartbeatInterval = HiveConf.getIntVar(hconf,
@@ -319,15 +315,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     LOG.info("JOIN " + outputObjInspector.getTypeName() + " totalsz = " + totalSz);
   }
 
-  transient boolean newGroupStarted = false;
-
   @Override
-  public void startGroup() throws HiveException {
-    newGroupStarted = true;
+  public void startGroupOp() throws HiveException {
+    LOG.trace("Join: Starting new group");
     for (AbstractRowContainer<List<Object>> alw : storage) {
       alw.clearRows();
     }
-    super.startGroup();
   }
 
   protected long getNextSize(long sz) {
@@ -628,9 +621,16 @@ protected final short getFilterTag(List<Object> row) {
    * Forward a record of join results.
    *
+   * @param flush whether buffered join results should also be forwarded
    * @throws HiveException
    */
   @Override
-  public void endGroup() throws HiveException {
+  public void endGroupOp(boolean flush) throws HiveException {
+    LOG.trace("Join Op: endGroup called: numAliases=" + numAliases);
+    checkAndGenObject();
+  }
+
+  @Override
+  protected void flushOp() throws HiveException {
     checkAndGenObject();
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
index 772dda6..15e36bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
@@ -85,9 +85,6 @@
   private int childrenDone;
 
-  // The index of the child which the last row was forwarded to in a key group.
-  private int lastChildIndex;
-
   // Since DemuxOperator may appear multiple times in MuxOperator's parents list.
   // We use newChildIndexTag instead of childOperatorsTag.
   // Example:
@@ -247,13 +244,6 @@ protected void initializeChildren(Configuration hconf) throws HiveException {
   public void processOp(Object row, int tag) throws HiveException {
     int currentChildIndex = newTagToChildIndex[tag];
 
-    // Check if we start to forward rows to a new child.
-    // If so, in the current key group, rows will not be forwarded
-    // to those children which have an index less than the currentChildIndex.
-    // We can call flush the buffer of children from lastChildIndex (inclusive)
-    // to currentChildIndex (exclusive) and propagate processGroup to those children.
-    endGroupIfNecessary(currentChildIndex);
-
     int oldTag = newTagToOldTag[tag];
     if (isLogInfoEnabled) {
       cntrs[tag]++;
@@ -286,6 +276,11 @@ public void forward(Object row, ObjectInspector rowInspector)
   }
 
   @Override
+  public void endGroup(boolean flush) throws HiveException {
+    super.endGroup(true);
+  }
+
+  @Override
   protected void closeOp(boolean abort) throws HiveException {
     for (int i = 0 ; i < newTagToOldTag.length; i++) {
       int newTag = i;
@@ -297,55 +292,6 @@ protected void closeOp(boolean abort) throws HiveException {
   }
 
   /**
-   * We assume that the input rows associated with the same key are ordered by
-   * the tag. Because a tag maps to a childindex, when we see a new childIndex,
-   * we will not see the last childIndex (lastChildIndex) again before we start
-   * a new key group. So, we can call flush the buffer of children
-   * from lastChildIndex (inclusive) to currentChildIndex (exclusive) and
-   * propagate processGroup to those children.
-   * @param currentChildIndex the childIndex we have right now.
-   * @throws HiveException
-   */
-  private void endGroupIfNecessary(int currentChildIndex) throws HiveException {
-    if (lastChildIndex != currentChildIndex) {
-      for (int i = lastChildIndex; i < currentChildIndex; i++) {
-        Operator<? extends OperatorDesc> child = childOperatorsArray[i];
-        child.flush();
-        child.endGroup();
-        for (int childTag: newChildOperatorsTag[i]) {
-          child.processGroup(childTag);
-        }
-      }
-      lastChildIndex = currentChildIndex;
-    }
-  }
-
-  @Override
-  public void startGroup() throws HiveException {
-    lastChildIndex = 0;
-    super.startGroup();
-  }
-
-  @Override
-  public void endGroup() throws HiveException {
-    if (childOperators == null) {
-      return;
-    }
-
-    // We will start a new key group. We can call flush the buffer
-    // of children from lastChildIndex (inclusive) to the last child and
-    // propagate processGroup to those children.
-    for (int i = lastChildIndex; i < childOperatorsArray.length; i++) {
-      Operator<? extends OperatorDesc> child = childOperatorsArray[i];
-      child.flush();
-      child.endGroup();
-      for (int childTag: newChildOperatorsTag[i]) {
-        child.processGroup(childTag);
-      }
-    }
-  }
-
-  /**
    * @return the name of the operator
    */
   @Override
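A note for readers: the DemuxOperator override above passes flush=true unconditionally because each of its buffering children may still hold rows for the key group that is ending. The following toy model (plain Java, not Hive code; all names are illustrative) sketches that contract under the assumption that children buffer rows per key group:

    import java.util.ArrayList;
    import java.util.List;

    class ToyChild {
      private final List<String> buffer = new ArrayList<>();

      void process(String row) {
        buffer.add(row);  // children buffer rows within a key group
      }

      // endGroup(flush=true) both closes the group and forwards buffered rows
      void endGroup(boolean flush) {
        if (flush) {
          buffer.forEach(r -> System.out.println("forward: " + r));
          buffer.clear();
        }
      }
    }

    class ToyDemux {
      private final List<ToyChild> children = new ArrayList<>();

      ToyDemux(int n) {
        for (int i = 0; i < n; i++) {
          children.add(new ToyChild());
        }
      }

      void process(String row, int tag) {
        children.get(tag).process(row);
      }

      // Mirrors the patch: the demux ignores the caller's flush flag and always
      // passes true, since its children must not carry rows across key groups.
      void endGroup(boolean flushIgnored) {
        for (ToyChild c : children) {
          c.endGroup(true);
        }
      }
    }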
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 4632f08..e383aab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -720,16 +720,16 @@ protected void updateAggregations(AggregationBuffer[] aggs, Object row,
   }
 
   @Override
-  public void startGroup() throws HiveException {
+  public void startGroupOp() throws HiveException {
     firstRowInGroup = true;
-    super.startGroup();
   }
 
   @Override
-  public void endGroup() throws HiveException {
+  public void endGroupOp(boolean flush) throws HiveException {
     if (groupKeyIsNotReduceKey) {
       keysCurrentGroup.clear();
     }
+    super.endGroupOp(flush);
   }
 
   private void processKey(Object row,
@@ -1076,10 +1076,9 @@ protected void forward(Object[] keys,
 
    * Forward all aggregations to children. It is only used by DemuxOperator.
    * @throws HiveException
    */
-  @Override
-  public void flush() throws HiveException{
+  protected void flushOp() throws HiveException {
     try {
-      if (hashAggregations != null) {
+      if (hashAggregations != null && !hashAggregations.isEmpty()) {
         LOG.info("Begin Hash Table flush: size = " + hashAggregations.size());
         Iterator iter = hashAggregations.entrySet().iterator();
@@ -1091,17 +1090,11 @@ public void flush() throws HiveException{
           iter.remove();
         }
         hashAggregations.clear();
-      } else if (aggregations != null) {
+      }
+      if (aggregations != null && currentKeys != null) {
         // sort-based aggregations
-        if (currentKeys != null) {
-          forward(currentKeys.getKeyArray(), aggregations);
-        }
+        forward(currentKeys.getKeyArray(), aggregations);
         currentKeys = null;
-      } else {
-        // The GroupByOperator is not initialized, which means there is no
-        // data
-        // (since we initialize the operators when we see the first record).
-        // Just do nothing here.
       }
     } catch (Exception e) {
       throw new HiveException(e);
@@ -1143,7 +1136,7 @@ public void closeOp(boolean abort) throws HiveException {
         // create dummy keys - size 0
         forward(new Object[0], aggregations);
       } else {
-        flush();
+        flushOp();
       }
     } catch (Exception e) {
       throw new HiveException(e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 91b2369..a17a854 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -45,6 +45,16 @@ protected void initializeOp(Configuration hconf) throws HiveException {
   }
 
   @Override
+  protected boolean isGroupStarted() {
+    return true;
+  }
+
+  @Override
+  protected boolean isGroupEnded() {
+    return true;
+  }
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     throw new HiveException();
   }
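The reworked flushOp() above folds the old hash/sort/uninitialized three-way branch into two independent guards, so an operator that never saw data is simply a no-op. A minimal sketch of that contract, with hypothetical names standing in for Hive's internals:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    class ToyGroupBy {
      private final Map<String, Long> hashAggregations = new HashMap<>();
      private String currentKeys;   // pending sort-based group key, if any
      private long currentCount;

      void flushOp() {
        // hash mode: forward and remove each entry, as the patched loop does
        if (!hashAggregations.isEmpty()) {
          Iterator<Map.Entry<String, Long>> iter =
              hashAggregations.entrySet().iterator();
          while (iter.hasNext()) {
            Map.Entry<String, Long> e = iter.next();
            System.out.println(e.getKey() + " -> " + e.getValue());
            iter.remove();
          }
        }
        // sort mode: forward the one open group, then clear it so a second
        // flush is harmless
        if (currentKeys != null) {
          System.out.println(currentKeys + " -> " + currentCount);
          currentKeys = null;
        }
      }
    }

The design point is that flushOp() may now be called at any time, including on a fresh operator, without tripping over the removed "not initialized" branch.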
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index c747099..177ccd4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -58,14 +59,24 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     initializeChildren(hconf);
 
-    if (handleSkewJoin) {
-      skewJoinKeyContext = new SkewJoinHandler(this);
-      skewJoinKeyContext.initiliaze(hconf);
-      skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
+    if (conf.getHandleSkewJoin()) {
+      skewJoinKeyContext = createSkewContext(hconf);
     }
     statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
   }
 
+  private SkewJoinHandler createSkewContext(Configuration hconf) {
+    SkewJoinHandler skewJoinKeyContext = new SkewJoinHandler(this);
+    try {
+      skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
+      skewJoinKeyContext.initialize(hconf);
+    } catch (SerDeException e) {
+      LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
+      return null;
+    }
+    return skewJoinKeyContext;
+  }
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
@@ -80,7 +91,7 @@ public void processOp(Object row, int tag) throws HiveException {
 
       List<Object> nr = getFilteredValue(alias, row);
 
-      if (handleSkewJoin) {
+      if (skewJoinKeyContext != null) {
        skewJoinKeyContext.handleSkew(tag);
      }
 
@@ -91,27 +102,23 @@
           .toString());
       List keyObject = (List) soi.getStructFieldData(row, sf);
       // Are we consuming too much memory
-      if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) &&
-          !hasLeftSemiJoin) {
-        if (sz == joinEmitInterval && !hasFilter(alias)) {
-          // The input is sorted by alias, so if we are already in the last join
-          // operand,
-          // we can emit some results now.
-          // Note this has to be done before adding the current row to the
-          // storage,
-          // to preserve the correctness for outer joins.
-          checkAndGenObject();
-          storage[alias].clearRows();
-        }
-      } else {
-        if (sz == nextSz) {
-          // Print a message if we reached at least 1000 rows for a join operand
-          // We won't print a message for the last join operand since the size
-          // will never goes to joinEmitInterval.
-          LOG.info("table " + alias + " has " + sz + " rows for join key "
-              + keyObject);
-          nextSz = getNextSize(nextSz);
-        }
+      if (sz == joinEmitInterval && !hasLeftSemiJoin && isLastInput(tag) && !hasFilter(tag) &&
+          !(skewJoinKeyContext != null && skewJoinKeyContext.currBigKeyTag >= 0)) {
+        // The input is sorted by alias, so if we are already in the last join
+        // operand,
+        // we can emit some results now.
+        // Note this has to be done before adding the current row to the
+        // storage,
+        // to preserve the correctness for outer joins.
+        checkAndGenObject();
+        storage[alias].clearRows();
+      } else if (sz == nextSz) {
+        // Print a message if we reached at least 1000 rows for a join operand
+        // We won't print a message for the last join operand since the size
+        // will never grow to joinEmitInterval.
+        LOG.info("table " + alias + " has " + sz + " rows for join key "
+            + keyObject);
+        nextSz = getNextSize(nextSz);
       }
 
       // Add the value to the vector
@@ -119,8 +126,9 @@
 
       StructObjectInspector inspector = (StructObjectInspector) sf.getFieldObjectInspector();
       if (SerDeUtils.hasAnyNullObject(keyObject, inspector, nullsafes)) {
-        endGroup();
-        startGroup();
+        // internal flushing
+        endGroupOp(true);
+        startGroupOp();
       }
       storage[alias].addRow(nr);
     } catch (Exception e) {
@@ -130,6 +138,28 @@
   }
 
   @Override
+  public void startGroupOp() throws HiveException {
+    super.startGroupOp();
+    if (skewJoinKeyContext != null) {
+      skewJoinKeyContext.newGroupStarted();
+    }
+  }
+
+  protected boolean isLastInput(int input) {
+    if (parentOperators == null || parentOperators.isEmpty()) {
+      return input == numAliases - 1;
+    }
+    // with MUX operator
+    for (int i = 0; i < parentOperators.size(); i++) {
+      Operator<? extends OperatorDesc> parent = parentOperators.get(i);
+      if (input != i && parent != null && !parent.isGroupEnded()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
   public OperatorType getType() {
     return OperatorType.JOIN;
   }
@@ -140,7 +170,7 @@ public OperatorType getType() {
    */
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (handleSkewJoin) {
+    if (skewJoinKeyContext != null) {
       skewJoinKeyContext.close(abort);
     }
     super.closeOp(abort);
@@ -240,20 +270,20 @@ private void mvFileToFinalPath(Path specPath, Configuration hconf,
 
    * Forward a record of join results.
    *
+   * @param flush whether buffered join results should also be forwarded
    * @throws HiveException
    */
   @Override
-  public void endGroup() throws HiveException {
+  public void endGroupOp(boolean flush) throws HiveException {
     // if this is a skew key, we need to handle it in a separate map reduce job.
-    if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
+    if (skewJoinKeyContext != null && skewJoinKeyContext.currBigKeyTag >= 0) {
       try {
         skewJoinKeyContext.endGroup();
       } catch (IOException e) {
         LOG.error(e.getMessage(), e);
         throw new HiveException(e);
       }
-      return;
     } else {
-      checkAndGenObject();
+      super.endGroupOp(flush);
     }
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index e3877d9..101813b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -80,13 +80,11 @@ public MapJoinOperator(AbstractMapJoinOperator mjop) {
    * Note: The mapjoin can be run in the reducer only on Tez.
    */
   @Override
-  public void endGroup() throws HiveException {
-    defaultEndGroup();
+  public void endGroupOp(boolean flush) throws HiveException {
   }
 
   @Override
-  public void startGroup() throws HiveException {
-    defaultStartGroup();
+  public void startGroupOp() throws HiveException {
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
index b10a7fa..dc03947 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java
@@ -36,8 +36,8 @@
 /**
  * MuxOperator is used in the Reduce side of MapReduce jobs optimized by Correlation Optimizer.
- * Correlation Optimizer will remove unnecessary ReduceSinkOperaotrs,
- * and MuxOperators are used to replace those ReduceSinkOperaotrs.
+ * Correlation Optimizer will remove unnecessary ReduceSinkOperators,
+ * and MuxOperators are used to replace those ReduceSinkOperators.
  * Example: The original operator tree is ...
  *  JOIN2
  *  /    \
@@ -160,7 +160,6 @@ public Object process(Object row) throws HiveException {
   private transient ObjectInspector[] outputObjectInspectors;
   private transient int numParents;
   private transient boolean[] forward;
-  private transient boolean[] processGroupCalled;
   private Handler[] handlers;
 
   // Counters for debugging, we cannot use existing counters (cntr and nextCntr)
@@ -177,13 +176,11 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     }
     numParents = getNumParent();
     forward = new boolean[numParents];
-    processGroupCalled = new boolean[numParents];
     outputObjectInspectors = new ObjectInspector[numParents];
     handlers = new Handler[numParents];
     cntrs = new long[numParents];
     nextCntrs = new long[numParents];
     for (int i = 0; i < numParents; i++) {
-      processGroupCalled[i] = false;
       if (conf.getParentToKeyCols().get(i) == null) {
         // We do not need to evaluate the input row for this parent.
         // So, we can just forward it to the child of this MuxOperator.
@@ -269,38 +266,6 @@ public void forward(Object row, ObjectInspector rowInspector)
   }
 
   @Override
-  public void startGroup() throws HiveException{
-    for (int i = 0; i < numParents; i++) {
-      processGroupCalled[i] = false;
-    }
-    super.startGroup();
-  }
-
-  @Override
-  public void endGroup() throws HiveException {
-    // do nothing
-  }
-
-  @Override
-  public void processGroup(int tag) throws HiveException {
-    processGroupCalled[tag] = true;
-    boolean shouldProceed = true;
-    for (int i = 0; i < numParents; i++) {
-      if (!processGroupCalled[i]) {
-        shouldProceed = false;
-        break;
-      }
-    }
-    if (shouldProceed) {
-      Operator<? extends OperatorDesc> child = childOperatorsArray[0];
-      int childTag = childOperatorsTag[0];
-      child.flush();
-      child.endGroup();
-      child.processGroup(childTag);
-    }
-  }
-
-  @Override
   protected void closeOp(boolean abort) throws HiveException {
     for (int i = 0; i < numParents; i++) {
       LOG.info(id + ", tag=" + i + ", forwarded " + cntrs[i] + " rows");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 3dc7c76..7960968 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -68,6 +68,8 @@ protected String operatorId;
 
   private transient ExecMapperContext execContext;
 
+  private transient boolean groupStarted; // always false on the map side
+
   private static AtomicInteger seqId;
 
   // It can be optimized later so that an operator operator (init/close) is performed
@@ -489,58 +491,109 @@ public ObjectInspector getOutputObjInspector() {
    */
   public abstract void processOp(Object row, int tag) throws HiveException;
 
-  protected final void defaultStartGroup() throws HiveException {
-    LOG.debug("Starting group");
+  protected boolean isGroupStarted() {
+    return groupStarted;
+  }
 
-    if (childOperators == null) {
+  protected boolean areAllParentsGroupStarted() {
+    if (parentOperators != null) {
+      for (Operator<? extends OperatorDesc> parent : parentOperators) {
+        if (parent != null && !parent.isGroupStarted()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public void startGroup() throws HiveException {
+    if (!areAllParentsGroupStarted() || isGroupStarted()) {
       return;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting group " + this);
+    }
+    groupStarted = true;
+
+    startGroupOp();
 
     LOG.debug("Starting group for children:");
-    for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.startGroup();
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setGroupKeyObject(groupKeyObject);
+      op.startGroup();
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start group Done " + this);
+    }
+  }
 
-    LOG.debug("Start group Done");
+  // If an operator wants to do some work at the beginning of a group
+  protected void startGroupOp() throws HiveException {
   }
 
-  protected final void defaultEndGroup() throws HiveException {
-    LOG.debug("Ending group");
+  protected boolean isGroupEnded() {
+    return !groupStarted;
+  }
 
-    if (childOperators == null) {
+  protected boolean areAllParentsGroupEnded() {
+    if (parentOperators != null) {
+      for (Operator<? extends OperatorDesc> parent : parentOperators) {
+        if (parent != null && !parent.isGroupEnded()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public void endGroup(boolean flush) throws HiveException {
+    if (!areAllParentsGroupEnded() || isGroupEnded()) {
       return;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Ending group " + this);
+    }
+    groupStarted = false;
+    endGroupOp(flush);
 
     LOG.debug("Ending group for children:");
-    for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.endGroup();
+    if (childOperators != null) {
+      for (Operator<? extends OperatorDesc> op : childOperators) {
+        op.endGroup(flush);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("End group Done " + this);
     }
-
-    LOG.debug("End group Done");
-  }
-
-  // If a operator wants to do some work at the beginning of a group
-  public void startGroup() throws HiveException {
-    defaultStartGroup();
+  }
 
   // If an operator wants to do some work at the end of a group
-  public void endGroup() throws HiveException {
-    defaultEndGroup();
-  }
-
-  // an blocking operator (e.g. GroupByOperator and JoinOperator) can
-  // override this method to forward its outputs
-  public void flush() throws HiveException {
+  protected void endGroupOp(boolean flush) throws HiveException {
+    if (flush) {
+      flushOp();
+    }
   }
 
-  public void processGroup(int tag) throws HiveException {
-    if (childOperators == null || childOperators.isEmpty()) {
+  public final void flush() throws HiveException {
+    if (!areAllParentsGroupEnded()) {
       return;
     }
-    for (int i = 0; i < childOperatorsArray.length; i++) {
-      childOperatorsArray[i].processGroup(childOperatorsTag[i]);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Flushing " + this);
     }
+    flushOp();
+    LOG.debug("Flushing for children:");
+    if (childOperators != null) {
+      for (Operator<? extends OperatorDesc> op : childOperators) {
+        op.flush();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Flush Done " + this);
+    }
+  }
+
+  protected void flushOp() throws HiveException {
   }
 
   protected boolean allInitializedParentsAreClosed() {
@@ -603,7 +656,7 @@ public void close(boolean abort) throws HiveException {
 
   /**
    * Operator specific close routine. Operators which inherents this class
-   * should overwrite this funtion for their specific cleanup routine.
+   * should override this function for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
   }
@@ -757,7 +810,7 @@ public boolean removeChildren(int depth) {
   }
 
   /**
-   * Replace one parent with another at the same position. Chilren of the new
+   * Replace one parent with another at the same position. Children of the new
    * parent are not updated
    *
    * @param parent
@@ -1060,7 +1113,7 @@ public boolean supportSkewJoinOptimization() {
 
     if (parents != null) {
      for (Operator<? extends OperatorDesc> parent : parents) {
-        parentClones.add((parent.clone()));
+        parentClones.add(parent.clone());
      }
    }
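The heart of the patch is the consensus protocol above: startGroup() and endGroup(flush) only take effect once every parent agrees, and each fires at most once per group. A compact, runnable model of just that logic (illustrative only; it omits tags, configuration, and row processing):

    import java.util.ArrayList;
    import java.util.List;

    class ToyOperator {
      final List<ToyOperator> parents = new ArrayList<>();
      final List<ToyOperator> children = new ArrayList<>();
      private boolean groupStarted;

      boolean isGroupStarted() { return groupStarted; }
      boolean isGroupEnded() { return !groupStarted; }

      final void startGroup() {
        for (ToyOperator p : parents) {
          if (!p.isGroupStarted()) {
            return;             // a parent has not started yet: wait
          }
        }
        if (groupStarted) {
          return;               // already started for this group
        }
        groupStarted = true;
        startGroupOp();
        for (ToyOperator c : children) {
          c.startGroup();       // fires once all of c's parents have started
        }
      }

      final void endGroup(boolean flush) {
        for (ToyOperator p : parents) {
          if (!p.isGroupEnded()) {
            return;             // some parent still has its group open
          }
        }
        if (!groupStarted) {
          return;               // already ended
        }
        groupStarted = false;
        endGroupOp(flush);
        for (ToyOperator c : children) {
          c.endGroup(flush);
        }
      }

      void startGroupOp() { }             // per-operator hook
      void endGroupOp(boolean flush) { }  // per-operator hook
    }

This consensus check is what lets Demux/Mux plans drop the processGroup() bookkeeping deleted above: a shared child simply does not see endGroup until its last parent ends.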
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
index a63466a..df66281 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -95,18 +94,18 @@
   List dummyKey = null;
   String taskId;
 
-  private final CommonJoinOperator joinOp;
+  private final JoinOperator joinOp;
   private final int numAliases;
   private final JoinDesc conf;
 
-  public SkewJoinHandler(CommonJoinOperator joinOp) {
+  public SkewJoinHandler(JoinOperator joinOp) {
     this.joinOp = joinOp;
     numAliases = joinOp.numAliases;
     conf = joinOp.getConf();
     noOuterJoin = joinOp.noOuterJoin;
   }
 
-  public void initiliaze(Configuration hconf) {
+  public void initialize(Configuration hconf) throws SerDeException {
     this.hconf = hconf;
     JoinDesc desc = joinOp.getConf();
     skewKeyDefinition = desc.getSkewKeyDefinition();
@@ -136,16 +135,10 @@ public void initialize(Configuration hconf) throws SerDeException {
     StructObjectInspector structTblKeyInpector = ObjectInspectorFactory
         .getStandardStructObjectInspector(keyColNames, skewTableKeyInspectors);
 
-    try {
-      SerDe serializer = (SerDe) ReflectionUtils.newInstance(tblDesc.get(
-          alias).getDeserializerClass(), null);
-      SerDeUtils.initializeSerDe(serializer, null, tblDesc.get(alias).getProperties(), null);
-      tblSerializers.put((byte) i, serializer);
-    } catch (SerDeException e) {
-      LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
-      joinOp.handleSkewJoin = false;
-      break;
-    }
+    SerDe serializer = (SerDe) ReflectionUtils.newInstance(tblDesc.get(
+        alias).getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(serializer, null, tblDesc.get(alias).getProperties(), null);
+    tblSerializers.put((byte) i, serializer);
 
     boolean hasFilter = filterMap != null && filterMap[i] != null;
     TableDesc valTblDesc = JoinUtil.getSpillTableDesc(alias,
@@ -199,18 +192,22 @@ void endGroup() throws IOException, HiveException {
     skewKeyInCurrentGroup = false;
   }
 
+  boolean newGroupStarted = false;
   boolean skewKeyInCurrentGroup = false;
 
+  public void newGroupStarted() {
+    newGroupStarted = true;
+  }
+
   public void handleSkew(int tag) throws HiveException {
-    if (joinOp.newGroupStarted || tag != currTag) {
+    if (newGroupStarted || tag != currTag) {
       rowNumber = 0;
       currTag = tag;
     }
 
-    if (joinOp.newGroupStarted) {
+    if (newGroupStarted) {
       currBigKeyTag = -1;
-      joinOp.newGroupStarted = false;
       dummyKey = (List) joinOp.getGroupKeyObject();
       skewKeyInCurrentGroup = false;
 
@@ -220,11 +217,11 @@ public void handleSkew(int tag) throws HiveException {
         rc.setKeyObject(dummyKey);
        }
      }
+      newGroupStarted = false;
    }
 
    rowNumber++;
-    if (currBigKeyTag == -1 && (tag < numAliases - 1)
-        && rowNumber >= skewKeyDefinition) {
+    if (currBigKeyTag == -1 && rowNumber >= skewKeyDefinition && !joinOp.isLastInput(tag)) {
      // the first time we see a big key. If this key is not in the last
      // table (the last table can always be streamed), we define that we get
      // a skew key now.
@@ -343,5 +340,4 @@ public void setSkewJoinJobCounter(LongWritable skewjoinFollowupJobs) {
   public void updateSkewJoinJobCounter(int tag) {
     this.skewjoinFollowupJobs.set(this.skewjoinFollowupJobs.get() + 1);
   }
-
 }
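With the shared joinOp.newGroupStarted field gone, the handler now owns its group flag and the join signals boundaries explicitly via newGroupStarted(). A minimal sketch of that ownership change (hypothetical names, skew detection elided):

    class ToySkewHandler {
      private boolean newGroupStarted;
      private int rowNumber;
      private int currTag = -1;

      // Called by the join's startGroupOp(); the join no longer exposes a
      // mutable flag for the handler to reset.
      void newGroupStarted() {
        newGroupStarted = true;
      }

      void handleSkew(int tag) {
        if (newGroupStarted || tag != currTag) {
          rowNumber = 0;          // reset the per-(group, tag) row counter
          currTag = tag;
        }
        newGroupStarted = false;  // consumed exactly once per group
        rowNumber++;
        // ... comparison of rowNumber against a skew threshold goes here ...
      }
    }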
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index 9cd8b56..7e15f6a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -207,7 +207,7 @@ public void reduce(Object key, Iterator values, OutputCollector output,
         if (isTraceEnabled) {
           LOG.trace("End Group");
         }
-        reducer.endGroup();
+        reducer.endGroup(false);
       }
 
       try {
@@ -249,10 +249,8 @@ public void reduce(Object key, Iterator values, OutputCollector output,
           cntr++;
           if (cntr == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            if (isInfoEnabled) {
-              LOG.info("ExecReducer: processing " + cntr
-                  + " rows: used memory = " + used_memory);
-            }
+            LOG.info("ExecReducer: processing " + cntr
+                + " rows: used memory = " + used_memory);
             nextCntr = getNextCntr(cntr);
           }
         }
@@ -308,7 +306,7 @@ public void close() {
       if (isTraceEnabled) {
         LOG.trace("End Group");
       }
-      reducer.endGroup();
+      reducer.endGroup(false);
     }
     if (isInfoEnabled) {
       LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 017a72a..3bdef67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -237,7 +237,7 @@ public boolean pushRecord() throws HiveException {
         this.groupKey = new BytesWritable();
       } else {
         // If a operator wants to do some work at the end of a group
-        reducer.endGroup();
+        reducer.endGroup(false);
       }
 
       groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
@@ -382,7 +382,7 @@ boolean close() throws Exception {
     try {
       if (handleGroupKey && groupKey != null) {
         // If a operator wants to do some work at the end of a group
-        reducer.endGroup();
+        reducer.endGroup(false);
       }
     } catch (Exception e) {
       if (!abort) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 6274cb6..a5446fe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -32,11 +32,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
-import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -46,12 +43,9 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -849,12 +843,12 @@ private void changeToUnsortedStreamingMode() throws HiveException {
   }
 
   @Override
-  public void startGroup() throws HiveException {
+  public void startGroupOp() throws HiveException {
     processingMode.startGroup();
   }
 
   @Override
-  public void endGroup() throws HiveException {
+  public void endGroupOp(boolean flush) throws HiveException {
     processingMode.endGroup();
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
index c52f753..686ba72 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
@@ -459,7 +459,8 @@ private boolean sameOrder(String order1, String order2) {
             CorrelationUtilities.getSingleChild(rsop, GroupByOperator.class);
         if (cGBY != null) {
           if (CorrelationUtilities.hasGroupingSet(rsop) ||
-              cGBY.getConf().isGroupingSetsPresent()) {
+              cGBY.getConf().isGroupingSetsPresent() ||
+              cGBY.getConf().getGroupKeyNotReductionKey()) {
             // Do not support grouping set right now
             isCorrelated = false;
           }
@@ -536,7 +537,8 @@ private boolean sameOrder(String order1, String order2) {
             CorrelationUtilities.getSingleChild(op, GroupByOperator.class);
         if (cGBY != null) {
           if (CorrelationUtilities.hasGroupingSet(op) ||
-              cGBY.getConf().isGroupingSetsPresent()) {
+              cGBY.getConf().isGroupingSetsPresent() ||
+              cGBY.getConf().getGroupKeyNotReductionKey()) {
             // Do not support grouping set right now
             shouldDetect = false;
           }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
index 43458d9..283cdb3 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
@@ -20,7 +20,6 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
diff --git ql/src/test/queries/clientpositive/correlationoptimizer16.q ql/src/test/queries/clientpositive/correlationoptimizer16.q
new file mode 100644
index 0000000..970ff8a
--- /dev/null
+++ ql/src/test/queries/clientpositive/correlationoptimizer16.q
@@ -0,0 +1,19 @@
+create table TBL (a string, b string);
+insert into table TBL select 'a','a' from src tablesample (1 rows);
+
+set hive.optimize.correlation=true;
+
+explain
+select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b;
+
+select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b;
diff --git ql/src/test/results/clientpositive/correlationoptimizer16.q.out ql/src/test/results/clientpositive/correlationoptimizer16.q.out
new file mode 100644
index 0000000..f9ad658
--- /dev/null
+++ ql/src/test/results/clientpositive/correlationoptimizer16.q.out
@@ -0,0 +1,179 @@
+PREHOOK: query: create table TBL (a string, b string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@TBL
+POSTHOOK: query: create table TBL (a string, b string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@TBL
+PREHOOK: query: insert into table TBL select 'a','a' from src tablesample (1 rows)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@tbl
+POSTHOOK: query: insert into table TBL select 'a','a' from src tablesample (1 rows)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tbl
+POSTHOOK: Lineage: tbl.a SIMPLE []
+POSTHOOK: Lineage: tbl.b SIMPLE []
+PREHOOK: query: explain
+select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: tbl
+            Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: a (type: string)
+              outputColumnNames: a
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(1)
+                keys: a (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
+          TableScan
+            alias: tbl
+            Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: b (type: string)
+              outputColumnNames: b
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(1)
+                keys: b (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Demux Operator
+          Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+          Group By Operator
+            aggregations: count(VALUE._col0)
+            keys: KEY._col0 (type: string)
+            mode: mergepartial
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: _col0 (type: string), _col1 (type: bigint)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Union
+                Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: bigint)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                  Mux Operator
+                    Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(_col1)
+                      keys: _col0 (type: string)
+                      mode: complete
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string), _col1 (type: bigint)
+                        outputColumnNames: _col0, _col1
+                        Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: org.apache.hadoop.mapred.TextInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          Group By Operator
+            aggregations: count(VALUE._col0)
+            keys: KEY._col0 (type: string)
+            mode: mergepartial
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: _col0 (type: string), _col1 (type: bigint)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Union
+                Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col1 (type: bigint)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                  Mux Operator
+                    Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(_col1)
+                      keys: _col0 (type: string)
+                      mode: complete
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string), _col1 (type: bigint)
+                        outputColumnNames: _col0, _col1
+                        Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: org.apache.hadoop.mapred.TextInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: select b, sum(cc) from (
+  select b,count(1) as cc from TBL group by b
+  union all
+  select a as b,count(1) as cc from TBL group by a
+) z
+group by b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+a	2
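For context on the endGroup(false) call sites in ExecReducer and ReduceRecordSource above: the reduce-side drivers deliberately do not force a flush at key-group boundaries, since blocking operators forward output on their own schedule (or in closeOp), and an unconditional per-group flush would defeat hash-mode aggregation. A sketch of such a driver against the ToyOperator model from the Operator.java note, under the same assumptions:

    class ToyReduceDriver {
      private final ToyOperator reducer;
      private Object currentKey;

      ToyReduceDriver(ToyOperator reducer) {
        this.reducer = reducer;
      }

      void push(Object key, Object row) {
        if (currentKey != null && !currentKey.equals(key)) {
          reducer.endGroup(false); // key group ended: no forced flush
        }
        if (currentKey == null || !currentKey.equals(key)) {
          currentKey = key;
          reducer.startGroup();
        }
        // reducer.processOp(row, tag) would be called here in a real driver
      }

      void close() {
        if (currentKey != null) {
          reducer.endGroup(false); // final group; operators flush in closeOp
        }
      }
    }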