diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 2551d0a..c0af9e0 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -520,6 +520,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   lineage3.q,\
   list_bucket_dml_10.q,\
   llap_partitioned.q,\
+  llap_vector_nohybridgrace.q,\
   load_dyn_part5.q,\
   lvj_mapjoin.q,\
   mapjoin_decimal.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapPreVectorizationPass.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapPreVectorizationPass.java
new file mode 100644
index 0000000..aa5396d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapPreVectorizationPass.java
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.none;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For any LLAP-related transformations which need to occur before vectorization.
+ */
+public class LlapPreVectorizationPass implements PhysicalPlanResolver {
+  protected static transient final Logger LOG = LoggerFactory.getLogger(LlapPreVectorizationPass.class);
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    HiveConf conf = pctx.getConf();
+    LlapMode mode = LlapMode.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_EXECUTION_MODE));
+    if (mode == none) {
+      LOG.info("LLAP disabled.");
+      return pctx;
+    }
+
+    Dispatcher disp = new LlapPreVectorizationPassDispatcher(pctx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  class LlapPreVectorizationPassDispatcher implements Dispatcher {
+    HiveConf conf;
+
+    LlapPreVectorizationPassDispatcher(PhysicalContext pctx) {
+      conf = pctx.getConf();
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      @SuppressWarnings("unchecked")
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w: work.getAllWork()) {
+          handleWork(work, w);
+        }
+      }
+      return null;
+    }
+
+    private void handleWork(TezWork tezWork, BaseWork work)
+        throws SemanticException {
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+      if (conf.getVar(HiveConf.ConfVars.LLAP_EXECUTION_MODE).equals("only")
+          && !conf.getBoolVar(HiveConf.ConfVars.LLAP_ENABLE_GRACE_JOIN_IN_LLAP)) {
+        // In LLAP "only" mode, grace hash join will be disabled later on by the LlapDecider anyway.
+        // Since the presence of grace hash join disables some "native" vectorization optimizations,
+        // disable the grace hash join now, before vectorization is done.
+        opRules.put(
+            new RuleRegExp("Disable grace hash join if LLAP mode and not dynamic partition hash join",
+                MapJoinOperator.getOperatorName() + "%"),
+            new NodeProcessor() {
+              @Override
+              public Object process(Node n, Stack<Node> s, NodeProcessorCtx c, Object... os) {
+                MapJoinOperator mapJoinOp = (MapJoinOperator) n;
+                if (mapJoinOp.getConf().isHybridHashJoin()
+                    && !(mapJoinOp.getConf().isDynamicPartitionHashJoin())) {
+                  mapJoinOp.getConf().setHybridHashJoin(false);
+                }
+                return Boolean.TRUE;
+              }
+            });
+      }
+
+      if (!opRules.isEmpty()) {
+        Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+        GraphWalker ogw = new DefaultGraphWalker(disp);
+        ArrayList<Node> topNodes = new ArrayList<Node>();
+        topNodes.addAll(work.getAllRootOperators());
+        ogw.startWalking(topNodes, null);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index abf531b..eaad988 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass;
 import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -553,6 +554,12 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping cross product analysis");
     }
 
+    if ("llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
+      physicalCtx = new LlapPreVectorizationPass().resolve(physicalCtx);
+    } else {
+      LOG.debug("Skipping llap pre-vectorization pass");
+    }
+
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
         && ctx.getExplainAnalyze() == null) {
       physicalCtx = new Vectorizer().resolve(physicalCtx);
diff --git a/ql/src/test/queries/clientpositive/llap_vector_nohybridgrace.q b/ql/src/test/queries/clientpositive/llap_vector_nohybridgrace.q
new file mode 100644
index 0000000..69f1819
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/llap_vector_nohybridgrace.q
@@ -0,0 +1,32 @@
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.mapjoin.native.enabled=true;
+set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=1300000;
+set hive.mapjoin.optimized.hashtable.wbsize=880000;
+set hive.mapjoin.hybridgrace.memcheckfrequency=1024;
+
+set hive.mapjoin.hybridgrace.hashtable=false;
+
+explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
+
+set hive.mapjoin.hybridgrace.hashtable=true;
+set hive.llap.enable.grace.join.in.llap=false;
+
+-- The vectorized mapjoin className/nativeConditionsMet should be the same as the previous query
+explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+;
diff --git a/ql/src/test/results/clientpositive/llap/llap_vector_nohybridgrace.q.out b/ql/src/test/results/clientpositive/llap/llap_vector_nohybridgrace.q.out
new file mode 100644
index 0000000..526662d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/llap_vector_nohybridgrace.q.out
@@ -0,0 +1,356 @@
+PREHOOK: query: explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+PREHOOK: type: QUERY
+POSTHOOK: query: explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+POSTHOOK: type: QUERY
+PLAN VECTORIZATION:
+  enabled: true
+  enabledConditionsMet: [hive.vectorized.execution.enabled IS true]
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 3 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      projectedOutputColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColLessLongScalar(col 2, val 2000000000) -> boolean
+                    predicate: (cint < 2000000000) (type: boolean)
+                    Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: cint (type: int)
+                      outputColumnNames: _col0
+                      Select Vectorization:
+                          className: VectorSelectOperator
+                          native: true
+                          projectedOutputColumns: [2]
+                      Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        Map Join Vectorization:
+                            className: VectorMapJoinInnerBigOnlyLongOperator
+                            native: true
+                            nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Fast Hash Table and No Hybrid Hash Join IS true
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 26150 Data size: 209200 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: count()
+                          Group By Vectorization:
+                              aggregators: VectorUDAFCountStar(*) -> bigint
+                              className: VectorGroupByOperator
+                              vectorOutput: true
+                              native: false
+                              projectedOutputColumns: [0]
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            sort order:
+                            Reduce Sink Vectorization:
+                                className: VectorReduceSinkObjectHashOperator
+                                native: true
+                                nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                groupByVectorOutput: true
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: cd
+                  Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      projectedOutputColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColLessLongScalar(col 2, val 2000000000) -> boolean
+                    predicate: (cint < 2000000000) (type: boolean)
+                    Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: cint (type: int)
+                      outputColumnNames: _col0
+                      Select Vectorization:
+                          className: VectorSelectOperator
+                          native: true
+                          projectedOutputColumns: [2]
+                      Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Reduce Sink Vectorization:
+                            className: VectorReduceSinkLongOperator
+                            native: true
+                            nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                        Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                groupByVectorOutput: true
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: true
+                usesVectorUDFAdaptor: false
+                vectorized: true
+        Reducer 2
+            Execution mode: vectorized, llap
+            Reduce Vectorization:
+                enabled: true
+                enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                groupByVectorOutput: true
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                Group By Vectorization:
+                    aggregators: VectorUDAFCountMerge(col 0) -> bigint
+                    className: VectorGroupByOperator
+                    vectorOutput: true
+                    native: false
+                    projectedOutputColumns: [0]
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  File Sink Vectorization:
+                      className: VectorFileSinkOperator
+                      native: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+PREHOOK: type: QUERY
+POSTHOOK: query: explain vectorization expression
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ inner join alltypesorc cd
+ on cd.cint = c.cint
+ where c.cint < 2000000000) t1
+POSTHOOK: type: QUERY
+PLAN VECTORIZATION:
+  enabled: true
+  enabledConditionsMet: [hive.vectorized.execution.enabled IS true]
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 3 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      projectedOutputColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColLessLongScalar(col 2, val 2000000000) -> boolean
+                    predicate: (cint < 2000000000) (type: boolean)
+                    Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: cint (type: int)
+                      outputColumnNames: _col0
+                      Select Vectorization:
+                          className: VectorSelectOperator
+                          native: true
+                          projectedOutputColumns: [2]
+                      Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        Map Join Vectorization:
+                            className: VectorMapJoinInnerBigOnlyLongOperator
+                            native: true
+                            nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Fast Hash Table and No Hybrid Hash Join IS true
+                        input vertices:
+                          1 Map 3
+                        Statistics: Num rows: 26150 Data size: 209200 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: count()
+                          Group By Vectorization:
+                              aggregators: VectorUDAFCountStar(*) -> bigint
+                              className: VectorGroupByOperator
+                              vectorOutput: true
+                              native: false
+                              projectedOutputColumns: [0]
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            sort order:
+                            Reduce Sink Vectorization:
+                                className: VectorReduceSinkObjectHashOperator
+                                native: true
+                                nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                groupByVectorOutput: true
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: cd
+                  Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      projectedOutputColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColLessLongScalar(col 2, val 2000000000) -> boolean
+                    predicate: (cint < 2000000000) (type: boolean)
+                    Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: cint (type: int)
+                      outputColumnNames: _col0
+                      Select Vectorization:
+                          className: VectorSelectOperator
+                          native: true
+                          projectedOutputColumns: [2]
+                      Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Reduce Sink Vectorization:
+                            className: VectorReduceSinkLongOperator
+                            native: true
+                            nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                        Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                groupByVectorOutput: true
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: true
+                usesVectorUDFAdaptor: false
+                vectorized: true
+        Reducer 2
+            Execution mode: vectorized, llap
+            Reduce Vectorization:
+                enabled: true
+                enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                groupByVectorOutput: true
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                Group By Vectorization:
+                    aggregators: VectorUDAFCountMerge(col 0) -> bigint
+                    className: VectorGroupByOperator
+                    vectorOutput: true
+                    native: false
+                    projectedOutputColumns: [0]
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  File Sink Vectorization:
+                      className: VectorFileSinkOperator
+                      native: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
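
To exercise the new pass interactively (outside the qtest harness), the following is a minimal sketch, not part of the patch, assuming an LLAP deployment where hive.execution.mode=llap (so TezCompiler invokes LlapPreVectorizationPass) and hive.llap.execution.mode=only (so the grace-join rule fires). The settings mirror llap_vector_nohybridgrace.q; the expected marker is the "Fast Hash Table and No Hybrid Hash Join IS true" entry under nativeConditionsMet, as in the golden output above.

    -- Illustrative session only; assumes hive.execution.mode=llap and
    -- hive.llap.execution.mode=only are already set on the cluster.
    set hive.vectorized.execution.enabled=true;
    set hive.vectorized.execution.mapjoin.native.enabled=true;
    set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true;
    set hive.auto.convert.join=true;

    -- Request grace hash join, but forbid it in LLAP: the pass should clear the
    -- hybrid flag on the MapJoinDesc before the Vectorizer runs, so the plan
    -- should still show VectorMapJoinInnerBigOnlyLongOperator.
    set hive.mapjoin.hybridgrace.hashtable=true;
    set hive.llap.enable.grace.join.in.llap=false;

    explain vectorization expression
    select count(*)
    from alltypesorc c
    inner join alltypesorc cd on cd.cint = c.cint
    where c.cint < 2000000000;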