diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java index 48b3139..d06be17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java @@ -285,7 +285,7 @@ public void process(Object row, int tag) throws HiveException { // if all children are done, this operator is also done if (childrenDone == childOperatorsArray.length) { - setDone(true); + setDone(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java index 86519a6..8baaa82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java @@ -54,7 +54,7 @@ public void process(Object row, int tag) throws HiveException { forward(row, inputObjInspectors[tag]); currCount++; } else { - setDone(true); + setDone(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5ea96a..622e3dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -521,7 +521,7 @@ public void process(Writable value) throws HiveException { rowsForwarded(childrenDone, 1); } - protected final void rowsForwarded(int childrenDone, int rows) { + protected final void rowsForwarded(int childrenDone, int rows) throws HiveException { numRows += rows; if (isLogInfoEnabled) { while (numRows >= cntr) { @@ -534,7 +534,7 @@ protected final void rowsForwarded(int childrenDone, int rows) { } } if (childrenDone == currentCtxs.length) { - setDone(true); + setDone(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java index eb4dff3..e0aaa86 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java @@ -262,7 +262,7 @@ public void process(Object row, int tag) throws HiveException { // if all children are done, this operator is also done if (childrenDone == childOperatorsArray.length) { - setDone(true); + setDone(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 5856cfd..1f0ed06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,7 +55,7 @@ * Base operator implementation. **/ public abstract class Operator implements Serializable,Cloneable, - Node { + Node { // Bean methods @@ -67,6 +68,7 @@ private transient Configuration configuration; protected List> childOperators; protected List> parentOperators; + private transient Set> completedOperators; protected String operatorId; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; @@ -100,11 +102,15 @@ private boolean useBucketizedHiveInputFormat; + protected T conf; + protected boolean done; + // dummy operator (for not increasing seqId) private Operator(String name) { id = name; initOperatorId(); childOperators = new ArrayList>(); + completedOperators = new HashSet>(); parentOperators = new ArrayList>(); } @@ -127,17 +133,25 @@ public Operator(Reporter reporter) { this.reporter = reporter; } - public void setChildOperators( - List> childOperators) { + public void setChildOperators(List> childOperators) { + if (childOperators == null) { childOperators = new ArrayList>(); } + this.childOperators = childOperators; + completedOperators.clear(); + for (Operator o : childOperators) { + if (o.getDone()) { + completedOperators.add(o); + } + } } public Configuration getConfiguration() { return configuration; } + public List> getChildOperators() { return childOperators; } @@ -164,11 +178,12 @@ public int getNumChild() { return ret_vec; } - public void setParentOperators( - List> parentOperators) { + public void setParentOperators(List> parentOperators) { + if (parentOperators == null) { parentOperators = new ArrayList>(); } + this.parentOperators = parentOperators; } @@ -177,12 +192,9 @@ public void setParentOperators( } public int getNumParent() { - return parentOperators == null ? 0 : parentOperators.size(); + return parentOperators.size(); } - protected T conf; - protected boolean done; - public void setConf(T conf) { this.conf = conf; } @@ -196,8 +208,26 @@ public boolean getDone() { return done; } - protected final void setDone(boolean done) { - this.done = done; + protected final void setDone() throws HiveException { + if (!done) { + done = true; + for (Operator p : parentOperators) { + p.setChildDone(this); + } + } + } + + protected final void setChildDone(Operator child) throws HiveException { + if (childOperators.contains(child) && !completedOperators.contains(child)) { + completedOperators.add(child); + initializeChildOperatorArray(completedOperators); + for (Operator o: childOperators) { + if (!o.getDone()) { + return; + } + } + setDone(); + } } // non-bean fields needed during compilation @@ -306,6 +336,31 @@ protected boolean areAllParentsInitialized() { return true; } + private void initializeChildOperatorArray(Set> completedOperators) + throws HiveException { + + // initialize structure to maintain child op info. operator tree changes + // while initializing so this need to be done here instead of constructor + childOperatorsArray = new Operator[childOperators.size() - completedOperators.size()]; + + int j = 0; + for (int i = 0; i < childOperators.size(); i++) { + Operator child = childOperators.get(i); + if (!completedOperators.contains(child)) { + childOperatorsArray[j++] = child; + } + } + childOperatorsTag = new int[childOperatorsArray.length]; + for (int i = 0; i < childOperatorsArray.length; i++) { + List> parentOperators = + childOperatorsArray[i].getParentOperators(); + childOperatorsTag[i] = parentOperators.indexOf(this); + if (childOperatorsTag[i] == -1) { + throw new HiveException("Hive internal error: cannot find parent in the child operator!"); + } + } + } + /** * Initializes operators only if all parents have been initialized. Calls * operator specific initializer which then initializes child ops. @@ -319,15 +374,19 @@ protected boolean areAllParentsInitialized() { @SuppressWarnings("unchecked") public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException { + if (state == State.INIT) { return; } - this.configuration = hconf; + configuration = hconf; + if (!areAllParentsInitialized()) { return; } + done = false; + if (isLogInfoEnabled) { LOG.info("Initializing operator " + this); } @@ -336,38 +395,23 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) inputObjInspectors = inputOIs; } - // initialize structure to maintain child op info. operator tree changes - // while initializing so this need to be done here instead of constructor - childOperatorsArray = new Operator[childOperators.size()]; - for (int i = 0; i < childOperatorsArray.length; i++) { - childOperatorsArray[i] = childOperators.get(i); - } - childOperatorsTag = new int[childOperatorsArray.length]; - for (int i = 0; i < childOperatorsArray.length; i++) { - List> parentOperators = - childOperatorsArray[i].getParentOperators(); - childOperatorsTag[i] = parentOperators.indexOf(this); - if (childOperatorsTag[i] == -1) { - throw new HiveException("Hive internal error: cannot find parent in the child operator!"); - } - } - - if (inputObjInspectors.length == 0) { - throw new HiveException("Internal Error during operator initialization."); - } - // derived classes can set this to different object if needed outputObjInspector = inputObjInspectors[0]; + initializeChildOperatorArray(Collections.EMPTY_SET); + Collection> asyncInitOperations = initializeOp(hconf); // sanity checks - if (!rootInitializeCalled - || asyncInitOperations == null - || childOperatorsArray.length != childOperators.size()) { + if (!rootInitializeCalled || asyncInitOperations == null + || childOperatorsArray.length != childOperators.size()) { throw new AssertionError("Internal error during operator initialization"); } + if (inputObjInspectors.length == 0) { + throw new HiveException("Internal Error during operator initialization."); + } + if (isLogInfoEnabled) { LOG.info("Initialization Done " + id + " " + getName()); } @@ -448,7 +492,7 @@ protected void initializeChildren(Configuration hconf) throws HiveException { public void passExecContext(ExecMapperContext execContext) { this.setExecContext(execContext); for (int i = 0; i < childOperators.size(); i++) { - childOperators.get(i).passExecContext(execContext); + childOperators.get(i).passExecContext(execContext); } } @@ -507,17 +551,10 @@ public ObjectInspector getOutputObjInspector() { protected final void defaultStartGroup() throws HiveException { if (isLogDebugEnabled) { - LOG.debug("Starting group"); - } - - if (childOperators == null) { - return; - } - - if (isLogDebugEnabled) { LOG.debug("Starting group for children:"); } - for (Operator op : childOperators) { + + for (Operator op : childOperatorsArray) { op.startGroup(); } @@ -528,17 +565,10 @@ protected final void defaultStartGroup() throws HiveException { protected final void defaultEndGroup() throws HiveException { if (isLogDebugEnabled) { - LOG.debug("Ending group"); - } - - if (childOperators == null) { - return; - } - - if (isLogDebugEnabled) { LOG.debug("Ending group for children:"); } - for (Operator op : childOperators) { + + for (Operator op : childOperatorsArray) { op.endGroup(); } @@ -557,32 +587,27 @@ public void endGroup() throws HiveException { defaultEndGroup(); } - // an blocking operator (e.g. GroupByOperator and JoinOperator) can + // a blocking operator (e.g. GroupByOperator and JoinOperator) can // override this method to forward its outputs public void flush() throws HiveException { } public void processGroup(int tag) throws HiveException { - if (childOperators == null || childOperators.isEmpty()) { - return; - } for (int i = 0; i < childOperatorsArray.length; i++) { childOperatorsArray[i].processGroup(childOperatorsTag[i]); } } protected boolean allInitializedParentsAreClosed() { - if (parentOperators != null) { - for (Operator parent : parentOperators) { - if(parent==null){ - continue; - } - if (isLogDebugEnabled) { - LOG.debug("allInitializedParentsAreClosed? parent.state = " + parent.state); - } - if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) { - return false; - } + for (Operator parent : parentOperators) { + if (parent == null) { + continue; + } + if (isLogDebugEnabled) { + LOG.debug("allInitializedParentsAreClosed? parent.state = " + parent.state); + } + if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) { + return false; } } return true; @@ -624,14 +649,14 @@ public void close(boolean abort) throws HiveException { } for (Operator op : childOperators) { - if (isLogDebugEnabled) { - LOG.debug("Closing child = " + op); - } + if (isLogDebugEnabled) { + LOG.debug("Closing child = " + op); + } op.close(abort); } if (isLogInfoEnabled) { - LOG.info(id + " Close done"); + LOG.info(id + " Close done"); } } catch (HiveException e) { e.printStackTrace(); @@ -745,7 +770,7 @@ public void removeChildAndAdoptItsChildren( int index = parents.indexOf(child); if (index == -1) { throw new SemanticException( - "Exception when trying to remove partition predicates: fail to find parent from child"); + "Exception when trying to remove partition predicates: fail to find parent from child"); } parents.set(index, this); } @@ -785,7 +810,7 @@ public boolean removeChildren(int depth) { setChildOperators(currOp.getChildOperators()); List> parentOps = - new ArrayList>(); + new ArrayList>(); parentOps.add(this); for (Operator op : currOp.getChildOperators()) { @@ -823,24 +848,9 @@ protected long getNextCntr(long cntr) { protected void forward(Object row, ObjectInspector rowInspector) throws HiveException { - - if (getDone()) { - return; - } - - int childrenDone = 0; for (int i = 0; i < childOperatorsArray.length; i++) { Operator o = childOperatorsArray[i]; - if (o.getDone()) { - childrenDone++; - } else { - o.process(row, childOperatorsTag[i]); - } - } - - // if all children are done, this operator is also done - if (childrenDone != 0 && childrenDone == childOperatorsArray.length) { - setDone(true); + o.process(row, childOperatorsTag[i]); } } @@ -1101,7 +1111,7 @@ public boolean supportSkewJoinOptimization() { List> parents = getParentOperators(); List> parentClones = - new ArrayList>(); + new ArrayList>(); if (parents != null) { for (Operator parent : parents) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index f2eed44..5e84717 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -445,7 +445,7 @@ public void process(Object row, int tag) throws HiveException { LOG.warn("Exception in closing outThread: " + StringUtils.stringifyException(e2)); } - setDone(true); + setDone(); LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done"); } else { LOG.error("Error in writing to script: " + e.getMessage()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cbf02e9..6e50c35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -88,7 +88,7 @@ public void setTableDesc(TableDesc tableDesc) { @Override public void process(Object row, int tag) throws HiveException { if (rowLimit >= 0 && currCount++ >= rowLimit) { - setDone(true); + setDone(); return; } if (conf != null && conf.isGatherStats()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index a5c1463..b51cbee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -415,9 +415,6 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket private void initializeOperators(Map fetchOpJobConfMap) throws HiveException { - for (Map.Entry> entry : work.getAliasToWork().entrySet()) { - LOG.debug("initializeOperators: " + entry.getKey() + ", children = " + entry.getValue().getChildOperators()); - } // this mapper operator is used to initialize all the operators for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) { if (entry.getValue() == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java index 2f4e46b..aa1dc6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java @@ -47,7 +47,7 @@ public void process(Object row, int tag) throws HiveException { forward(row, inputObjInspectors[tag]); currCount += batch.size; } else { - setDone(true); + setDone(); } } }