From 88bfe1588cb8c0fb3f027131cbcb5e21d23a7e3e Mon Sep 17 00:00:00 2001
From: Na Yang
Date: Thu, 6 Nov 2014 18:23:23 -0800
Subject: [PATCH] HIVE-8756

---
 .../test/resources/testconfiguration.properties    |   1 +
 .../hive/ql/parse/spark/GenSparkProcContext.java   |   2 +
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  |  30 ++-
 .../test/results/clientpositive/spark/stats1.q.out | 247 +++++++++++++++++++++
 4 files changed, 278 insertions(+), 2 deletions(-)
 create mode 100644 ql/src/test/results/clientpositive/spark/stats1.q.out

diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 79a0132..977eb48 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -798,6 +798,7 @@ spark.query.files=add_part_multiple.q, \
   stats_only_null.q, \
   stats_partscan_1_23.q, \
   stats0.q, \
+  stats1.q, \
   stats10.q, \
   stats12.q, \
   stats13.q, \
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 8290568..e309aac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -130,6 +130,7 @@
   public final Set<ReduceSinkOperator> clonedReduceSinks;
 
   public final Set<FileSinkOperator> fileSinkSet;
+  public final Map<FileSinkOperator, FileSinkOperator> fileSinkMap;
 
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
@@ -169,6 +170,7 @@ public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
     this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
+    this.fileSinkMap = new LinkedHashMap<FileSinkOperator, FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e8e18a7..0279127 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -216,6 +216,24 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context
     // need to clone the plan.
     List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
 
+    // Create a mapping between the original FileSinkOperator and the new FileSinkOperator.
+    // This mapping is used for setting the correct stats flag on the new FileSinkDesc.
+    Iterator<Operator<?>> newRoots_it = newRoots.iterator();
+    for (Operator<?> root : roots) {
+      Operator<?> newRoot = newRoots_it.next();
+      while (newRoot != null) {
+        if (newRoot instanceof FileSinkOperator) {
+          context.fileSinkMap.put((FileSinkOperator)root, (FileSinkOperator)newRoot);
+        }
+        if (!newRoot.getChildOperators().isEmpty()) {
+          newRoot = (Operator<?>) newRoot.getChildOperators().get(0);
+          root = (Operator<?>) root.getChildOperators().get(0);
+        } else {
+          break;
+        }
+      }
+    }
+
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
     // tells you what that mapping is.
@@ -292,8 +310,16 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSink)
         GenMapRedUtils.isInsertInto(parseContext, fileSink);
     HiveConf hconf = parseContext.getConf();
 
-    boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
+    FileSinkOperator fileSinkOp = context.fileSinkMap.get(fileSink);
+
+    boolean chDir = false;
+    if (fileSinkOp != null) {
+      chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+          hconf, fileSinkOp, context.currentTask, isInsertTable);
+    } else {
+      chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+          hconf, fileSink, context.currentTask, isInsertTable);
+    }
 
     Path finalName = GenMapRedUtils.createMoveTask(context.currentTask, chDir,
         fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
diff --git ql/src/test/results/clientpositive/spark/stats1.q.out ql/src/test/results/clientpositive/spark/stats1.q.out
new file mode 100644
index 0000000..18e40b4
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/stats1.q.out
@@ -0,0 +1,247 @@
+PREHOOK: query: create table tmptable(key string, value string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tmptable
+POSTHOOK: query: create table tmptable(key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tmptable
+PREHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE tmptable
+SELECT unionsrc.key, unionsrc.value
+FROM (SELECT 'tst1' AS key, cast(count(1) AS string) AS value FROM src s1
+ UNION ALL
+ SELECT s2.key AS key, s2.value AS value FROM src1 s2) unionsrc
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE tmptable
+SELECT unionsrc.key, unionsrc.value
+FROM (SELECT 'tst1' AS key, cast(count(1) AS string) AS value FROM src s1
+ UNION ALL
+ SELECT s2.key AS key, s2.value AS value FROM src1 s2) unionsrc
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-2 depends on stages: Stage-1
+ Stage-0 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (GROUP, 1)
+ Union 3 <- Map 4 (NONE, 0), Reducer 2 (NONE, 0)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: s1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: count(1)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col0 (type: bigint)
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: s2
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string)
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmptable
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames:
_col0 + Select Operator + expressions: 'tst1' (type: string), UDFToString(_col0) (type: string) + outputColumnNames: _col0, _col1 + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + Union 3 + Vertex: Union 3 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmptable + + Stage: Stage-3 + Stats-Aggr Operator + +PREHOOK: query: INSERT OVERWRITE TABLE tmptable +SELECT unionsrc.key, unionsrc.value +FROM (SELECT 'tst1' AS key, cast(count(1) AS string) AS value FROM src s1 + UNION ALL + SELECT s2.key AS key, s2.value AS value FROM src1 s2) unionsrc +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@tmptable +POSTHOOK: query: INSERT OVERWRITE TABLE tmptable +SELECT unionsrc.key, unionsrc.value +FROM (SELECT 'tst1' AS key, cast(count(1) AS string) AS value FROM src s1 + UNION ALL + SELECT s2.key AS key, s2.value AS value FROM src1 s2) unionsrc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +POSTHOOK: Output: default@tmptable +POSTHOOK: Lineage: tmptable.key EXPRESSION [(src1)s2.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tmptable.value EXPRESSION [(src)s1.null, (src1)s2.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SELECT * FROM tmptable x SORT BY x.key, x.value +PREHOOK: type: QUERY +PREHOOK: Input: default@tmptable +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM tmptable x SORT BY x.key, x.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmptable +#### A masked pattern was here #### + + + + + val_165 + val_193 + val_265 + val_27 + val_409 + val_484 +128 +146 val_146 +150 val_150 +213 val_213 +224 +238 val_238 +255 val_255 +273 val_273 +278 val_278 +311 val_311 +369 +401 val_401 +406 val_406 +66 val_66 +98 val_98 +tst1 500 +PREHOOK: query: DESCRIBE FORMATTED tmptable +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tmptable +POSTHOOK: query: DESCRIBE FORMATTED tmptable +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tmptable +# col_name data_type comment + +key string +value string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 2 + numRows 25 + rawDataSize 191 + totalSize 225 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 +PREHOOK: query: -- Load a file into a existing table +-- Some stats (numFiles, totalSize) should be updated correctly +-- Some other stats (numRows, rawDataSize) should be cleared +load 
data local inpath '../../data/files/srcbucket20.txt' INTO TABLE tmptable +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@tmptable +POSTHOOK: query: -- Load a file into a existing table +-- Some stats (numFiles, totalSize) should be updated correctly +-- Some other stats (numRows, rawDataSize) should be cleared +load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE tmptable +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@tmptable +PREHOOK: query: DESCRIBE FORMATTED tmptable +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@tmptable +POSTHOOK: query: DESCRIBE FORMATTED tmptable +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@tmptable +# col_name data_type comment + +key string +value string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 3 + numRows 0 + rawDataSize 0 + totalSize 1583 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 -- 1.8.5.2 (Apple Git-48)