diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index d35e3ba..4c24ab4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -54,9 +54,11 @@
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -795,9 +797,12 @@ private JSONObject outputPlan(Object work, PrintStream out,
       if (jsonOut != null && jsonOut.length() > 0) {
         ((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
             operator.getOperatorId());
-        if (!this.work.isUserLevelExplain() && this.work.isFormatted()
-            && operator instanceof ReduceSinkOperator) {
-          List<String> outputOperators = ((ReduceSinkOperator) operator).getConf().getOutputOperators();
+        if (!this.work.isUserLevelExplain()
+            && this.work.isFormatted()
+            && (operator instanceof ReduceSinkOperator
+                || operator instanceof VectorReduceSinkOperator || operator instanceof VectorReduceSinkCommonOperator)) {
+          List<String> outputOperators = ((ReduceSinkDesc) operator.getConf())
+              .getOutputOperators();
           if (outputOperators != null) {
             ((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put(OUTPUT_OPERATORS,
                 Arrays.toString(outputOperators.toArray()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index af1fa66..afe1484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -143,13 +144,17 @@
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
       Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
-      VectorizationContext vContext) throws HiveException {
+      VectorizationContext vContext, Operator<? extends OperatorDesc> originalOp) throws HiveException {
     try {
       VectorDesc vectorDesc = ((AbstractOperatorDesc) conf).getVectorDesc();
       vectorDesc.setVectorOp(opClass);
-      Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
-          CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
-          .newInstance(cContext, vContext, conf);
+      Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(CompilationOpContext.class,
+          VectorizationContext.class, OperatorDesc.class).newInstance(cContext, vContext, conf);
+      op.setOperatorId(originalOp.getOperatorId());
+      if (op instanceof VectorReduceSinkOperator || op instanceof VectorReduceSinkCommonOperator) {
+        ((ReduceSinkDesc) op.getConf()).setOutputOperators(((ReduceSinkDesc) originalOp.getConf())
+            .getOutputOperators());
+      }
       return op;
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
@@ -158,11 +163,12 @@
   }
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
-      CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
+      CompilationOpContext cContext, T conf, VectorizationContext vContext,
+      Operator<? extends OperatorDesc> originalOp) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();
-    Class<?> opClass = vectorOpvec.get(descClass);
+    Class<? extends Operator<?>> opClass = vectorOpvec.get(descClass);
     if (opClass != null) {
-      return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
+      return getVectorOperator(opClass, cContext, conf, vContext, originalOp);
     }
     throw new HiveException("No vector operator for descriptor class " + descClass.getName());
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 1572384..8e689fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -2504,7 +2504,7 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) {
     vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo);
 
     vectorOp = OperatorFactory.getVectorOperator(
-        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+        opClass, op.getCompilationOpContext(), op.getConf(), vContext, op);
     LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
 
     return vectorOp;
@@ -2969,7 +2969,7 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
     Operator<? extends OperatorDesc> vectorOp = null;
     try {
       vectorOp = OperatorFactory.getVectorOperator(
-          opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+          opClass, op.getCompilationOpContext(), op.getConf(), vContext, op);
     } catch (Exception e) {
       LOG.info("Vectorizer vectorizeOperator reduce sink class exception " + opClass.getSimpleName() +
           " exception " + e);
@@ -3247,7 +3247,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
         vContext.getVectorExpression(predicateExpr, VectorExpressionDescriptor.Mode.FILTER);
     vectorFilterDesc.setPredicateExpression(vectorPredicateExpr);
     return OperatorFactory.getVectorOperator(
-        filterOp.getCompilationOpContext(), filterDesc, vContext);
+        filterOp.getCompilationOpContext(), filterDesc, vContext, filterOp);
   }
 
   /*
@@ -3275,7 +3275,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
     vectorGroupByDesc.setAggregators(vecAggregators);
     vectorGroupByDesc.setProjectedOutputColumns(projectedOutputColumns);
     return OperatorFactory.getVectorOperator(
-        groupByOp.getCompilationOpContext(), groupByDesc, vContext);
+        groupByOp.getCompilationOpContext(), groupByDesc, vContext, groupByOp);
   }
 
   public static Operator<? extends OperatorDesc> vectorizeSelectOperator(
@@ -3305,7 +3305,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
     vectorSelectDesc.setSelectExpressions(vectorSelectExprs);
     vectorSelectDesc.setProjectedOutputColumns(projectedOutputColumns);
     return OperatorFactory.getVectorOperator(
-        selectOp.getCompilationOpContext(), selectDesc, vContext);
+        selectOp.getCompilationOpContext(), selectDesc, vContext, selectOp);
   }
 
   public Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
@@ -3341,7 +3341,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           }
 
           vectorOp = OperatorFactory.getVectorOperator(
-              opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+              opClass, op.getCompilationOpContext(), op.getConf(), vContext, op);
           isNative = false;
         } else {
@@ -3367,7 +3367,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           VectorSMBJoinDesc vectorSMBJoinDesc = new VectorSMBJoinDesc();
           smbJoinSinkDesc.setVectorDesc(vectorSMBJoinDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), smbJoinSinkDesc, vContext);
+              op.getCompilationOpContext(), smbJoinSinkDesc, vContext, op);
           isNative = false;
         }
       }
@@ -3382,7 +3382,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
         if (!specialize) {
 
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), op.getConf(), vContext);
+              op.getCompilationOpContext(), op.getConf(), vContext, op);
           isNative = false;
         } else {
@@ -3458,7 +3458,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc();
           fileSinkDesc.setVectorDesc(vectorFileSinkDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), fileSinkDesc, vContext);
+              op.getCompilationOpContext(), fileSinkDesc, vContext, op);
           isNative = false;
         }
         break;
@@ -3468,7 +3468,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           VectorLimitDesc vectorLimitDesc = new VectorLimitDesc();
           limitDesc.setVectorDesc(vectorLimitDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), limitDesc, vContext);
+              op.getCompilationOpContext(), limitDesc, vContext, op);
           isNative = true;
         }
         break;
@@ -3478,7 +3478,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
          VectorAppMasterEventDesc vectorEventDesc = new VectorAppMasterEventDesc();
           eventDesc.setVectorDesc(vectorEventDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), eventDesc, vContext);
+              op.getCompilationOpContext(), eventDesc, vContext, op);
           isNative = true;
         }
         break;
@@ -3488,7 +3488,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           VectorSparkHashTableSinkDesc vectorSparkHashTableSinkDesc = new VectorSparkHashTableSinkDesc();
           sparkHashTableSinkDesc.setVectorDesc(vectorSparkHashTableSinkDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), sparkHashTableSinkDesc, vContext);
+              op.getCompilationOpContext(), sparkHashTableSinkDesc, vContext, op);
           isNative = true;
         }
         break;
@@ -3498,7 +3498,7 @@ private boolean usesVectorUDFAdaptor(VectorExpression[] vecExprs) {
           VectorSparkPartitionPruningSinkDesc vectorSparkPartitionPruningSinkDesc = new VectorSparkPartitionPruningSinkDesc();
           sparkPartitionPruningSinkDesc.setVectorDesc(vectorSparkPartitionPruningSinkDesc);
           vectorOp = OperatorFactory.getVectorOperator(
-              op.getCompilationOpContext(), sparkPartitionPruningSinkDesc, vContext);
+              op.getCompilationOpContext(), sparkPartitionPruningSinkDesc, vContext, op);
           isNative = true;
         }
         break;
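
For context, the hand-off the new originalOp parameter performs can be sketched in isolation. The sketch below is a minimal model, not Hive code: Op and SinkDesc are hypothetical stand-ins for Operator and ReduceSinkDesc, and vectorize plays the role of OperatorFactory.getVectorOperator. It shows why the operator id and the reduce sink's output-operator list must be copied onto the replacement operator if EXPLAIN FORMATTED is to keep printing them after vectorization.

import java.util.Arrays;
import java.util.List;

public class VectorizeSketch {

  // Hypothetical stand-in for Operator<? extends OperatorDesc>.
  static class Op {
    String operatorId;   // what ExplainTask prints as "OperatorId:"
    Object conf;         // the operator's descriptor
    Op(Object conf) { this.conf = conf; }
  }

  // Hypothetical stand-in for ReduceSinkDesc.
  static class SinkDesc {
    List<String> outputOperators;  // what ExplainTask prints as OUTPUT_OPERATORS
  }

  // Plays the role of getVectorOperator(..., originalOp): the replacement
  // operator inherits the original's id, and a reduce-sink descriptor also
  // inherits the outputOperators list.
  static Op vectorize(Op originalOp, Object newConf) {
    Op op = new Op(newConf);
    op.operatorId = originalOp.operatorId;
    if (newConf instanceof SinkDesc && originalOp.conf instanceof SinkDesc) {
      ((SinkDesc) newConf).outputOperators = ((SinkDesc) originalOp.conf).outputOperators;
    }
    return op;
  }

  public static void main(String[] args) {
    SinkDesc rsDesc = new SinkDesc();
    rsDesc.outputOperators = Arrays.asList("MAPJOIN_25");  // illustrative value
    Op rs = new Op(rsDesc);
    rs.operatorId = "RS_3";

    // Vectorization swaps in a new operator (and possibly a new descriptor);
    // without the copy in vectorize(), both pieces of metadata would be lost.
    Op vrs = vectorize(rs, new SinkDesc());
    System.out.println(vrs.operatorId + " -> " + ((SinkDesc) vrs.conf).outputOperators);
    // prints: RS_3 -> [MAPJOIN_25]
  }
}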