diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index b88a258..884575f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -113,9 +113,7 @@ private boolean useBucketizedHiveInputFormat; // Data structures specific for vectorized operators. - private int size; - private boolean selectedInUse; - private int[] selected; + private transient int[] selected; // dummy operator (for not increasing seqId) protected Operator(String name, CompilationOpContext cContext) { @@ -129,8 +127,6 @@ protected Operator() { childOperators = new ArrayList>(); parentOperators = new ArrayList>(); abortOp = new AtomicBoolean(false); - // Initializing data structures for vectorization - selected = new int[VectorizedRowBatch.DEFAULT_SIZE]; } public Operator(CompilationOpContext cContext) { @@ -323,6 +319,9 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) // String className = this.getClass().getName(); this.done = false; + this.runTimeNumRows = 0; // initializeOp can be overridden + // Initializing data structures for vectorForward + this.selected = new int[VectorizedRowBatch.DEFAULT_SIZE]; if (state == State.INIT) { return; } @@ -487,7 +486,6 @@ public void initializeLocalWork(Configuration hconf) throws HiveException { protected void initializeOp(Configuration hconf) throws HiveException { this.hconf = hconf; rootInitializeCalled = true; - runTimeNumRows = 0; } /** @@ -703,6 +701,12 @@ public void close(boolean abort) throws HiveException { // call the operator specific close routine closeOp(abort); + + // closeOp can be overridden + if (conf != null && conf.getRuntimeStatsTmpDir() != null) { + publishRunTimeStats(); + } + this.runTimeNumRows = 0; reporter = null; @@ -733,10 +737,6 @@ public void close(boolean abort) throws HiveException { * should overwrite this funtion for their specific cleanup routine. 
*/ protected void closeOp(boolean abort) throws HiveException { - if (conf != null && conf.getRuntimeStatsTmpDir() != null) { - publishRunTimeStats(); - } - runTimeNumRows = 0; } private boolean jobCloseDone = false; @@ -894,26 +894,33 @@ protected void forward(Object row, ObjectInspector rowInspector) forward(row, rowInspector, false); } + protected void forward(VectorizedRowBatch vrg, ObjectInspector rowInspector) + throws HiveException { + forward(vrg, rowInspector, true); + } + protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized) throws HiveException { - if (isVectorized && getNumChild() > 1) { + if (isVectorized) { vectorForward((VectorizedRowBatch) row, rowInspector); - return; + } else { + baseForward(row, rowInspector); } - baseForward(row, rowInspector); } private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector) throws HiveException { - runTimeNumRows++; + this.runTimeNumRows += vrg.count(); if (getDone()) { return; } + final boolean saveState = (vrg.selectedInUse && getNumChild() > 1); + // Data structures to store original values - size = vrg.size; - selectedInUse = vrg.selectedInUse; - if (vrg.selectedInUse) { + final int size = vrg.size; + final boolean selectedInUse = vrg.selectedInUse; + if (saveState) { System.arraycopy(vrg.selected, 0, selected, 0, size); } @@ -924,11 +931,13 @@ private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector) childrenDone++; } else { o.process(vrg, childOperatorsTag[i]); - // Restore original values - vrg.size = size; - vrg.selectedInUse = selectedInUse; - if (vrg.selectedInUse) { - System.arraycopy(selected, 0, vrg.selected, 0, size); + if (saveState) { + // Restore original values + vrg.size = size; + vrg.selectedInUse = selectedInUse; + if (selectedInUse) { + System.arraycopy(selected, 0, vrg.selected, 0, size); + } } } } @@ -941,7 +950,7 @@ private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector) private void 
baseForward(Object row, ObjectInspector rowInspector) throws HiveException { - runTimeNumRows++; + this.runTimeNumRows++; if (getDone()) { return; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index c76026b..e89afad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -115,22 +115,7 @@ public String getSchemaEvolutionColumnsTypes() { @Override public void process(Object row, int tag) throws HiveException { if (rowLimit >= 0) { - if (row instanceof VectorizedRowBatch) { - // We need to check with 'instanceof' instead of just checking - // vectorized because the row can be a VectorizedRowBatch when - // FetchOptimizer kicks in even if the operator pipeline is not - // vectorized - VectorizedRowBatch batch = (VectorizedRowBatch) row; - if (currCount >= rowLimit) { - setDone(true); - return; - } - if (currCount + batch.size > rowLimit) { - batch.size = rowLimit - currCount; - } - currCount += batch.size; - } else if (currCount++ >= rowLimit) { - setDone(true); + if (checkSetDone(row, tag)) { return; } } @@ -139,6 +124,28 @@ public void process(Object row, int tag) throws HiveException { } forward(row, inputObjInspectors[tag], vectorized); } + + private boolean checkSetDone(Object row, int tag) { + if (row instanceof VectorizedRowBatch) { + // We need to check with 'instanceof' instead of just checking + // vectorized because the row can be a VectorizedRowBatch when + // FetchOptimizer kicks in even if the operator pipeline is not + // vectorized + VectorizedRowBatch batch = (VectorizedRowBatch) row; + if (currCount >= rowLimit) { + setDone(true); + return true; + } + if (currCount + batch.size > rowLimit) { + batch.size = rowLimit - currCount; + } + currCount += batch.size; + } else if (currCount++ >= rowLimit) { + setDone(true); + return true; + } + return false; + } // 
Change the table partition for collecting stats @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index b059b01..89043b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -405,6 +405,7 @@ protected void closeOp(boolean abort) throws HiveException { if (LOG.isInfoEnabled()) { LOG.info(toString() + ": records written - " + numRows); } + this.runTimeNumRows = numRows; recordCounter.set(numRows); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index c040406..d43837d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -88,8 +88,7 @@ private void initialize(HiveConf hiveConf) { // Vectorization should be the last optimization, because it doesn't modify the plan // or any operators. It makes a very low level transformation to the expressions to // run in the vectorized mode. 
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && pctx.getContext().getExplainAnalyze() == null) { + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { resolvers.add(new Vectorizer()); } if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index be33f38..31a04e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -669,8 +669,7 @@ protected void optimizeTaskPlan(List> rootTasks, Pa LOG.debug("Skipping llap pre-vectorization pass"); } - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && ctx.getExplainAnalyze() == null) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { physicalCtx = new Vectorizer().resolve(physicalCtx); } else { LOG.debug("Skipping vectorization"); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 965044d..698531f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -586,8 +586,7 @@ protected void optimizeTaskPlan(List> rootTasks, Pa LOG.debug("Skipping cross product analysis"); } - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && ctx.getExplainAnalyze() == null) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { (new Vectorizer()).resolve(physicalCtx); } else { LOG.debug("Skipping vectorization");