diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index c777e45..c7aec5e 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -542,6 +542,8 @@ spark.query.files=add_part_multiple.q, \ groupby_complex_types.q, \ groupby_complex_types_multi_single_reducer.q, \ groupby_cube1.q, \ + groupby_map_ppr.q, \ + groupby_map_ppr_multi_distinct.q, \ groupby_multi_insert_common_distinct.q, \ groupby_multi_single_reducer.q, \ groupby_multi_single_reducer2.q, \ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 6a7733c..39d8c92 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -489,6 +489,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -511,6 +512,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.removeReduceSinkForGroupBy( cRS, cGBY, dedupCtx.getPctx(), dedupCtx); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -529,6 +531,12 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup pJoin.getConf().setFixedAsSorted(true); CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); + ReduceSinkOperator pRS = + CorrelationUtilities.findPossibleParent( + pJoin, ReduceSinkOperator.class, dedupCtx.trustScript()); + if (pRS != null) { + pRS.getConf().setDeduplicated(true); + } return true; } return false; @@ -547,6 +555,12 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, pJoin.getConf().setFixedAsSorted(true); CorrelationUtilities.removeReduceSinkForGroupBy( cRS, cGBY, dedupCtx.getPctx(), dedupCtx); + ReduceSinkOperator pRS = + CorrelationUtilities.findPossibleParent( + pJoin, ReduceSinkOperator.class, dedupCtx.trustScript()); + if (pRS != null) { + pRS.getConf().setDeduplicated(true); + } return true; } return false; @@ -565,6 +579,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.replaceReduceSinkWithSelectOperator( cRS, dedupCtx.getPctx(), dedupCtx); + pRS.getConf().setDeduplicated(true); return true; } return false; @@ -581,6 +596,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, start, ReduceSinkOperator.class, dedupCtx.trustScript()); if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) { CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx); + pRS.getConf().setDeduplicated(true); return true; } return false; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java index 92600be..b3d39ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java @@ -131,7 +131,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; } }*/ - SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*null, edgeType, numBuckets*/); + SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); if (mapJoinWork != null) { for (BaseWork myWork: mapJoinWork) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index e8e18a7..9b829b0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -117,13 +117,19 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root sparkWork.add(reduceWork); - // Use group-by as the default shuffler - SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP, - reduceWork.getNumReduceTasks()); - + SparkEdgeProperty edgeProp = new SparkEdgeProperty(); + edgeProp.setNumPartitions(reduceWork.getNumReduceTasks()); String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim(); - if (!sortOrder.isEmpty() && isSortNecessary(reduceSink)) { - edgeProp.setShuffleSort(); + + // test if we need group-by shuffle + if (reduceSink.getChildOperators().size() == 1 && + reduceSink.getChildOperators().get(0) instanceof GroupByOperator) { + edgeProp.setShuffleGroup(); + // test if the group by needs partition level sort, if so, use the MR style shuffle + // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542 + if (!sortOrder.isEmpty() && GBYNeedParLevelOrder(reduceSink)) { + edgeProp.setMRShuffle(); + } } if (reduceWork.getReducer() instanceof JoinOperator) { @@ -131,6 +137,21 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root edgeProp.setMRShuffle(); } + // test if we need total order, if so, we can either use MR shuffle + set #reducer to 1, + // or we can use SHUFFLE_SORT + // TODO: simple sort-by will also end up here, ideally it should be handled by MR shuffle + if (!edgeProp.isShuffleGroup() && !edgeProp.isMRShuffle() && !sortOrder.isEmpty()) { + if (reduceSink.getConf().getKeyCols().equals(reduceSink.getConf().getPartitionCols())) { + edgeProp.setShuffleSort(); + } + } + + // set to groupby-shuffle if it's still NONE + // simple distribute-by goes here + if (edgeProp.isShuffleNone()) { + edgeProp.setShuffleGroup(); + } + //If its a FileSink to bucketed files, also use MR-style shuffle to get compatible taskId for bucket-name FileSinkOperator fso = getChildOperator(reduceWork.getReducer(), FileSinkOperator.class); if (fso != null) { @@ -316,14 +337,18 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi } /** - * Test if the sort order in the RS is necessary. - * Unnecessary sort is mainly introduced when GBY is created. Therefore, if the sorting + * Test if we need partition level order for group by query. + * GBY needs partition level order when distinct is present. Therefore, if the sorting * keys, partitioning keys and grouping keys are the same, we ignore the sort and use * GroupByShuffler to shuffle the data. In this case a group-by transformation should be * sufficient to produce the correct results, i.e. data is properly grouped by the keys * but keys are not guaranteed to be sorted. 
*/ - public static boolean isSortNecessary(ReduceSinkOperator reduceSinkOperator) { + public static boolean GBYNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) { + // if the RS is deduplicated, enforce sorting anyway + if (reduceSinkOperator.getConf().isDeduplicated()) { + return true; + } List> children = reduceSinkOperator.getChildOperators(); if (children != null && children.size() == 1 && children.get(0) instanceof GroupByOperator) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index 7de79b6..a304fdf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -24,15 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; -import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.OperatorFactory; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -311,17 +303,35 @@ public Object process(Node nd, Stack stack, if (!context.connectedReduceSinks.contains(rs)) { // add dependency between the two work items - // Use group-by as the default shuffler - SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP, - rs.getConf().getNumReducers()); + SparkEdgeProperty edgeProp = new SparkEdgeProperty(); + edgeProp.setNumPartitions(rs.getConf().getNumReducers()); String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim(); - if (!sortOrder.isEmpty() && GenSparkUtils.isSortNecessary(rs)) { - edgeProp.setShuffleSort(); + // test if we need group-by shuffle + if (rs.getChildOperators().size() == 1 && + rs.getChildOperators().get(0) instanceof GroupByOperator) { + edgeProp.setShuffleGroup(); + // test if the group by needs partition level sort, if so, use the MR style shuffle + // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542 + if (!sortOrder.isEmpty() && GenSparkUtils.GBYNeedParLevelOrder(rs)) { + edgeProp.setMRShuffle(); + } } if (rWork.getReducer() instanceof JoinOperator) { //reduce-side join, use MR-style shuffle edgeProp.setMRShuffle(); } + // test if we need total order, if so, we can either use MR shuffle + set #reducer to 1, + // or we can use SHUFFLE_SORT + if (!edgeProp.isShuffleGroup() && !edgeProp.isMRShuffle() && !sortOrder.isEmpty()) { + if (rs.getConf().getKeyCols().equals(rs.getConf().getPartitionCols())) { + edgeProp.setShuffleSort(); + } + } + // set to groupby-shuffle if it's still NONE + // simple distribute-by goes here + if (edgeProp.isShuffleNone()) { + edgeProp.setShuffleGroup(); + } //If its a FileSink to bucketed files, also use MR-style shuffle to get compatible taskId for bucket-name FileSinkOperator fso = GenSparkUtils.getChildOperator(rWork.getReducer(), FileSinkOperator.class); if (fso != null) { diff --git 
ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java index 57beb69..c8aafd6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java @@ -111,6 +111,9 @@ private ReducerTraits(int trait) { // Write type, since this needs to calculate buckets differently for updates and deletes private AcidUtils.Operation writeType; + // used for spark, remember if the RS is deduplicated, if so, spark will enforce sorting + private transient boolean isDeduplicated = false; + private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class); public ReduceSinkDesc() { } @@ -165,6 +168,7 @@ public Object clone() { desc.setStatistics(this.getStatistics()); desc.setSkipTag(skipTag); desc.reduceTraits = reduceTraits.clone(); + desc.setDeduplicated(isDeduplicated); return desc; } @@ -407,4 +411,12 @@ public final void setReducerTraits(EnumSet traits) { public AcidUtils.Operation getWriteType() { return writeType; } + + public boolean isDeduplicated() { + return isDeduplicated; + } + + public void setDeduplicated(boolean isDeduplicated) { + this.isDeduplicated = isDeduplicated; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java index 83e664b..67d6c33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java @@ -30,6 +30,10 @@ private int numPartitions; + public SparkEdgeProperty(){ + + } + public SparkEdgeProperty(long edgeType, int numPartitions) { this.edgeType = edgeType; this.numPartitions = numPartitions; @@ -91,7 +95,6 @@ public String getShuffleType() { sb.append(" "); } sb.append("PARTITION-LEVEL SORT"); - return sb.toString(); } if (isShuffleSort()) { @@ -99,7 +102,6 @@ public String getShuffleType() { sb.append(" "); } sb.append("SORT"); - return sb.toString(); } return sb.toString(); diff --git ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out new file mode 100644 index 0000000..a950e7f --- /dev/null +++ ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out @@ -0,0 +1,342 @@ +PREHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN EXTENDED +FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED +FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + srcpart + src + TOK_INSERT + TOK_DESTINATION + TOK_TAB + TOK_TABNAME + dest1 + TOK_SELECT + TOK_SELEXPR + TOK_FUNCTION + substr + . 
+ TOK_TABLE_OR_COL + src + key + 1 + 1 + TOK_SELEXPR + TOK_FUNCTIONDI + count + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + value + 5 + TOK_SELEXPR + TOK_FUNCTION + concat + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + key + 1 + 1 + TOK_FUNCTION + sum + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + value + 5 + TOK_WHERE + = + . + TOK_TABLE_OR_COL + src + ds + '2008-04-08' + TOK_GROUPBY + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + key + 1 + 1 + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 31) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: key, value + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(DISTINCT substr(value, 5)), sum(substr(value, 5)) + keys: substr(key, 1, 1) (type: string), substr(value, 5) (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col3 (type: double) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: hr=11 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + partition values: + ds 2008-04-08 + hr 11 + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + numFiles 1 + numRows 500 + partition_columns ds/hr + partition_columns.types string:string + rawDataSize 5312 + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + partition_columns ds/hr + partition_columns.types string:string + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcpart + name: default.srcpart +#### A masked pattern was here #### + Partition + base file name: hr=12 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + partition values: + ds 2008-04-08 + hr 12 + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + numFiles 1 + numRows 500 + partition_columns ds/hr + partition_columns.types string:string + rawDataSize 5312 + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + partition_columns ds/hr + partition_columns.types string:string + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcpart + name: default.srcpart + Truncated Path -> Alias: + /srcpart/ds=2008-04-08/hr=11 [src] + /srcpart/ds=2008-04-08/hr=12 [src] + Reducer 2 + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0), sum(VALUE._col1) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), UDFToInteger(_col1) (type: int), concat(_col0, _col2) (type: string) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,c1,c2 + columns.comments + columns.types string:int:string +#### A masked pattern was here #### + name default.dest1 + serialization.ddl struct dest1 { string key, i32 c1, string c2} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,c1,c2 + columns.comments + columns.types string:int:string +#### A masked pattern was here #### + name default.dest1 + serialization.ddl struct dest1 { string key, i32 c1, string c2} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 
+#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-3 + Stats-Aggr Operator +#### A masked pattern was here #### + +PREHOOK: query: FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.c1 EXPRESSION [(srcpart)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.c2 EXPRESSION [(srcpart)src.FieldSchema(name:key, type:string, comment:default), (srcpart)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 1 00.0 +1 71 132828.0 +2 69 251142.0 +3 62 364008.0 +4 74 4105526.0 +5 6 5794.0 +6 5 6796.0 +7 6 71470.0 +8 8 81524.0 +9 7 92094.0 diff --git ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out new file mode 100644 index 0000000..3592169 --- /dev/null +++ ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out @@ -0,0 +1,361 @@ +PREHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, C3 INT, c4 INT) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, C3 INT, c4 INT) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN EXTENDED +FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(DISTINCT src.value) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED +FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(DISTINCT src.value) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + srcpart + src + TOK_INSERT + TOK_DESTINATION + TOK_TAB + TOK_TABNAME + dest1 + TOK_SELECT + TOK_SELEXPR + TOK_FUNCTION + substr + . 
+ TOK_TABLE_OR_COL + src + key + 1 + 1 + TOK_SELEXPR + TOK_FUNCTIONDI + count + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + value + 5 + TOK_SELEXPR + TOK_FUNCTION + concat + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + key + 1 + 1 + TOK_FUNCTION + sum + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + value + 5 + TOK_SELEXPR + TOK_FUNCTIONDI + sum + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + value + 5 + TOK_SELEXPR + TOK_FUNCTIONDI + count + . + TOK_TABLE_OR_COL + src + value + TOK_WHERE + = + . + TOK_TABLE_OR_COL + src + ds + '2008-04-08' + TOK_GROUPBY + TOK_FUNCTION + substr + . + TOK_TABLE_OR_COL + src + key + 1 + 1 + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 31) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: key, value + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(DISTINCT substr(value, 5)), sum(substr(value, 5)), sum(DISTINCT substr(value, 5)), count(DISTINCT value) + keys: substr(key, 1, 1) (type: string), substr(value, 5) (type: string), value (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string) + sort order: +++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col4 (type: double) + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: hr=11 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + partition values: + ds 2008-04-08 + hr 11 + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + numFiles 1 + numRows 500 + partition_columns ds/hr + partition_columns.types string:string + rawDataSize 5312 + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + partition_columns ds/hr + partition_columns.types string:string + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked 
pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcpart + name: default.srcpart +#### A masked pattern was here #### + Partition + base file name: hr=12 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + partition values: + ds 2008-04-08 + hr 12 + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + numFiles 1 + numRows 500 + partition_columns ds/hr + partition_columns.types string:string + rawDataSize 5312 + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.srcpart + partition_columns ds/hr + partition_columns.types string:string + serialization.ddl struct srcpart { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.srcpart + name: default.srcpart + Truncated Path -> Alias: + /srcpart/ds=2008-04-08/hr=11 [src] + /srcpart/ds=2008-04-08/hr=12 [src] + Reducer 2 + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0), sum(VALUE._col1), sum(DISTINCT KEY._col1:1._col0), count(DISTINCT KEY._col1:2._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), UDFToInteger(_col1) (type: int), concat(_col0, _col2) (type: string), UDFToInteger(_col3) (type: int), UDFToInteger(_col4) (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,c1,c2,c3,c4 + columns.comments + columns.types string:int:string:int:int +#### A masked pattern was here #### + name default.dest1 + serialization.ddl struct dest1 { string key, i32 c1, string c2, i32 c3, i32 c4} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true +#### A masked pattern was here #### 
+ table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,c1,c2,c3,c4 + columns.comments + columns.types string:int:string:int:int +#### A masked pattern was here #### + name default.dest1 + serialization.ddl struct dest1 { string key, i32 c1, string c2, i32 c3, i32 c4} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-3 + Stats-Aggr Operator +#### A masked pattern was here #### + +PREHOOK: query: FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(DISTINCT src.value) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM srcpart src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(DISTINCT src.value) +WHERE src.ds = '2008-04-08' +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.c1 EXPRESSION [(srcpart)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.c2 EXPRESSION [(srcpart)src.FieldSchema(name:key, type:string, comment:default), (srcpart)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.c3 EXPRESSION [(srcpart)src.null, ] +POSTHOOK: Lineage: dest1.c4 EXPRESSION [(srcpart)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 1 00.0 0 1 +1 71 132828.0 10044 71 +2 69 251142.0 15780 69 +3 62 364008.0 20119 62 +4 74 4105526.0 30965 74 +5 6 5794.0 278 6 +6 5 6796.0 331 5 +7 6 71470.0 447 6 +8 8 81524.0 595 8 +9 7 92094.0 577 7
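
The shuffle selection introduced above is duplicated almost verbatim between GenSparkUtils.createReduceWork() and the GenSparkWork processor. A minimal consolidated sketch of the decision order follows; the class ShuffleSelectionSketch, the helper name chooseEdgeProperty() and the reducerIsJoin flag are hypothetical, while the SparkEdgeProperty, ReduceSinkDesc and GenSparkUtils.GBYNeedParLevelOrder() calls are the ones the patch uses. The pre-existing rule that a FileSink into bucketed files forces an MR-style shuffle is unchanged and omitted here.

import com.google.common.base.Strings;

import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;

public final class ShuffleSelectionSketch {

  private ShuffleSelectionSketch() {
  }

  public static SparkEdgeProperty chooseEdgeProperty(ReduceSinkOperator rs, boolean reducerIsJoin) {
    // The new no-arg constructor leaves the edge as SHUFFLE_NONE until a rule below fires.
    SparkEdgeProperty edgeProp = new SparkEdgeProperty();
    edgeProp.setNumPartitions(rs.getConf().getNumReducers());
    String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();

    // 1. RS feeding a single GroupByOperator: group-by shuffle; if the GBY also needs
    //    partition-level order (distinct aggregates, or a deduplicated RS), add the
    //    MR-style shuffle -- SHUFFLE_SORT is not suitable for this, see HIVE-8542.
    if (rs.getChildOperators().size() == 1
        && rs.getChildOperators().get(0) instanceof GroupByOperator) {
      edgeProp.setShuffleGroup();
      if (!sortOrder.isEmpty() && GenSparkUtils.GBYNeedParLevelOrder(rs)) {
        edgeProp.setMRShuffle();
      }
    }

    // 2. Reduce-side join: always MR-style shuffle.
    if (reducerIsJoin) {
      edgeProp.setMRShuffle();
    }

    // 3. Total order: key columns equal partition columns, so SHUFFLE_SORT suffices
    //    (the alternative is an MR shuffle with the reducer count forced to one).
    if (!edgeProp.isShuffleGroup() && !edgeProp.isMRShuffle() && !sortOrder.isEmpty()
        && rs.getConf().getKeyCols().equals(rs.getConf().getPartitionCols())) {
      edgeProp.setShuffleSort();
    }

    // 4. Anything still undecided (e.g. a plain DISTRIBUTE BY) defaults to group-by shuffle.
    if (edgeProp.isShuffleNone()) {
      edgeProp.setShuffleGroup();
    }
    return edgeProp;
  }
}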
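
The two early returns removed from SparkEdgeProperty.getShuffleType() matter because a single edge can now carry several shuffle flags at once, and EXPLAIN should report all of them. A small hypothetical usage sketch (EdgeLabelSketch and its main() are illustrative; the constructor, setters and getShuffleType() are from SparkEdgeProperty as patched):

import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;

public class EdgeLabelSketch {
  public static void main(String[] args) {
    // A map-side GBY with DISTINCT gets both flags on the same edge: group-by shuffle
    // plus MR-style (partition-level sort) shuffle.
    SparkEdgeProperty edge = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
    edge.setShuffleGroup();
    edge.setMRShuffle();
    // With the early returns gone, getShuffleType() reports every flag that is set,
    // which is how the "GROUP PARTITION-LEVEL SORT" edges in the new .q.out plans arise.
    System.out.println(edge.getShuffleType());
  }
}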