commit c171d3f8c7626cc683e25806fc0ed7a5b16db120
Author: Janaki Lahorani
Date:   Mon Jul 24 14:25:52 2017 -0700

    HIVE-16998: Add parameter that limits DPP to map joins

    For any join other than a map join, supporting dynamic partition pruning (DPP) requires the
    scan of the dimension table (the table whose data determines which partitions of the fact
    table are scanned) to run as a separate Spark job. This adds overhead that can sometimes
    outweigh the benefit of the DPP optimization. So a parameter,
    hive.spark.dynamic.partition.pruning.map.join.only, is added to enable DPP only for map joins.

    Change-Id: I06c611e0690855b61a63afe140edbe2f122771cb

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index df45f2cc32fa078e773bf1b4ed2acb260176da94..b9b397f2486fae87f736005eb1f292d0e4413c65 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3406,6 +3406,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
+    SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY(
+        "hive.spark.dynamic.partition.pruning.map.join.only", false,
+        "Turn on dynamic partition pruning only for map joins.\n" +
+        "If hive.spark.dynamic.partition.pruning is set to false, this parameter value is ignored."),
     SPARK_USE_GROUPBY_SHUFFLE(
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index f66e19be3ed3c9619585a961bb1607a352c17df7..9e038b9f07d007df66428cb0a6a2b2722e930680 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1392,6 +1392,7 @@ spark.query.files=add_part_multiple.q, \
 spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
+  spark_dynamic_partition_pruning_mapjoin_only.q,\
   dynamic_rdd_cache.q, \
   spark_multi_insert_parallel_orderby.q,\
   spark_explainuser_1.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
similarity index 74%
rename from ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
rename to ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
index 26a108830eeed02f0b35750526e18731634a2566..241a6ccf9c6874fbd20b0b41688762313bdf70a6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruning.java
@@ -20,7 +20,10 @@

 import java.util.Stack;

+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -38,9 +41,10 @@
  *
  * Cloned from RemoveDynamicPruningBySize
  */
-public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
+public class 
SparkRemoveDynamicPruning implements NodeProcessor {

-  static final private Logger LOG = LoggerFactory.getLogger(SparkRemoveDynamicPruningBySize.class.getName());
+  static final private Logger LOG =
+      LoggerFactory.getLogger(SparkRemoveDynamicPruning.class.getName());

   @Override
   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
@@ -48,18 +52,30 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
       throws SemanticException {

     OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+    boolean remove = false;

     SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd;
     SparkPartitionPruningSinkDesc desc = op.getConf();

-    if (desc.getStatistics().getDataSize() > context.getConf()
+    if (context.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAP_JOIN_ONLY) &&
+        !op.isWithMapjoin()) {
+      LOG.info("Dynamic partition pruning not enabled for: " + desc.getTableScan().getName() +
+          ". It is not part of a map join.");
+      remove = true;
+    }
+    else if (desc.getStatistics().getDataSize() > context.getConf()
         .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      OperatorUtils.removeBranch(op);
-      // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
-      LOG.info("Disabling dynamic pruning for: "
+      LOG.info("Disabling dynamic pruning based on: "
           + desc.getTableScan().getName()
           + ". Expected data size is too big: " + desc.getStatistics().getDataSize());
+      remove = true;
+    }
+
+    if (remove) {
+      // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+      OperatorUtils.removeBranch(op);
     }
+
     return false;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 682b98757aa1f89648c3ade497d2280e0ad979de..828e3a13e060e1c60498f7802ab34e9ffbfb3f08 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -63,7 +63,7 @@
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
 import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
-import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize;
+import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruning;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
@@ -298,7 +298,7 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman

     opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
         SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
-        new SparkRemoveDynamicPruningBySize());
+        new SparkRemoveDynamicPruning());

     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 94230fdd2958519f985da61cb5a3dee8a64a8c8e..3a0b74bc45b2c419cf5160757e8cb4b938ae4280 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -27,7 +27,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
@@ -95,6 +97,49 @@ public void closeOp(boolean abort) throws HiveException {
     }
   }

+  /* Determines whether this SparkPartitionPruningSink belongs to a map join. Callers use this
+     to decide whether the operator tree should be split for DPP (for a map join it should not
+     be), and whether DPP should be enabled for anything other than a map join.
+   */
+  public boolean isWithMapjoin() {
+    Operator branchingOp = this;
+
+    /* Locate the op where the branch starts. This is guaranteed to succeed because the branch
+     * always follows this pattern when the operator tree is given as input to this function
+     *  TS1       TS2
+     *   |         |
+     *  FIL       FIL
+     *   |         |
+     *   |     ---------
+     *  RS     |   |   |
+     *   |     RS SEL SEL
+     *   |    /    |   |
+     *   |   /    GBY GBY
+     *  JOIN       |   |
+     *   |        SPARKPRUNINGSINK
+     *   |
+     *  SPARKPRUNINGSINK
+     */
+
+    while (branchingOp != null) {
+      if (branchingOp.getNumChild() > 1) {
+        break;
+      } else {
+        branchingOp = branchingOp.getParentOperators().get(0);
+      }
+    }
+
+    // Check if this is a MapJoin. If so, do not split.
+    for (Operator childOp : branchingOp.getChildOperators()) {
+      if (childOp instanceof ReduceSinkOperator &&
+          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   private void flushToFile() throws IOException {
     // write an intermediate file to the specified path
     // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
index 1348d8bdb9991f529643d42aadb01f4d04b01b9b..ec971906855d81ebcdeaff4a74e289716676a37f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -89,30 +89,17 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       }
     }

-    // Locate the op where the branch starts
-    // This is guaranteed to succeed since the branch always follow the pattern
-    // as shown in the first picture above.
-    Operator branchingOp = pruningSinkOp;
-    while (branchingOp != null) {
-      if (branchingOp.getNumChild() > 1) {
-        break;
-      } else {
-        branchingOp = branchingOp.getParentOperators().get(0);
-      }
-    }
-
-    // Check if this is a MapJoin. If so, do not split.
-    for (Operator childOp : branchingOp.getChildOperators()) {
-      if (childOp instanceof ReduceSinkOperator &&
-          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
-        context.pruningSinkSet.add(pruningSinkOp);
-        return null;
-      }
-    }
+    // If the pruning sink operator is with a map join, then the pruning sink need not be split to a
+    // separate tree. 
Add the pruning sink operator to context and return
+    if (pruningSinkOp.isWithMapjoin()) {
+      context.pruningSinkSet.add(pruningSinkOp);
+      return null;
+    }

     List> roots = new LinkedList>();
     collectRoots(roots, pruningSinkOp);

+    Operator branchingOp = pruningSinkOp;
     List> savedChildOps = branchingOp.getChildOperators();
     List> firstNodesOfPruningBranch = findFirstNodesOfPruningBranch(branchingOp);
     branchingOp.setChildOperators(Utilities.makeList(firstNodesOfPruningBranch.toArray(new
diff --git ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
new file mode 100644
index 0000000000000000000000000000000000000000..a15b0bb3b5d334080d8829f3945fa943f00ac711
--- /dev/null
+++ ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_mapjoin_only.q
@@ -0,0 +1,49 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.strict.checks.cartesian.product=false;
+
+-- srcpart_date is the small table that will use map join. srcpart2 is the big table.
+-- both srcpart_date and srcpart2 will be joined with srcpart
+create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds;
+create table srcpart2 as select * from srcpart;
+
+-- enable map join and set the size threshold small so that only the join with srcpart_date
+-- becomes a map join
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=100;
+
+-- checking without partition pruning enabled
+-- expectation: 3 spark jobs
+EXPLAIN select *
+  from srcpart
+  join srcpart_date on (srcpart.ds = srcpart_date.ds)
+  join srcpart2 on (srcpart.hr = srcpart2.hr)
+  where srcpart_date.ds2 = '2008-04-08'
+  and srcpart2.hr = 11;
+
+-- checking with partition pruning enabled
+-- both the srcpart_date and srcpart2 join branches will produce a partition pruning sink
+-- the srcpart2 branch will get split, resulting in an additional spark job
+-- expectation: 4 spark jobs
+set hive.spark.dynamic.partition.pruning=true;
+EXPLAIN select *
+  from srcpart
+  join srcpart_date on (srcpart.ds = srcpart_date.ds)
+  join srcpart2 on (srcpart.hr = srcpart2.hr)
+  where srcpart_date.ds2 = '2008-04-08'
+  and srcpart2.hr = 11;
+
+-- restrict dpp so that it is enabled only for map joins
+-- expectation: 3 spark jobs
+set hive.spark.dynamic.partition.pruning.map.join.only=true;
+EXPLAIN select *
+  from srcpart
+  join srcpart_date on (srcpart.ds = srcpart_date.ds)
+  join srcpart2 on (srcpart.hr = srcpart2.hr)
+  where srcpart_date.ds2 = '2008-04-08'
+  and srcpart2.hr = 11;
+
+drop table srcpart_date;
+drop table srcpart2;
diff --git ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
new file mode 100644
index 0000000000000000000000000000000000000000..23f93893e94098a7a2d25095f4bc123af6f974c6
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_mapjoin_only.q.out
@@ -0,0 +1,531 @@
+PREHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: 
Output: database:default +PREHOOK: Output: default@srcpart_date +POSTHOOK: query: create table srcpart_date as select ds as ds, ds as ds2 from srcpart group by ds +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcpart_date +POSTHOOK: Lineage: srcpart_date.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ] +POSTHOOK: Lineage: srcpart_date.ds2 SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ] +PREHOOK: query: create table srcpart2 as select * from srcpart +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: database:default +PREHOOK: Output: default@srcpart2 +POSTHOOK: query: create table srcpart2 as select * from srcpart +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +POSTHOOK: Output: database:default +POSTHOOK: Output: default@srcpart2 +POSTHOOK: Lineage: srcpart2.ds SIMPLE [(srcpart)srcpart.FieldSchema(name:ds, type:string, comment:null), ] +POSTHOOK: Lineage: srcpart2.hr SIMPLE [(srcpart)srcpart.FieldSchema(name:hr, type:string, comment:null), ] +POSTHOOK: Lineage: srcpart2.key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: srcpart2.value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark +#### A masked pattern was here #### + Vertices: + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + 
Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart + filterExpr: ((11.0 = 11.0) and ds is not null) (type: boolean) + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + input vertices: + 1 Map 3 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string) + Local Work: + Map Reduce Local Work + Map 4 + Map Operator Tree: + TableScan + alias: srcpart2 + filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col3 (type: string) + 1 _col3 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join 
srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-3 depends on stages: Stage-2 + Stage-1 depends on stages: Stage-3 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark + Edges: + Reducer 6 <- Map 5 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan + alias: srcpart2 + filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) + Select Operator + expressions: _col3 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + partition key expr: hr + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + target column name: hr + target work: Map 1 + Reducer 6 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-3 + Spark +#### A masked pattern was here #### + Vertices: + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark 
HashTable Sink Operator + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + partition key expr: ds + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + target column name: ds + target work: Map 1 + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + input vertices: + 1 Map 3 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string) + Local Work: + Map Reduce Local Work + Map 4 + Map Operator Tree: + TableScan + alias: srcpart2 + filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) + Select Operator + expressions: _col3 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + partition key expr: hr + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + target column name: hr + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col3 (type: string) + 1 _col3 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data 
size: 12854 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN select * + from srcpart + join srcpart_date on (srcpart.ds = srcpart_date.ds) + join srcpart2 on (srcpart.hr = srcpart2.hr) + where srcpart_date.ds2 = '2008-04-08' + and srcpart2.hr = 11 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark +#### A masked pattern was here #### + Vertices: + Map 3 + Map Operator Tree: + TableScan + alias: srcpart_date + filterExpr: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 2 Data size: 42 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((ds2 = '2008-04-08') and ds is not null) (type: boolean) + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + Spark Partition Pruning Sink Operator + partition key expr: ds + Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE + target column name: ds + target work: Map 1 + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: srcpart + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col2 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + input vertices: + 1 Map 3 + Statistics: 
Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col4 (type: string) + Local Work: + Map Reduce Local Work + Map 4 + Map Operator Tree: + TableScan + alias: srcpart2 + filterExpr: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 2000 Data size: 49248 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (UDFToDouble(hr) = 11.0) (type: boolean) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), ds (type: string), hr (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col3 (type: string) + sort order: + + Map-reduce partition columns: _col3 (type: string) + Statistics: Num rows: 1000 Data size: 24624 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col3 (type: string) + 1 _col3 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), '2008-04-08' (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9 + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1210 Data size: 12854 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: drop table srcpart_date +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@srcpart_date +PREHOOK: Output: default@srcpart_date +POSTHOOK: query: drop table srcpart_date +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@srcpart_date +POSTHOOK: Output: default@srcpart_date +PREHOOK: query: drop table srcpart2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@srcpart2 +PREHOOK: Output: default@srcpart2 +POSTHOOK: query: drop table srcpart2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@srcpart2 +POSTHOOK: Output: default@srcpart2
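
Editor's note: for quick reference, a minimal usage sketch of the new flag. This is not part of the patch; the table names, query shape, and expected Spark job counts are taken from the q-file above.

set hive.spark.dynamic.partition.pruning=true;
-- DPP for all join types: the srcpart2 pruning branch (a reduce-side join) is split into
-- its own Spark job, so the EXPLAIN query in the q-file compiles to 4 Spark jobs.

set hive.spark.dynamic.partition.pruning.map.join.only=true;
-- DPP restricted to map joins: only the srcpart_date branch (a map join) keeps its pruning
-- sink, no extra job is spawned, and the same query compiles to 3 Spark jobs.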