diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index e064f34..380b603 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -156,6 +158,7 @@ public void setChildOperators( public Configuration getConfiguration() { return configuration; } + public List> getChildOperators() { return childOperators; } @@ -168,18 +171,9 @@ public int getNumChild() { * Implements the getChildren function for the Node Interface. */ @Override - public ArrayList getChildren() { - - if (getChildOperators() == null) { - return null; - } - - ArrayList ret_vec = new ArrayList(); - for (Operator op : getChildOperators()) { - ret_vec.add(op); - } - - return ret_vec; + public List getChildren() { + List> childOps = getChildOperators(); + return (childOps == null) ? null : new ArrayList<>(childOps); } public void setParentOperators( @@ -244,7 +238,6 @@ public RowSchema getSchema() { // for output rows of this operator protected transient ObjectInspector outputObjInspector; - /** * This function is not named getId(), to make sure java serialization does * NOT serialize it. Some TestParse tests will fail if we serialize this @@ -299,11 +292,7 @@ public void setAlias(String alias) { */ protected boolean areAllParentsInitialized() { for (Operator parent : parentOperators) { - if (parent == null) { - //return true; - continue; - } - if (parent.state != State.INIT) { + if (parent != null && parent.state != State.INIT) { return false; } } @@ -324,8 +313,6 @@ protected boolean areAllParentsInitialized() { public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException { - // String className = this.getClass().getName(); - this.done = false; this.runTimeNumRows = 0; // initializeOp can be overridden // Initializing data structures for vectorForward @@ -339,9 +326,7 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) return; } - if (LOG.isInfoEnabled()) { - LOG.info("Initializing operator " + this); - } + LOG.info("Initializing Operator: {}", this); if (inputOIs != null) { inputObjInspectors = inputOIs; @@ -378,22 +363,20 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) || childOperatorsArray.length != childOperators.size()) { throw new AssertionError("Internal error during operator initialization"); } - if (LOG.isDebugEnabled()) { - LOG.debug("Initialization Done " + id + " " + getName()); - } + + LOG.debug("Initialization Done: {}", this); initializeChildren(hconf); isInitOk = true; } finally { - // TODO: ugly hack because Java doesn't have dtors and Tez input hangs on shutdown. + // TODO: ugly hack because Java does not have dtors and Tez input + // hangs on shutdown. if (!isInitOk) { cancelAsyncInitOps(); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Initialization Done " + id + " " + getName() + " done is reset."); - } + LOG.debug("Initialization Done - Reset: {}", this); // let's wait on the async ops before continuing completeInitialization(asyncInitOperations); @@ -453,7 +436,6 @@ private void completeInitialization(Collection> fs) throws HiveExcepti } } } - } } @@ -465,7 +447,6 @@ private void completeInitialization(Collection> fs) throws HiveExcepti throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx); } - completeInitializationOp(os); } @@ -481,9 +462,8 @@ protected void completeInitializationOp(Object[] os) throws HiveException { } public void initializeLocalWork(Configuration hconf) throws HiveException { - if (childOperators != null) { - for (int i =0; i childOp = this.childOperators.get(i); + if (CollectionUtils.isNotEmpty(childOperators)) { + for (Operator childOp : childOperators) { childOp.initializeLocalWork(hconf); } } @@ -499,7 +479,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { public String getCounterName(Counter counter, Configuration hconf) { String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); - if (context != null && !context.isEmpty()) { + if (StringUtils.isNotEmpty(context)) { context = "_" + context.replace(" ", "_"); } return counter + context; @@ -511,15 +491,14 @@ public String getCounterName(Counter counter, Configuration hconf) { */ protected void initializeChildren(Configuration hconf) throws HiveException { state = State.INIT; - if (LOG.isDebugEnabled()) { - LOG.debug("Operator " + id + " " + getName() + " initialized"); - } - if (childOperators == null || childOperators.isEmpty()) { + LOG.debug("Operator Initialized: {}", this); + + if (CollectionUtils.isEmpty(childOperators)) { return; } - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing children of " + id + " " + getName()); - } + + LOG.debug("Initializing Children: {}", this); + for (int i = 0; i < childOperatorsArray.length; i++) { childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]); if (reporter != null) { @@ -529,7 +508,7 @@ protected void initializeChildren(Configuration hconf) throws HiveException { } public void abort() { - LOG.info("Received abort in operator: {}", getName()); + LOG.info("Received Abort in Operator: {}", this); abortOp.set(true); } @@ -538,8 +517,8 @@ public void abort() { */ public void passExecContext(ExecMapperContext execContext) { this.setExecContext(execContext); - for (int i = 0; i < childOperators.size(); i++) { - childOperators.get(i).passExecContext(execContext); + for (Operator childOp : childOperators) { + childOp.passExecContext(execContext); } } @@ -556,14 +535,12 @@ public void passExecContext(ExecMapperContext execContext) { */ protected void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException { - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing child " + id + " " + getName()); - } - // Double the size of the array if needed + LOG.debug("Initializing Child : {}", this); if (parentId >= inputObjInspectors.length) { - int newLength = inputObjInspectors.length * 2; + // Determine the next power of 2 larger than the requested index + int newLength = 2; while (parentId >= newLength) { - newLength *= 2; + newLength <<= 1; } inputObjInspectors = Arrays.copyOf(inputObjInspectors, newLength); } @@ -597,45 +574,33 @@ public ObjectInspector getOutputObjInspector() { public abstract void process(Object row, int tag) throws HiveException; protected final void defaultStartGroup() throws HiveException { - if (LOG.isDebugEnabled()) { - LOG.debug("Starting group"); - } + LOG.debug("Starting group"); - if (childOperators == null) { + if (CollectionUtils.isEmpty(childOperators)) { + LOG.trace("No children operators; start group done"); return; } - if (LOG.isDebugEnabled()) { - LOG.debug("Starting group for children:"); - } + LOG.trace("Starting group for children"); for (Operator op : childOperators) { op.startGroup(); } - - if (LOG.isDebugEnabled()) { - LOG.debug("Start group Done"); - } + LOG.trace("Start group done"); } protected final void defaultEndGroup() throws HiveException { - if (LOG.isDebugEnabled()) { - LOG.debug("Ending group"); - } + LOG.debug("Ending group"); - if (childOperators == null) { + if (CollectionUtils.isEmpty(childOperators)) { + LOG.trace("No children operators; end group done"); return; } - if (LOG.isDebugEnabled()) { - LOG.debug("Ending group for children:"); - } + LOG.trace("Ending group for children"); for (Operator op : childOperators) { op.endGroup(); } - - if (LOG.isDebugEnabled()) { - LOG.debug("End group Done"); - } + LOG.trace("End group done"); } // If a operator wants to do some work at the beginning of a group @@ -663,33 +628,29 @@ public void flush() throws HiveException { // Recursive flush to flush all the tree operators public void flushRecursive() throws HiveException { flush(); - if (childOperators == null) { - return; - } - - for (Operator child : childOperators) { - child.flushRecursive(); + if (CollectionUtils.isNotEmpty(childOperators)) { + for (Operator child : childOperators) { + child.flushRecursive(); + } } } public void processGroup(int tag) throws HiveException { - if (childOperators == null || childOperators.isEmpty()) { - return; - } - for (int i = 0; i < childOperatorsArray.length; i++) { - childOperatorsArray[i].processGroup(childOperatorsTag[i]); + if (CollectionUtils.isNotEmpty(childOperators)) { + for (int i = 0; i < childOperatorsArray.length; i++) { + childOperatorsArray[i].processGroup(childOperatorsTag[i]); + } } } protected boolean allInitializedParentsAreClosed() { if (parentOperators != null) { for (Operator parent : parentOperators) { - if(parent==null){ + if (parent == null) { continue; } - if (LOG.isDebugEnabled()) { - LOG.debug("allInitializedParentsAreClosed? parent.state = " + parent.state); - } + LOG.debug("allInitializedParentsAreClosed? parent.state = {}", + parent.state); if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) { return false; } @@ -702,9 +663,7 @@ protected boolean allInitializedParentsAreClosed() { // since it is called by its parents' main thread, so no // more than 1 thread should call this close() function. public void close(boolean abort) throws HiveException { - if (LOG.isDebugEnabled()) { - LOG.debug("close called for operator " + this); - } + LOG.debug("Close Called for Operator: {}", this); if (state == State.CLOSE) { return; @@ -712,22 +671,17 @@ public void close(boolean abort) throws HiveException { // check if all parents are finished if (!allInitializedParentsAreClosed()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not all parent operators are closed. Not closing."); - } + LOG.debug("Not all parent operators are closed. Not closing."); return; } // set state as CLOSE as long as all parents are closed // state == CLOSE doesn't mean all children are also in state CLOSE state = State.CLOSE; - if (LOG.isDebugEnabled()) { - LOG.info("Closing operator " + this); - } + LOG.info("Closing Operator: {}", this); abort |= abortOp.get(); - // call the operator specific close routine closeOp(abort); @@ -750,17 +704,13 @@ public void close(boolean abort) throws HiveException { } for (Operator op : childOperators) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing child = " + op); - } + LOG.debug("Closing Child: {} ", op); op.close(abort); } - if (LOG.isDebugEnabled()) { - LOG.debug(id + " Close done"); - } + LOG.debug("Close Done: {}", this); } catch (HiveException e) { - LOG.warn("Caught exception while closing operator: " + e.getMessage(), e); + LOG.warn("Caught exception while closing operator", e); throw e; } } @@ -798,8 +748,8 @@ public void jobClose(Configuration conf, boolean success) jobCloseOp(conf, success); jobCloseDone = true; - if (childOperators != null) { - for (Operator op : childOperators) { + if (CollectionUtils.isNotEmpty(this.childOperators)) { + for (Operator op : this.childOperators) { op.jobClose(conf, success); } } @@ -992,13 +942,12 @@ protected void vectorForward(VectorizedRowBatch batch) } public void reset(){ - this.state=State.INIT; + this.state = State.INIT; if (childOperators != null) { for (Operator o : childOperators) { o.reset(); } } - } /** @@ -1042,14 +991,14 @@ static public String getOperatorName() { * @return null if the operator doesn't change columns */ public Map getColumnExprMap() { - if(this.getConf() == null) { + if (this.getConf() == null) { return null; } return this.getConf().getColumnExprMap(); } public void setColumnExprMap(Map colExprMap) { - if(this.getConf() != null) { + if (this.getConf() != null) { this.getConf().setColumnExprMap(colExprMap); } } @@ -1163,7 +1112,7 @@ public void setMarker(String marker) { } public void initOperatorId() { - this.operatorId = getName() + "_" + this.id; + this.operatorId = getName() + '_' + this.id; } /* @@ -1212,9 +1161,8 @@ public void setExecContext(ExecMapperContext execContext) { // for each input file public void cleanUpInputFileChanged() throws HiveException { this.cleanUpInputFileChangedOp(); - if(this.childOperators != null) { - for (int i = 0; i op = this.childOperators.get(i); + if (CollectionUtils.isNotEmpty(this.childOperators)) { + for (Operator op : this.childOperators) { op.cleanUpInputFileChanged(); } } @@ -1258,12 +1206,12 @@ public boolean supportSkewJoinOptimization() { T descClone = (T)conf.clone(); // also clone the colExprMap by default // we need a deep copy - ArrayList colInfos = new ArrayList<>(); - colInfos.addAll(getSchema().getSignature()); + ArrayList colInfos = + new ArrayList<>(getSchema().getSignature()); Map map = null; - if (getColumnExprMap() != null) { - map = new HashMap<>(); - map.putAll(getColumnExprMap()); + Map colExprMap = getColumnExprMap(); + if (colExprMap != null) { + map = new HashMap<>(colExprMap); } Operator ret = OperatorFactory.getAndMakeChild( cContext, descClone, new RowSchema(colInfos), map, parentClones); @@ -1403,7 +1351,7 @@ public boolean acceptLimitPushdown() { @Override public String toString() { - return getName() + "[" + getIdentifier() + "]"; + return getName() + '[' + getIdentifier() + ']'; } public static String toString(Collection top) { @@ -1446,40 +1394,28 @@ static boolean toString(StringBuilder builder, Set visited, Operator } public Statistics getStatistics() { - if (conf != null) { - return conf.getStatistics(); - } - - return null; + return (conf == null) ? null : conf.getStatistics(); } public OpTraits getOpTraits() { - if (conf != null) { - return conf.getTraits(); - } - - return null; + return (conf == null) ? null : conf.getTraits(); } public void setOpTraits(OpTraits metaInfo) { - if (LOG.isInfoEnabled()) { - LOG.debug("Setting traits (" + metaInfo + ") on " + this); - } + LOG.debug("Setting traits ({}) on: {}", metaInfo, this); if (conf != null) { conf.setTraits(metaInfo); } else { - LOG.warn("Cannot set traits when there's no descriptor: " + this); + LOG.warn("Cannot set traits when there is no descriptor: {}", this); } } public void setStatistics(Statistics stats) { - if (LOG.isInfoEnabled()) { - LOG.debug("Setting stats (" + stats + ") on " + this); - } + LOG.debug("Setting stats ({}) on: {}", stats, this); if (conf != null) { conf.setStatistics(stats); } else { - LOG.warn("Cannot set stats when there's no descriptor: " + this); + LOG.warn("Cannot set stats when there is no descriptor: {}", this); } }