diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 2462938..98d2e6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -113,9 +113,8 @@
   private boolean useBucketizedHiveInputFormat;
 
   // Data structures specific for vectorized operators.
-  private int size;
-  private boolean selectedInUse;
-  private int[] selected;
+  private transient boolean multiChildren;
+  private transient int[] selected;
 
   // dummy operator (for not increasing seqId)
   protected Operator(String name, CompilationOpContext cContext) {
@@ -129,8 +128,6 @@ protected Operator() {
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     abortOp = new AtomicBoolean(false);
-    // Initializing data structures for vectorization
-    selected = new int[VectorizedRowBatch.DEFAULT_SIZE];
   }
 
   public Operator(CompilationOpContext cContext) {
@@ -323,6 +320,9 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
     // String className = this.getClass().getName();
 
     this.done = false;
+    this.runTimeNumRows = 0; // initializeOp can be overridden
+    // Initializing data structures for vectorForward
+    this.selected = new int[VectorizedRowBatch.DEFAULT_SIZE];
     if (state == State.INIT) {
       return;
     }
@@ -345,6 +345,7 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
     for (int i = 0; i < childOperatorsArray.length; i++) {
       childOperatorsArray[i] = childOperators.get(i);
     }
+    multiChildren = childOperatorsArray.length > 1;
     childOperatorsTag = new int[childOperatorsArray.length];
     for (int i = 0; i < childOperatorsArray.length; i++) {
       List<Operator<? extends OperatorDesc>> parentOperators =
@@ -487,7 +488,6 @@ public void initializeLocalWork(Configuration hconf) throws HiveException {
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     rootInitializeCalled = true;
-    runTimeNumRows = 0;
   }
 
   /**
@@ -703,6 +703,12 @@ public void close(boolean abort) throws HiveException {
 
     // call the operator specific close routine
     closeOp(abort);
+
+    // closeOp can be overridden
+    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
+      publishRunTimeStats();
+    }
+
     this.runTimeNumRows = 0;
 
     reporter = null;
@@ -733,10 +739,6 @@ public void close(boolean abort) throws HiveException {
    * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
-    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
-      publishRunTimeStats();
-    }
-    runTimeNumRows = 0;
   }
 
   private boolean jobCloseDone = false;
@@ -894,26 +896,32 @@ protected void forward(Object row, ObjectInspector rowInspector)
     forward(row, rowInspector, false);
   }
 
+  protected void forward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
+      throws HiveException {
+    forward(vrg, rowInspector, true);
+  }
+
   protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized)
       throws HiveException {
-    if (isVectorized && getNumChild() > 1) {
+    if (isVectorized) {
       vectorForward((VectorizedRowBatch) row, rowInspector);
-      return;
+    } else {
+      baseForward(row, rowInspector);
     }
-    baseForward(row, rowInspector);
   }
 
   private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
       throws HiveException {
-    runTimeNumRows++;
+    this.runTimeNumRows += vrg.count();
     if (getDone()) {
       return;
     }
 
     // Data structures to store original values
-    size = vrg.size;
-    selectedInUse = vrg.selectedInUse;
-    if (vrg.selectedInUse) {
+    final int size = vrg.size;
+    final boolean selectedInUse = vrg.selectedInUse;
+    final boolean saveState = (selectedInUse && multiChildren);
+    if (saveState) {
       System.arraycopy(vrg.selected, 0, selected, 0, size);
     }
@@ -927,7 +935,7 @@ private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
       // Restore original values
       vrg.size = size;
       vrg.selectedInUse = selectedInUse;
-      if (vrg.selectedInUse) {
+      if (saveState) {
         System.arraycopy(selected, 0, vrg.selected, 0, size);
       }
     }
@@ -941,7 +949,7 @@ private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
   private void baseForward(Object row, ObjectInspector rowInspector)
       throws HiveException {
-    runTimeNumRows++;
+    this.runTimeNumRows++;
     if (getDone()) {
       return;
     }
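Note on the Operator.java hunks above: vectorForward now snapshots and restores batch state only when a later sibling could actually see a corrupted batch. Below is a minimal, self-contained Java sketch of that pattern; Batch, Child, and FanOut are illustrative stand-ins for VectorizedRowBatch and Operator, not Hive classes.

class Batch {
  static final int DEFAULT_SIZE = 1024;
  int size;
  boolean selectedInUse;
  int[] selected = new int[DEFAULT_SIZE];
}

interface Child {
  void process(Batch batch);
}

class FanOut {
  private final int[] saved = new int[Batch.DEFAULT_SIZE];

  void forward(Batch batch, Child[] children) {
    final int size = batch.size;
    final boolean selectedInUse = batch.selectedInUse;
    // Copy only when it can matter: a single child has no sibling to corrupt.
    final boolean saveState = selectedInUse && children.length > 1;
    if (saveState) {
      System.arraycopy(batch.selected, 0, saved, 0, size);
    }
    for (Child child : children) {
      child.process(batch);              // may mutate size/selectedInUse/selected
      batch.size = size;                 // restore for the next sibling
      batch.selectedInUse = selectedInUse;
      if (saveState) {
        System.arraycopy(saved, 0, batch.selected, 0, size);
      }
    }
  }
}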
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 7e5c724..049f137 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -115,18 +115,7 @@ public String getSchemaEvolutionColumnsTypes() {
   @Override
   public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0) {
-      if (vectorized) {
-        VectorizedRowBatch batch = (VectorizedRowBatch) row;
-        if (currCount >= rowLimit) {
-          setDone(true);
-          return;
-        }
-        if (currCount + batch.size > rowLimit) {
-          batch.size = rowLimit - currCount;
-        }
-        currCount += batch.size;
-      } else if (currCount++ >= rowLimit) {
-        setDone(true);
+      if (checkSetDone(row, tag)) {
         return;
       }
     }
@@ -135,6 +124,28 @@ public void process(Object row, int tag) throws HiveException {
     }
     forward(row, inputObjInspectors[tag], vectorized);
   }
+
+  private boolean checkSetDone(Object row, int tag) {
+    if (row instanceof VectorizedRowBatch) {
+      // We need to check with 'instanceof' instead of just checking
+      // vectorized because the row can be a VectorizedRowBatch when
+      // FetchOptimizer kicks in even if the operator pipeline is not
+      // vectorized
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+      if (currCount >= rowLimit) {
+        setDone(true);
+        return true;
+      }
+      if (currCount + batch.size > rowLimit) {
+        batch.size = rowLimit - currCount;
+      }
+      currCount += batch.size;
+    } else if (currCount++ >= rowLimit) {
+      setDone(true);
+      return true;
+    }
+    return false;
+  }
 
   // Change the table partition for collecting stats
   @Override
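Note: the limit handling that moved into checkSetDone truncates a vectorized batch in place so that at most rowLimit - currCount rows survive, while the row-at-a-time branch counts one row per call. A sketch under those assumptions, reusing the illustrative Batch type from the sketch above (not the real VectorizedRowBatch):

class LimitedScan {
  private final int rowLimit;
  private int currCount;
  private boolean done;

  LimitedScan(int rowLimit) {
    this.rowLimit = rowLimit;
  }

  /** Returns true when the limit is reached and the input should be dropped. */
  boolean checkSetDone(Object row) {
    if (row instanceof Batch) {       // mirrors the instanceof test in the patch
      Batch batch = (Batch) row;
      if (currCount >= rowLimit) {
        done = true;                  // limit already met: forward nothing
        return true;
      }
      if (currCount + batch.size > rowLimit) {
        batch.size = rowLimit - currCount;  // truncate the batch in place
      }
      currCount += batch.size;
    } else if (currCount++ >= rowLimit) {
      done = true;                    // row mode: this row is one past the limit
      return true;
    }
    return false;
  }
}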
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index 3a6d8c1..8dd7cfe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -405,6 +405,7 @@ protected void closeOp(boolean abort) throws HiveException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info(toString() + ": records written - " + numRows);
     }
+    this.runTimeNumRows = numRows;
     recordCounter.set(numRows);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 5571826..a64a498 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -88,8 +88,7 @@ private void initialize(HiveConf hiveConf) {
     // Vectorization should be the last optimization, because it doesn't modify the plan
     // or any operators. It makes a very low level transformation to the expressions to
     // run in the vectorized mode.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && pctx.getContext().getExplainAnalyze() == null) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       resolvers.add(new Vectorizer());
     }
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 6e274d1..f9a6386 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -669,8 +669,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping llap pre-vectorization pass");
     }
 
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && ctx.getExplainAnalyze() == null) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       physicalCtx = new Vectorizer().resolve(physicalCtx);
     } else {
       LOG.debug("Skipping vectorization");
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 5220281..08e7f43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -588,8 +588,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping cross product analysis");
     }
 
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && ctx.getExplainAnalyze() == null) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       (new Vectorizer()).resolve(physicalCtx);
     } else {
       LOG.debug("Skipping vectorization");
diff --git ql/src/test/queries/clientpositive/explainanalyze_3.q ql/src/test/queries/clientpositive/explainanalyze_3.q
index d7773fc..8011124 100644
--- ql/src/test/queries/clientpositive/explainanalyze_3.q
+++ ql/src/test/queries/clientpositive/explainanalyze_3.q
@@ -12,6 +12,7 @@ set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 set hive.exec.dynamic.partition.mode=nonstrict;
 
 set hive.vectorized.execution.enabled=true;
+set hive.llap.io.enabled=false;
 
 explain analyze select key, value
 FROM srcpart LATERAL VIEW explode(array(1,2,3)) myTable AS myCol;
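Note on the lifecycle reshuffle that the VectorReduceSinkCommonOperator change relies on: publishRunTimeStats now runs in close(), after the overridable closeOp hook returns, so a subclass that only learns its final row count inside closeOp still gets that count published. A minimal sketch of this template-method shape, with illustrative names rather than Hive code:

abstract class Op {
  protected long runTimeNumRows;

  final void close() {
    closeOp();                  // subclass hook; may set runTimeNumRows
    publishRunTimeStats();      // always sees the subclass's final value
    runTimeNumRows = 0;
  }

  protected void closeOp() {
    // default hook: no stats duties any more
  }

  private void publishRunTimeStats() {
    System.out.println(getClass().getSimpleName() + ": rows=" + runTimeNumRows);
  }
}

class SinkOp extends Op {
  private long numRows = 500;   // accumulated while writing records

  @Override
  protected void closeOp() {
    runTimeNumRows = numRows;   // mirrors the VectorReduceSinkCommonOperator change
  }
}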
diff --git ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
index 05fcbc3..e7a5630 100644
--- ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
+++ ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
@@ -347,10 +347,10 @@ Stage-3
   Stage-2
     Dependency Collection{}
       Stage-1
-        Map 1
-        File Output Operator [FS_2]
+        Map 1 vectorized
+        File Output Operator [FS_4]
           table:{"name:":"default.src_autho_test"}
-          Select Operator [SEL_1] (rows=500/500 width=178)
+          Select Operator [SEL_3] (rows=500/500 width=178)
             Output:["_col0","_col1"]
             TableScan [TS_0] (rows=500/500 width=178)
               default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -607,15 +607,15 @@ Stage-0
   Fetch Operator
     limit:5
     Stage-1
-      Reducer 2
-      File Output Operator [FS_5]
-        Limit [LIM_4] (rows=5/5 width=178)
+      Reducer 2 vectorized
+      File Output Operator [FS_10]
+        Limit [LIM_9] (rows=5/5 width=178)
           Number of rows:5
-          Select Operator [SEL_3] (rows=500/5 width=178)
+          Select Operator [SEL_8] (rows=500/5 width=178)
             Output:["_col0","_col1"]
-          <-Map 1 [SIMPLE_EDGE]
-            SHUFFLE [RS_2]
-              Select Operator [SEL_1] (rows=500/500 width=178)
+          <-Map 1 [SIMPLE_EDGE] vectorized
+            SHUFFLE [RS_7]
+              Select Operator [SEL_6] (rows=500/500 width=178)
                 Output:["_col0","_col1"]
                 TableScan [TS_0] (rows=500/500 width=178)
                   default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -669,19 +669,19 @@ Stage-3
   File Output Operator [FS_5]
     Group By Operator [GBY_3] (rows=1/1 width=2760)
       Output:["_col0","_col1","_col2","_col3","_col4"],aggregations:["compute_stats(VALUE._col0, 'hll')","compute_stats(VALUE._col2, 'hll')","compute_stats(VALUE._col3, 'hll')","compute_stats(VALUE._col4, 'hll')","compute_stats(VALUE._col5, 'hll')"]
-    <-Map 1 [CUSTOM_SIMPLE_EDGE]
-      File Output Operator [FS_3]
+    <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+      File Output Operator [FS_8]
         table:{"name:":"default.orc_merge5"}
-        Select Operator [SEL_2] (rows=1/3 width=352)
+        Select Operator [SEL_7] (rows=1/3 width=352)
           Output:["_col0","_col1","_col2","_col3","_col4"]
-          Filter Operator [FIL_4] (rows=1/3 width=352)
+          Filter Operator [FIL_6] (rows=1/3 width=352)
             predicate:(userid <= 13)
             TableScan [TS_0] (rows=1/15000 width=352)
               default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"]
-      PARTITION_ONLY_SHUFFLE [RS_2]
-        Select Operator [SEL_1] (rows=1/3 width=352)
+      PARTITION_ONLY_SHUFFLE [RS_7]
+        Select Operator [SEL_6] (rows=1/3 width=352)
           Output:["userid","string1","subtype","decimal1","ts"]
-          Please refer to the previous Select Operator [SEL_2]
+          Please refer to the previous Select Operator [SEL_7]
   Stage-4(CONDITIONAL)
     File Merge
       Please refer to the previous Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6)
@@ -842,24 +842,24 @@ Stage-0
   Fetch Operator
     limit:-1
     Stage-1
-      Map 2
-      File Output Operator [FS_10]
-        Select Operator [SEL_9] (rows=391/480 width=186)
+      Map 2 vectorized
+      File Output Operator [FS_34]
+        Select Operator [SEL_33] (rows=391/480 width=186)
           Output:["_col0","_col1","_col2"]
-          Map Join Operator [MAPJOIN_25] (rows=391/480 width=186)
-            BucketMapJoin:true,Conds:RS_6._col0=SEL_5._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
-          <-Map 1 [CUSTOM_EDGE]
-            MULTICAST [RS_6]
+          Map Join Operator [MAPJOIN_32] (rows=391/480 width=186)
+            BucketMapJoin:true,Conds:RS_29._col0=SEL_31._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
+          <-Map 1 [CUSTOM_EDGE] vectorized
+            MULTICAST [RS_29]
               PartitionCols:_col0
-              Select Operator [SEL_2] (rows=242/242 width=95)
+              Select Operator [SEL_28] (rows=242/242 width=95)
                 Output:["_col0","_col1"]
-                Filter Operator [FIL_13] (rows=242/242 width=95)
+                Filter Operator [FIL_27] (rows=242/242 width=95)
                   predicate:key is not null
                   TableScan [TS_0] (rows=242/242 width=95)
                     default@tab,a,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
-          <-Select Operator [SEL_5] (rows=500/500 width=95)
+          <-Select Operator [SEL_31] (rows=500/500 width=95)
               Output:["_col0","_col1"]
-            Filter Operator [FIL_14] (rows=500/500 width=95)
+            Filter Operator [FIL_30] (rows=500/500 width=95)
               predicate:key is not null
               TableScan [TS_3] (rows=500/500 width=95)
                 default@tab_part,b,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]