diff --git pom.xml pom.xml
index 70518d1..b0c6e54 100644
--- pom.xml
+++ pom.xml
@@ -141,7 +141,7 @@
     14.0.1
     2.4.4
     1.3.166
-    2.8.0
+    2.7.3
     ${basedir}/${hive.path.to.root}/testutils/hadoop
     1.3
     1.1.1
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 3656842..c81b28d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -318,6 +319,7 @@ protected boolean areAllParentsInitialized() {
   public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       throws HiveException {
     this.done = false;
+    this.runTimeNumRows = 0;
     if (state == State.INIT) {
       return;
     }
@@ -483,7 +485,6 @@ public void initializeLocalWork(Configuration hconf) throws HiveException {
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
     rootInitializeCalled = true;
-    runTimeNumRows = 0;
   }

   /**
@@ -693,6 +694,11 @@ public void close(boolean abort) throws HiveException {
     // call the operator specific close routine
     closeOp(abort);

+    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
+      publishRunTimeStats();
+    }
+    runTimeNumRows = 0;
+
     reporter = null;

     try {
@@ -722,10 +728,6 @@
    * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
-    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
-      publishRunTimeStats();
-    }
-    runTimeNumRows = 0;
   }

   private boolean jobCloseDone = false;
@@ -901,6 +903,11 @@ protected void forward(Object row, ObjectInspector rowInspector)
     }
   }

+  protected void forward(VectorizedRowBatch vrb, ObjectInspector rowInspector) throws HiveException {
+    this.runTimeNumRows += vrb.size - 1;
+    forward((Object) vrb, rowInspector);
+  }
+
   public void resetStats() {
     for (String e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
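Note on the Operator.java hunks above: the row-count bookkeeping moves out of the overridable hooks. The counter is now reset in the final initialize() (and again after publishing), and publication happens in the final close() rather than in closeOp(), so a subclass that overrides closeOp() can no longer skip it. The new forward(VectorizedRowBatch, ...) overload adds vrb.size - 1 because the single-row forward() it delegates to counts the batch object itself as one row. A minimal standalone sketch of that pattern, with hypothetical names (Op, initializeOp, closeOp mirror the Hive hooks, but nothing here is Hive code):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Hive's operator lifecycle; names are illustrative only.
class Op {
    private long runTimeNumRows;
    private final List<String> published = new ArrayList<>();

    // final: subclasses cannot skip the counter reset by overriding the hook
    public final void initialize() {
        runTimeNumRows = 0;
        initializeOp();
    }

    protected void initializeOp() { }  // subclass hook, now stats-free

    // Counts one row per call, like the single-row forward().
    protected void forward(Object row) {
        runTimeNumRows++;
    }

    // Batch overload: the single-row forward() below counts the whole
    // batch object as one row, hence only size - 1 is added here.
    protected void forward(Object[] batch) {
        runTimeNumRows += batch.length - 1;
        forward((Object) batch);
    }

    // final: publishing happens even if a subclass overrides closeOp()
    public final void close() {
        closeOp();
        published.add("rows=" + runTimeNumRows);  // stand-in for publishRunTimeStats()
        runTimeNumRows = 0;
    }

    protected void closeOp() { }  // subclass hook, now stats-free

    public List<String> published() { return published; }
}

public class LifecycleSketch {
    public static void main(String[] args) {
        Op op = new Op();
        op.initialize();
        op.forward("row");                         // 1 row
        op.forward(new Object[] {"a", "b", "c"});  // 3 more rows
        op.close();
        System.out.println(op.published());        // prints [rows=4]
    }
}
```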
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 17f2efb..56b1f4a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -109,25 +109,38 @@ public String getSchemaEvolutionColumnsTypes() {
   @Override
   public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0) {
-      if (row instanceof VectorizedRowBatch) {
-        VectorizedRowBatch batch = (VectorizedRowBatch) row;
-        if (currCount >= rowLimit) {
-          setDone(true);
-          return;
-        }
-        if (currCount + batch.size > rowLimit) {
-          batch.size = rowLimit - currCount;
-        }
-        currCount += batch.size;
-      } else if (currCount++ >= rowLimit) {
-        setDone(true);
+      if (checkSetDone(row, tag)) {
         return;
       }
     }
+
     if (conf != null && conf.isGatherStats()) {
       gatherStats(row);
     }
-    forward(row, inputObjInspectors[tag]);
+
+    if (row instanceof VectorizedRowBatch) {
+      forward((VectorizedRowBatch) row, inputObjInspectors[tag]);
+    } else {
+      forward(row, inputObjInspectors[tag]);
+    }
+  }
+
+  private boolean checkSetDone(Object row, int tag) {
+    if (row instanceof VectorizedRowBatch) {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+      if (currCount >= rowLimit) {
+        setDone(true);
+        return true;
+      }
+      if (currCount + batch.size > rowLimit) {
+        batch.size = rowLimit - currCount;
+      }
+      currCount += batch.size;
+    } else if (currCount++ >= rowLimit) {
+      setDone(true);
+      return true;
+    }
+    return false;
   }

   // Change the table partition for collecting stats
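The refactored process() now routes batches through the batch-aware forward() introduced above, and checkSetDone() carries the LIMIT short-circuit: for a vectorized batch it clamps batch.size so that at most rowLimit - currCount rows pass downstream, instead of treating the whole batch as a single row. A self-contained sketch of the clamping arithmetic (the Batch class is a hypothetical stand-in for VectorizedRowBatch):

```java
// Hypothetical stand-in for VectorizedRowBatch; only the mutable size matters here.
class Batch {
    int size;
    Batch(int size) { this.size = size; }
}

public class RowLimitSketch {
    static final int ROW_LIMIT = 10;
    static int currCount = 0;
    static boolean done = false;

    // Mirrors the batch case of TableScanOperator.checkSetDone():
    // never let more than ROW_LIMIT rows through in total.
    static boolean checkSetDone(Batch batch) {
        if (currCount >= ROW_LIMIT) {
            done = true;  // limit already met before this batch arrived
            return true;
        }
        if (currCount + batch.size > ROW_LIMIT) {
            batch.size = ROW_LIMIT - currCount;  // truncate: pass only the remainder
        }
        currCount += batch.size;
        return false;
    }

    public static void main(String[] args) {
        Batch b1 = new Batch(8);
        System.out.println(checkSetDone(b1) + " size=" + b1.size);  // false size=8
        Batch b2 = new Batch(8);
        System.out.println(checkSetDone(b2) + " size=" + b2.size);  // false size=2 (clamped)
        Batch b3 = new Batch(8);
        System.out.println(checkSetDone(b3) + " size=" + b3.size);  // true: limit reached
    }
}
```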
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 9377563..97b375d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -88,8 +88,7 @@ private void initialize(HiveConf hiveConf) {
     // Vectorization should be the last optimization, because it doesn't modify the plan
     // or any operators. It makes a very low level transformation to the expressions to
     // run in the vectorized mode.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && pctx.getContext().getExplainAnalyze() == null) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       resolvers.add(new Vectorizer());
     }
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 7e156f6..d79a0a7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -633,8 +633,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping llap pre-vectorization pass");
     }

-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
-        && ctx.getExplainAnalyze() == null) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
       physicalCtx = new Vectorizer().resolve(physicalCtx);
     } else {
       LOG.debug("Skipping vectorization");
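Dropping the getExplainAnalyze() == null guard in both compilers means vectorization is no longer force-disabled under EXPLAIN ANALYZE; with the batch-aware row counting added above, the runtime counters now stay correct for vectorized plans too. That is why the regenerated golden file below gains "vectorized" markers and renumbered operator IDs. In those outputs, rows=a/b pairs the compile-time estimate a with the actual runtime count b. A toy formatter for reading that notation, purely illustrative (not Hive's explain code; the numbers are copied from the hunks below):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: shows how the rows=estimate/actual annotation in the
// golden-file hunks below is read. {estimate, actual} pairs per operator.
public class RowsAnnotation {
    public static void main(String[] args) {
        Map<String, long[]> ops = new LinkedHashMap<>();
        ops.put("Limit [LIM_9]", new long[] {5, 3});        // estimated 5, observed 3
        ops.put("TableScan [TS_0]", new long[] {500, 500}); // estimate matched exactly
        for (Map.Entry<String, long[]> e : ops.entrySet()) {
            long[] v = e.getValue();
            System.out.printf("%s (rows=%d/%d)%n", e.getKey(), v[0], v[1]);
        }
    }
}
```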
diff --git ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
index e5c8d6c..3c209ee 100644
--- ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
+++ ql/src/test/results/clientpositive/tez/explainanalyze_3.q.out
@@ -349,10 +349,10 @@ Stage-3
   Stage-2
     Dependency Collection{}
       Stage-1
-        Map 1
-          File Output Operator [FS_2]
+        Map 1 vectorized
+          File Output Operator [FS_4]
             table:{"name:":"default.src_autho_test"}
-            Select Operator [SEL_1] (rows=500/500 width=178)
+            Select Operator [SEL_3] (rows=500/500 width=178)
               Output:["_col0","_col1"]
               TableScan [TS_0] (rows=500/500 width=178)
                 default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -609,15 +609,15 @@ Stage-0
   Fetch Operator
     limit:5
     Stage-1
-      Reducer 2
-        File Output Operator [FS_5]
-          Limit [LIM_4] (rows=5/5 width=178)
+      Reducer 2 vectorized
+        File Output Operator [FS_10]
+          Limit [LIM_9] (rows=5/3 width=178)
             Number of rows:5
-            Select Operator [SEL_3] (rows=500/5 width=178)
+            Select Operator [SEL_8] (rows=500/5 width=178)
              Output:["_col0","_col1"]
-            <-Map 1 [SIMPLE_EDGE]
-              SHUFFLE [RS_2]
-                Select Operator [SEL_1] (rows=500/500 width=178)
+            <-Map 1 [SIMPLE_EDGE] vectorized
+              SHUFFLE [RS_7]
+                Select Operator [SEL_6] (rows=500/500 width=178)
                   Output:["_col0","_col1"]
                   TableScan [TS_0] (rows=500/500 width=178)
                     default@src,src,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
@@ -664,12 +664,12 @@ Stage-3
   Stage-8(CONDITIONAL CHILD TASKS: Stage-5, Stage-4, Stage-6)
     Conditional Operator
       Stage-1
-        Map 1
-          File Output Operator [FS_3]
+        Map 1 vectorized
+          File Output Operator [FS_10]
             table:{"name:":"default.orc_merge5"}
-            Select Operator [SEL_2] (rows=306/3 width=268)
+            Select Operator [SEL_9] (rows=306/3 width=268)
               Output:["_col0","_col1","_col2","_col3","_col4"]
-            Filter Operator [FIL_4] (rows=306/3 width=268)
+            Filter Operator [FIL_8] (rows=306/3 width=268)
               predicate:(userid <= 13)
               TableScan [TS_0] (rows=919/15000 width=268)
                 default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"]
@@ -833,24 +833,24 @@ Stage-0
   Fetch Operator
     limit:-1
     Stage-1
-      Map 2
-        File Output Operator [FS_10]
-          Select Operator [SEL_9] (rows=550/480 width=18)
+      Map 2 vectorized
+        File Output Operator [FS_34]
+          Select Operator [SEL_33] (rows=550/480 width=18)
             Output:["_col0","_col1","_col2"]
-            Map Join Operator [MAPJOIN_25] (rows=550/480 width=18)
-              Conds:RS_6._col0=SEL_5._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
-            <-Map 1 [BROADCAST_EDGE]
-              BROADCAST [RS_6]
+            Map Join Operator [MAPJOIN_32] (rows=550/480 width=18)
+              Conds:RS_29._col0=SEL_31._col0(Inner),HybridGraceHashJoin:true,Output:["_col0","_col1","_col3"]
+            <-Map 1 [BROADCAST_EDGE] vectorized
+              BROADCAST [RS_29]
                 PartitionCols:_col0
-                Select Operator [SEL_2] (rows=242/242 width=18)
+                Select Operator [SEL_28] (rows=242/242 width=18)
                   Output:["_col0","_col1"]
-                  Filter Operator [FIL_13] (rows=242/242 width=18)
+                  Filter Operator [FIL_27] (rows=242/242 width=18)
                     predicate:key is not null
                     TableScan [TS_0] (rows=242/242 width=18)
                       default@tab,a,Tbl:COMPLETE,Col:NONE,Output:["key","value"]
-            <-Select Operator [SEL_5] (rows=500/500 width=18)
+            <-Select Operator [SEL_31] (rows=500/500 width=18)
               Output:["_col0","_col1"]
-              Filter Operator [FIL_14] (rows=500/500 width=18)
+              Filter Operator [FIL_30] (rows=500/500 width=18)
                 predicate:key is not null
                 TableScan [TS_3] (rows=500/500 width=18)
                   default@tab_part,b,Tbl:COMPLETE,Col:NONE,Output:["key","value"]
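Golden files such as explainanalyze_3.q.out are regenerated rather than hand-edited; for this driver that is typically done from itests/qtest with something along the lines of `mvn test -Dtest=TestMiniTezCliDriver -Dqfile=explainanalyze_3.q -Dtest.output.overwrite=true` (the exact driver name and flags depend on the branch, so treat this invocation as an assumption rather than a verified command).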