diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1af59ba..8e25537 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2847,6 +2847,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         true,
         "Allows hive server 2 to send progress bar update information. This is currently available"
             + " only if the execution engine is tez."),
+    SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true, "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
@@ -3193,8 +3194,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
 
+    SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
+        "Whether to use operator stats to determine reducer parallelism for Hive on Spark. " +
+        "If this is false, Hive will use source table stats to determine reducer " +
+        "parallelism for all first level reduce tasks, and the maximum reducer parallelism " +
+        "from all parents for all remaining (second level and onward) reduce tasks."),
     SPARK_USE_FILE_SIZE_FOR_MAPJOIN("hive.spark.use.file.size.for.mapjoin", false,
-        "If this is set to true, mapjoin optimization in Hive/Spark will use source file sizes associated"
+        "If this is set to true, map-join optimization in Hive/Spark will use source file sizes associated "
           + "with TableScan operator on the root of operator tree, instead of using operator statistics."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 7c54275..4442d63 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1482,7 +1482,8 @@ spark.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_vectorized_dynamic_partition_pruning.q,\
-  spark_use_file_size_for_mapjoin.q
+  spark_use_file_size_for_mapjoin.q,\
+  spark_use_op_stats.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 7a5b71f..86814a1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Stack;
 
 import org.slf4j.Logger;
@@ -29,7 +31,9 @@
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -57,6 +61,12 @@ // Spark memory per task, and total number of cores
   private ObjectPair<Long, Integer> sparkMemoryAndCores;
 
+  private final boolean useOpStats;
+
+  public SetSparkReducerParallelism(HiveConf conf) {
+    sparkMemoryAndCores = null;
+    useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
+  }
 
   @Override
   public Object process(Node nd, Stack<Node> stack,
@@ -67,16 +77,28 @@ public Object process(Node nd, Stack<Node> stack,
     ReduceSinkOperator sink = (ReduceSinkOperator) nd;
     ReduceSinkDesc desc = sink.getConf();
+    Set<ReduceSinkOperator> parentSinks = null;
 
     int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
 
+    if (!useOpStats) {
+      parentSinks = OperatorUtils.findOperatorsUpstream(sink, ReduceSinkOperator.class);
+      parentSinks.remove(sink);
+      if (!context.getVisitedReduceSinks().containsAll(parentSinks)) {
+        // We haven't processed all the parent sinks, and we need
+        // them to be done in order to compute the parallelism for this sink.
+        // In this case, skip. We should visit this again from another path.
+        LOG.debug("Skipping sink " + sink + " for now as we haven't seen all its parents.");
+        return false;
+      }
+    }
+
     if (context.getVisitedReduceSinks().contains(sink)) {
       // skip walking the children
       LOG.debug("Already processed reduce sink: " + sink.getName());
       return true;
     }
-
     context.getVisitedReduceSinks().add(sink);
 
     if (needSetParallelism(sink, context.getConf())) {
@@ -96,19 +118,52 @@ public Object process(Node nd, Stack<Node> stack,
           return false;
         }
       }
+
       long numberOfBytes = 0;
-      // we need to add up all the estimates from the siblings of this reduce sink
-      for (Operator<? extends OperatorDesc> sibling
-          : sink.getChildOperators().get(0).getParentOperators()) {
-        if (sibling.getStatistics() != null) {
-          numberOfBytes += sibling.getStatistics().getDataSize();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+      if (useOpStats) {
+        // we need to add up all the estimates from the siblings of this reduce sink
+        for (Operator<? extends OperatorDesc> sibling
+            : sink.getChildOperators().get(0).getParentOperators()) {
+          if (sibling.getStatistics() != null) {
+            numberOfBytes += sibling.getStatistics().getDataSize();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics());
+            }
+          } else {
+            LOG.warn("No stats available from: " + sibling);
           }
-        } else {
-          LOG.warn("No stats available from: " + sibling);
         }
+      } else if (parentSinks.isEmpty()) {
+        // Not using OP stats and this is the first sink in the path, meaning that
+        // we should use TS stats to infer parallelism
+        for (Operator<? extends OperatorDesc> sibling
+            : sink.getChildOperators().get(0).getParentOperators()) {
+          Set<TableScanOperator> sources =
+              OperatorUtils.findOperatorsUpstream(sibling, TableScanOperator.class);
+          for (TableScanOperator source : sources) {
+            if (source.getStatistics() != null) {
+              numberOfBytes += source.getStatistics().getDataSize();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Table source " + source + " has stats: " + source.getStatistics());
+              }
+            } else {
+              LOG.warn("No stats available from table source: " + source);
+            }
+          }
+        }
+        LOG.debug("Gathered stats for sink " + sink + ". Total size is "
+            + numberOfBytes + " bytes.");
+      } else {
+        // Use the maximum parallelism from all parent reduce sinks
+        int numberOfReducers = 0;
+        for (ReduceSinkOperator parent : parentSinks) {
+          numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers());
+        }
+        desc.setNumReducers(numberOfReducers);
+        LOG.debug("Set parallelism for sink " + sink + " to " + numberOfReducers
+            + " based on its parents");
+        return false;
       }
 
       // Divide it by 2 so that we can have more reducers
@@ -134,7 +189,7 @@ public Object process(Node nd, Stack<Node> stack,
           desc.setNumReducers(numReducers);
         }
       } else {
-        LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+        LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers());
       }
 
     return false;
@@ -165,6 +220,9 @@ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveC
   }
 
   private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException {
+    if (sparkMemoryAndCores != null) {
+      return;
+    }
     if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) {
       // If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't
       // try to get it.
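For readers tracing the fallback path above: when neither a user-fixed reducer count nor a bucket count applies, the byte estimate gathered from the TableScan (or sibling operator) statistics feeds the existing reducer heuristic, with hive.exec.reducers.bytes.per.reducer halved (per the "Divide it by 2" comment) and the result capped by hive.exec.reducers.max. A minimal, self-contained sketch of that arithmetic, assuming it reduces to a ceiling division; the class and method below are illustrative and not part of the patch:

```java
// Illustrative sketch only -- mirrors the arithmetic implied by the patch,
// not Hive's actual Utilities.estimateReducers implementation.
public final class ReducerEstimateSketch {

  // totalBytes: summed data size from the TableScan (or sibling operator) statistics.
  static int estimateReducers(long totalBytes, long bytesPerReducer, int maxReducers) {
    long halved = Math.max(1, bytesPerReducer / 2); // "Divide it by 2 so that we can have more reducers"
    long reducers = (totalBytes + halved - 1) / halved; // ceiling division
    return (int) Math.min(Math.max(1, reducers), maxReducers);
  }

  public static void main(String[] args) {
    // Numbers from the new q-file below: hive.exec.reducers.bytes.per.reducer=500,
    // two scans of src (5312 bytes each) and two scans of tmp (1542 bytes each);
    // 1009 is assumed here as the hive.exec.reducers.max cap.
    System.out.println(estimateReducers(5312 + 5312, 500, 1009)); // 43, matching "PARTITION-LEVEL SORT, 43"
    System.out.println(estimateReducers(1542 + 1542, 500, 1009)); // 13, matching "PARTITION-LEVEL SORT, 13"
  }
}
```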
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 36bde30..d0a82af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -102,21 +102,21 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root
     reduceWork.setReducer(root);
     reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
 
-    // All parents should be reduce sinks. We pick the one we just walked
-    // to choose the number of reducers. In the join/union case they will
-    // all be -1. In sort/order case where it matters there will be only
-    // one parent.
-    Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator,
-        "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was "
-        + context.parentOfRoot.getClass().getName());
-    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
-
-    reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+    // Pick the maximum # reducers across all parents as the # of reduce tasks.
+    int maxExecutors = -1;
+    for (Operator<? extends OperatorDesc> parentOfRoot : root.getParentOperators()) {
+      Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator,
+          "AssertionError: expected parentOfRoot to be an " +
+          "instance of ReduceSinkOperator, but was " +
+          parentOfRoot.getClass().getName());
+      ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
+      maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
+    }
+    reduceWork.setNumReduceTasks(maxExecutors);
 
+    ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
     setupReduceSink(context, reduceWork, reduceSink);
-
     sparkWork.add(reduceWork);
-
     SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork);
 
     sparkWork.connect(context.preceedingWork, reduceWork, edgeProp);
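The loop above replaces "use the reducer count of the reduce sink we just walked" with "use the maximum reducer count over all parent reduce sinks". A small standalone sketch of that rule; the names are illustrative, and -1 stands in for the "not yet determined" value mentioned in the removed comment:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the max-over-parents rule used for reduceWork.setNumReduceTasks().
public final class MaxParentParallelismSketch {

  static int pickNumReduceTasks(List<Integer> parentReducerCounts) {
    int max = -1; // stays -1 when no parent has a concrete reducer count yet
    for (int count : parentReducerCounts) {
      max = Math.max(max, count);
    }
    return max;
  }

  public static void main(String[] args) {
    // As in the golden output below: parents assigned 13 and 43 reducers yield 43.
    System.out.println(pickNumReduceTasks(Arrays.asList(13, 43))); // 43
  }
}
```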
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index c4b1640..682b987 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
@@ -117,6 +118,9 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     // Annotation OP tree with statistics
     runStatsAnnotation(procCtx);
 
+    // Set reducer parallelism
+    runSetReducerParallelism(procCtx);
+
     // Run Join releated optimizations
     runJoinOptimizations(procCtx);
 
@@ -266,12 +270,27 @@ private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
     }
   }
 
-  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+  private void runSetReducerParallelism(OptimizeSparkProcContext procCtx) throws SemanticException {
     ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
-        new SetSparkReducerParallelism());
+        new SetSparkReducerParallelism(pCtx.getConf()));
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+  }
+
+  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+    ParseContext pCtx = procCtx.getParseContext();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
 
     opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
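A note on the walker choice above: a pre-order walk visits parents before children along each path, but in an operator DAG a reduce sink with several parents can still be reached before every parent has been processed. That is the case the skip-and-revisit guard in SetSparkReducerParallelism handles. A toy, self-contained sketch of that situation on a diamond-shaped DAG; the classes here are illustrative and are not Hive's walker:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: pre-order traversal of a diamond DAG (A -> B -> D, A -> C -> D).
// D is reached first through B, before C has been processed, so it is skipped and
// handled later when the walk arrives through C -- mirroring the guard in the patch.
public final class PreOrderRevisitSketch {

  static final Map<String, List<String>> CHILDREN = Map.of(
      "A", List.of("B", "C"), "B", List.of("D"), "C", List.of("D"), "D", List.of());
  static final Map<String, List<String>> PARENTS = Map.of(
      "A", List.of(), "B", List.of("A"), "C", List.of("A"), "D", List.of("B", "C"));

  static final Set<String> processed = new HashSet<>();

  static void walk(String node) {
    if (!processed.containsAll(PARENTS.get(node))) {
      System.out.println("skipped " + node + " (not all parents processed yet)");
      return; // revisit later from another path
    }
    processed.add(node);
    System.out.println("processed " + node);
    for (String child : CHILDREN.get(node)) {
      walk(child);
    }
  }

  public static void main(String[] args) {
    walk("A"); // processed A, processed B, skipped D, processed C, processed D
  }
}
```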
diff --git ql/src/test/queries/clientpositive/spark_use_op_stats.q ql/src/test/queries/clientpositive/spark_use_op_stats.q
new file mode 100644
index 0000000..b559bc0
--- /dev/null
+++ ql/src/test/queries/clientpositive/spark_use_op_stats.q
@@ -0,0 +1,41 @@
+set hive.mapred.mode=nonstrict;
+set hive.spark.use.op.stats=false;
+set hive.auto.convert.join=false;
+set hive.exec.reducers.bytes.per.reducer=500;
+
+EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97;
+
+CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200;
+
+EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;
+
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key;
diff --git ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
new file mode 100644
index 0000000..76f9936
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/spark_use_op_stats.q.out
@@ -0,0 +1,331 @@
+PREHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 43), Map 3 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) = 97.0) (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT src1.key, src2.value
+FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+WHERE src1.key = 97
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+97	val_97
+97	val_97
+97	val_97
+97	val_97
+PREHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tmp
+POSTHOOK: query: CREATE TEMPORARY TABLE tmp AS
+SELECT * FROM src WHERE key > 50 AND key < 200
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tmp
+PREHOOK: query: EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 13), Map 5 (PARTITION-LEVEL SORT, 13)
+        Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 43), Reducer 7 (PARTITION-LEVEL SORT, 43)
+        Reducer 4 <- Reducer 3 (GROUP, 1)
+        Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 43), Map 8 (PARTITION-LEVEL SORT, 43)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+                    Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+        Map 5
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 148 Data size: 1542 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 100.0) and (UDFToDouble(key) > 150.0)) (type: boolean)
+                    Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 16 Data size: 166 Basic stats: COMPLETE Column stats: NONE
+        Map 6
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+                    Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+        Map 8
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: ((UDFToDouble(key) > 150.0) and (UDFToDouble(key) > 100.0)) (type: boolean)
+                    Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 55 Data size: 584 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+        Reducer 2
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 17 Data size: 182 Basic stats: COMPLETE Column stats: NONE
+        Reducer 3
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: hash(_col0,_col2) (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 66 Data size: 706 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: sum(_col0)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+        Reducer 4
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 7
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0, _col2
+                Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: string), _col2 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 60 Data size: 642 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col1 (type: string)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@tmp
+#### A masked pattern was here ####
+POSTHOOK: query: WITH a AS (
+  SELECT src1.key, src2.value
+  FROM tmp src1 JOIN tmp src2 ON (src1.key = src2.key)
+  WHERE src1.key > 100
+),
+b AS (
+  SELECT src1.key, src2.value
+  FROM src src1 JOIN src src2 ON (src1.key = src2.key)
+  WHERE src1.key > 150
+)
+SELECT sum(hash(a.key, b.value)) FROM a JOIN b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@tmp
+#### A masked pattern was here ####
+180817551380
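Reading the second golden plan above against the new logic: the first-level joins take their parallelism from table sizes (13 for the tmp-based join and 43 for the src-based join, consistent with the arithmetic sketch after the SetSparkReducerParallelism hunk), the second-level join at Reducer 3 inherits max(13, 43) = 43 from its two parent reduce sinks rather than recomputing from operator stats, and the final aggregation at Reducer 4 runs with a single reducer (GROUP, 1).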