Index: ql/src/gen-py/queryplan/ttypes.py =================================================================== --- ql/src/gen-py/queryplan/ttypes.py (revision 4386) +++ ql/src/gen-py/queryplan/ttypes.py (working copy) @@ -37,6 +37,7 @@ REDUCESINK = 11 UNION = 12 UDTF = 13 + LATERALVIEWJOIN = 14 class TaskType: MAP = 0 Index: ql/src/test/results/clientpositive/lateral_view.q.out =================================================================== --- ql/src/test/results/clientpositive/lateral_view.q.out (revision 0) +++ ql/src/test/results/clientpositive/lateral_view.q.out (revision 0) @@ -0,0 +1,289 @@ +PREHOOK: query: EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myTable)) (TOK_TABREF src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + Lateral View Join Operator + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + expr: _UDTFcol0 + type: int + outputColumnNames: _col0, _col1, _col2 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + output table alias: myTable + output table columns: elt(_UDTFcol0) + function name: explode + Lateral View Join Operator + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + expr: _UDTFcol0 + type: int + outputColumnNames: _col0, _col1, _col2 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myTable)) (TOK_TABREF src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF myTable))) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol1 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + 
UDTF Operator + output table alias: myTable + output table columns: elt(_UDTFcol1) + function name: explode + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol1 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: EXPLAIN SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 'a' 'b' 'c')) myTable2)) (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myTable)) (TOK_TABREF src)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL myTable) elt)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL myTable2) elt))) (TOK_LIMIT 9))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + Lateral View Join Operator + Select Operator + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol2 + type: int + expr: _UDTFcol3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array('a','b','c') + type: array + outputColumnNames: _col0 + UDTF Operator + output table alias: myTable2 + output table columns: elt(_UDTFcol3) + function name: explode + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol2 + type: int + expr: _UDTFcol3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + output table alias: myTable + output table columns: elt(_UDTFcol2) + function name: explode + Lateral View Join Operator + Select Operator + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol2 + type: int + expr: _UDTFcol3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array('a','b','c') + type: array + outputColumnNames: _col0 + UDTF Operator + output table alias: myTable2 + output table columns: elt(_UDTFcol3) + function name: explode + Lateral View Join Operator + Select Operator + expressions: + expr: _UDTFcol2 + type: int + expr: _UDTFcol3 + type: string + outputColumnNames: _col0, _col1 + Limit + 
File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 9 + + +PREHOOK: query: -- Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1598075522/10000 +POSTHOOK: query: -- Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1598075522/10000 +238 val_238 1 +238 val_238 2 +238 val_238 3 +PREHOOK: query: -- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/378474640/10000 +POSTHOOK: query: -- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/378474640/10000 +1 +2 +3 +PREHOOK: query: -- Multiple lateral views should result in a Cartesian product +SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1658658528/10000 +POSTHOOK: query: -- Multiple lateral views should result in a Cartesian product +SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1658658528/10000 +1 a +1 b +1 c +2 a +2 b +2 c +3 a +3 b +3 c Index: ql/src/test/results/clientpositive/udtf_explode.q.out =================================================================== --- ql/src/test/results/clientpositive/udtf_explode.q.out (revision 4386) +++ ql/src/test/results/clientpositive/udtf_explode.q.out (working copy) @@ -22,25 +22,26 @@ type: array outputColumnNames: _col0 UDTF Operator - outputColumnName: myCol + output table alias: myCol + output table columns: elt(_UDTFcol0) function name: explode Limit File Output Operator compressed: false GlobalTableId: 0 - directory: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/1030905471/10001 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/256785963/10001 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: - columns myCol + columns _UDTFcol0 serialization.format 1 columns.types int Needs Tagging: false Path -> Alias: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src [src] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [src] Path -> Partition: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src + 
file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src Partition input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -54,7 +55,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1259880912 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe input format: org.apache.hadoop.mapred.TextInputFormat @@ -69,8 +71,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src - transient_lastDdlTime 1258608973 + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1259880912 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: src name: src @@ -80,12 +82,12 @@ limit: 3 -PREHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: query: EXPLAIN EXTENDED SELECT a.elt, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt PREHOOK: type: QUERY -POSTHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: query: EXPLAIN EXTENDED SELECT a.elt, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt POSTHOOK: type: QUERY ABSTRACT SYNTAX TREE: - (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL myCol)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL myCol)))) + (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) elt)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. 
(TOK_TABLE_OR_COL a) elt)))) STAGE DEPENDENCIES: Stage-1 is a root stage @@ -105,20 +107,21 @@ type: array outputColumnNames: _col0 UDTF Operator - outputColumnName: myCol + output table alias: myCol + output table columns: elt(_UDTFcol1) function name: explode Limit Reduce Output Operator sort order: tag: -1 value expressions: - expr: myCol + expr: _UDTFcol1 type: int Needs Tagging: false Path -> Alias: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src [a:src] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [a:src] Path -> Partition: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src Partition input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -132,7 +135,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1259880912 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe input format: org.apache.hadoop.mapred.TextInputFormat @@ -147,8 +151,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/njain/hive_commit1/hive_commit1/build/ql/test/data/warehouse/src - transient_lastDdlTime 1258608973 + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1259880912 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: src name: src @@ -171,7 +175,7 @@ File Output Operator compressed: false GlobalTableId: 0 - directory: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10002 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10002 table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -183,7 +187,7 @@ Stage: Stage-2 Map Reduce Alias -> Map Operator Tree: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10002 + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10002 Reduce Output Operator key expressions: expr: _col0 @@ -198,9 +202,9 @@ type: bigint Needs Tagging: false Path -> Alias: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10002 [file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10002] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10002 [file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10002] Path -> Partition: - file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10002 + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10002 Partition input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -234,7 +238,7 @@ File Output Operator compressed: false GlobalTableId: 0 - directory: 
file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/38030994/10001 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1126219824/10001 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -251,22 +255,22 @@ PREHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 PREHOOK: type: QUERY PREHOOK: Input: default@src -PREHOOK: Output: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/1652852305/10000 +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2093133199/10000 POSTHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 POSTHOOK: type: QUERY POSTHOOK: Input: default@src -POSTHOOK: Output: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/1652852305/10000 +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2093133199/10000 1 2 3 -PREHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: query: SELECT a.elt, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt PREHOOK: type: QUERY PREHOOK: Input: default@src -PREHOOK: Output: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/734444296/10000 -POSTHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1620820010/10000 +POSTHOOK: query: SELECT a.elt, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt POSTHOOK: type: QUERY POSTHOOK: Input: default@src -POSTHOOK: Output: file:/data/users/njain/hive_commit1/hive_commit1/build/ql/tmp/734444296/10000 +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1620820010/10000 1 1 2 1 3 1 Index: ql/src/test/queries/clientpositive/lateral_view.q =================================================================== --- ql/src/test/queries/clientpositive/lateral_view.q (revision 0) +++ ql/src/test/queries/clientpositive/lateral_view.q (revision 0) @@ -0,0 +1,10 @@ +EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3; +EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3; +EXPLAIN SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9; + +-- Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3; +-- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LIMIT 3; +-- Multiple lateral views should result in a Cartesian product +SELECT myTable.elt, myTable2.elt FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 LIMIT 9; Index: ql/src/test/queries/clientpositive/udtf_explode.q =================================================================== --- ql/src/test/queries/clientpositive/udtf_explode.q (revision 4386) +++ ql/src/test/queries/clientpositive/udtf_explode.q (working copy) @@ -1,5 +1,5 @@ EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; -EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; +EXPLAIN EXTENDED SELECT a.elt, count(1) FROM (SELECT 
explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt; SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; -SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; +SELECT a.elt, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.elt; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 0) @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + + +/** + * The lateral view join operator is used to implement the lateral view + * functionality. This operator was implemented with the following + * operator DAG in mind. For a query such as + * + * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS adid + * + * The top of the operator tree will look similar to: + * + * [Table Scan] + * / \ + *[Select](*) [Select](adid_list) + * | | + * | [UDTF] (explode) + * \ / + * [Lateral View Join] + * | + * | + * [Select] (pageid, adid.*) + * | + * .... + * + * Rows from the table scan operator are first sent to two select operators. + * The select operator on the left picks all the columns while the select + * operator on the right picks only the columns needed by the UDTF. + * + * The output of select in the left branch and output of the UDTF in the right + * branch are then sent to the lateral view join (LVJ). In most cases, the UDTF + * will generate > 1 row for every row received from the TS, while the left + * select operator will generate only one. For each row output from the TS, + * the LVJ outputs all possible rows that can be created by joining the row from + * the left select and one of the rows output from the UDTF. + * + * Additional lateral views can be supported by adding a similar DAG after the + * previous LVJ operator. 
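 + *
 + * As an illustration (hypothetical data): if the table scan emits the row
 + * (pageid=1, adid_list=[a1, a2]), the left Select forwards (1, [a1, a2]) once,
 + * the UDTF path emits the rows (a1) and (a2), and the LVJ forwards the joined
 + * rows (1, [a1, a2], a1) and (1, [a1, a2], a2) to the Select below it.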
+ */ + +public class LateralViewJoinOperator extends Operator { + + private static final long serialVersionUID = 1L; + + // The expected tags from the parent operators. See processOp() before + // changing the tags. + static final int SELECT_TAG = 0; + static final int UDTF_TAG = 1; + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + + ArrayList ois = new ArrayList(); + ArrayList fieldNames = new ArrayList(); + + // The output of the lateral view join will be the columns from the select + // parent, followed by the column from the UDTF parent + StructObjectInspector soi = + (StructObjectInspector) inputObjInspectors[SELECT_TAG]; + List sfs = soi.getAllStructFieldRefs(); + for(StructField sf : sfs) { + fieldNames.add(sf.getFieldName()); + ois.add(sf.getFieldObjectInspector()); + } + + soi = (StructObjectInspector) inputObjInspectors[UDTF_TAG]; + sfs = soi.getAllStructFieldRefs(); + for(StructField sf : sfs) { + fieldNames.add(sf.getFieldName()); + ois.add(sf.getFieldObjectInspector()); + } + + outputObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois); + + // Initialize the rest of the operator DAG + super.initializeOp(hconf); + } + + // acc is short for accumulator. It's used to store objects before sending + // them off to the next operator + ArrayList acc = new ArrayList(); + ArrayList selectObjs = new ArrayList(); + /** + * An important assumption for processOp() is that for a given row from the + * TS, the LVJ will first get the row from the left select operator, followed + * by all the corresponding rows from the UDTF operator. And so on. + */ + @Override + public void processOp(Object row, int tag) throws HiveException { + StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag]; + if (tag == SELECT_TAG) { + selectObjs.clear(); + selectObjs.addAll(soi.getStructFieldsDataAsList(row)); + } else if (tag == UDTF_TAG) { + acc.clear(); + acc.addAll(selectObjs); + acc.addAll(soi.getStructFieldsDataAsList(row)); + forward(acc, outputObjInspector); + } else { + throw new HiveException("Invalid tag"); + } + + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; public class UDTFOperator extends Operator implements Serializable { private static final long serialVersionUID = 1L; @@ -52,9 +53,10 @@ * sends periodic reports back to the tracker. 
*/ transient AutoProgressor autoProgressor; - + transient boolean closeCalled = false; + protected void initializeOp(Configuration hconf) throws HiveException { - conf.getUDTF().setCollector(new UDTFCollector(this)); + conf.getGenericUDTF().setCollector(new UDTFCollector(this)); // Make an object inspector [] of the arguments to the UDTF List inputFields = @@ -62,18 +64,30 @@ udtfInputOIs = new ObjectInspector[inputFields.size()]; for (int i=0; i fieldNames = new ArrayList(); + ArrayList fieldOIs = new ArrayList(); + + int i=0; + for(StructField sf : udtfOutputOI.getAllStructFieldRefs()) { + String internalName = conf.getInternalNameForColAlias(sf.getFieldName()); + if(internalName == null) { + throw new HiveException("No internal name for column alias " + + sf.getFieldName()); + } + fieldNames.add(internalName); + fieldOIs.add(sf.getFieldObjectInspector()); + } + // The output of this operator should be a struct, so create appropriate OI - ArrayList colNames = new ArrayList(); - ArrayList colOIs = new ArrayList(); - colNames.add(conf.getOutputColName()); - colOIs.add(udtfOutputOI); - outputObjInspector = - ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs); + outputObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); // Set up periodic progress reporting in case the UDTF doesn't output rows // for a while @@ -97,22 +111,23 @@ objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i)); } - conf.getUDTF().process(objToSendToUDTF); + conf.getGenericUDTF().process(objToSendToUDTF); } /** * forwardUDTFOutput is typically called indirectly by the GenericUDTF when - * the GenericUDTF has generated output data that should be passed on to the + * the GenericUDTF has generated output rows that should be passed on to the * next operator(s) in the DAG. 
* * @param o * @throws HiveException */ public void forwardUDTFOutput(Object o) throws HiveException { - // Now that we have the data from the UDTF, repack it into an object[] as - // the output should be inspectable by a struct OI - forwardObj[0] = o; - forward(forwardObj, outputObjInspector); + if(closeCalled) { + throw new HiveException("UDTFs should not output rows on close"); + } + // Since the output of the UDTF is a struct, we can just forward that + forward(o, outputObjInspector); } public String getName() { @@ -124,6 +139,7 @@ } protected void closeOp(boolean abort) throws HiveException { - conf.getUDTF().close(); + closeCalled = true; + conf.getGenericUDTF().close(); } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -55,6 +55,7 @@ opvec.add(new opTuple<tableScanDesc> (tableScanDesc.class, TableScanOperator.class)); opvec.add(new opTuple<unionDesc> (unionDesc.class, UnionOperator.class)); opvec.add(new opTuple<udtfDesc> (udtfDesc.class, UDTFOperator.class)); + opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class, LateralViewJoinOperator.class)); } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (working copy) @@ -19,28 +19,38 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +/** + * All member variables should have setters and getters of the form + * get<member name> and set<member name> or else they won't be recreated + * properly at run time. + * + */ @explain(displayName="UDTF Operator") public class udtfDesc implements Serializable { private static final long serialVersionUID = 1L; private GenericUDTF genericUDTF; - private String outputColName; - + private String outputTableAlias; + // Needed as the fields of the struct output by the UDTF operator need to + // have unique internal names.
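 + // (Each output column alias declared by the GenericUDTF, e.g. "elt" for
 + // explode, maps to the generated internal name used in the operator's
 + // output row schema.)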
+ private HashMap colAliasToInternalName; + public udtfDesc() { } - public udtfDesc(final GenericUDTF genericUDTF, String outputColName) { + public udtfDesc(final GenericUDTF genericUDTF, String outputTableAlias, + HashMap colAliasToInternalName) { this.genericUDTF = genericUDTF; - this.outputColName = outputColName; + this.outputTableAlias = outputTableAlias; + this.colAliasToInternalName = colAliasToInternalName; } - public GenericUDTF getUDTF() { + public GenericUDTF getGenericUDTF() { return this.genericUDTF; } - public void setUDTF(final GenericUDTF genericUDTF) { + public void setGenericUDTF(final GenericUDTF genericUDTF) { this.genericUDTF=genericUDTF; } @explain(displayName="function name") @@ -48,12 +58,32 @@ return this.genericUDTF.toString(); } - @explain(displayName="outputColumnName") - public String getOutputColName() { - return this.outputColName; + @explain(displayName="output table alias") + public String getOutputTableAlias() { + return this.outputTableAlias; } - public void setOutputColName(String outputColName) { - this.outputColName = outputColName; + @explain(displayName = "output table columns") + public String getTableColumns() { + String str = ""; + for(String colAlias : colAliasToInternalName.keySet()) { + str += colAlias + "(" + colAliasToInternalName.get(colAlias) + ") "; + } + return str; } + public void setOutputTableAlias(String outputTableAlias) { + this.outputTableAlias = outputTableAlias; + } + + public void setColAliasToInternalName(HashMap colAliasToInternalName) { + this.colAliasToInternalName = colAliasToInternalName; + } + public HashMap getColAliasToInternalName() { + return this.colAliasToInternalName; + } + + public String getInternalNameForColAlias(String colAlias) { + return this.colAliasToInternalName.get(colAlias); + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java (revision 0) @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +@explain(displayName="Lateral View Join Operator") +public class lateralViewJoinDesc implements Serializable { + private static final long serialVersionUID = 1L; + + public lateralViewJoinDesc() { + } + public lateralViewJoinDesc(int numInputOps) { + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (working copy) @@ -57,6 +57,12 @@ private HashMap destToSortby; + /** + * Maping from table/subquery aliases to all the associated lateral view + * nodes + */ + private HashMap> aliasToLateralViews; + /* Order by clause */ private HashMap destToOrderby; private HashMap destToLimit; @@ -88,6 +94,8 @@ this.alias = alias; this.isSubQ = isSubQ; this.outerQueryLimit = -1; + + this.aliasToLateralViews = new HashMap>(); } public void setAggregationExprsForClause(String clause, LinkedHashMap aggregationTrees) { @@ -290,7 +298,8 @@ (joinExpr != null) || (!nameToSample.isEmpty()) || (!destToGroupby.isEmpty()) || - (!destToClusterby.isEmpty())) + (!destToClusterby.isEmpty()) || + (!aliasToLateralViews.isEmpty())) return false; Iterator>> aggrIter = destToAggregationExprs.entrySet().iterator(); @@ -343,4 +352,21 @@ public ASTNode getHints() { return hints; } + + public Map> getAliasToLateralViews() { + return this.aliasToLateralViews; + } + public List getLateralViewsForAlias(String alias) { + return aliasToLateralViews.get(alias.toLowerCase()); + } + + public void addLateralViewForAlias(String alias, ASTNode lateralView) { + String lowerAlias = alias.toLowerCase(); + ArrayList lateralViews = aliasToLateralViews.get(lowerAlias); + if(lateralViews == null) { + lateralViews = new ArrayList(); + aliasToLateralViews.put(alias, lateralViews); + } + lateralViews.add(lateralView); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -145,6 +145,7 @@ TOK_RECORDREADER; TOK_RECORDWRITER; TOK_LEFTSEMIJOIN; +TOK_LATERAL_VIEW; } @@ -917,11 +918,18 @@ | KW_LEFT KW_SEMI KW_JOIN -> TOK_LEFTSEMIJOIN ; +lateralView +@init {msgs.push("lateral view"); } +@after {msgs.pop(); } + : + KW_LATERAL KW_VIEW function KW_AS Identifier -> ^(TOK_LATERAL_VIEW ^(TOK_SELECT ^(TOK_SELEXPR function Identifier))) + ; + fromSource @init { msgs.push("from source"); } @after { msgs.pop(); } : - (tableSource | subQuerySource) + (tableSource | subQuerySource) (lateralView^)* ; tableSample @@ -1459,8 +1467,8 @@ KW_RECORDREADER: 'RECORDREADER'; KW_RECORDWRITER: 'RECORDWRITER'; KW_SEMI: 'SEMI'; +KW_LATERAL: 'LATERAL'; - // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. 
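Note: because fromSource applies (lateralView^)*, each successive lateral view becomes the new root of the source subtree. Per the lateral_view.q.out output above, FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable LATERAL VIEW explode(array('a', 'b', 'c')) AS myTable2 therefore parses as (TOK_LATERAL_VIEW (TOK_SELECT ... myTable2) (TOK_LATERAL_VIEW (TOK_SELECT ... myTable) (TOK_TABREF src))).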
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -29,9 +29,9 @@ import java.util.Set; import java.util.TreeSet; import java.util.Vector; +import java.util.Map.Entry; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import java.lang.ClassNotFoundException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; @@ -60,7 +61,11 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -73,22 +78,27 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; -import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.GenMROperator; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4; import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1; import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.Optimizer; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4; +import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TestClass; import org.apache.hadoop.hive.ql.plan.aggregationDesc; +import org.apache.hadoop.hive.ql.plan.createTableDesc; +import org.apache.hadoop.hive.ql.plan.createTableLikeDesc; import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.exprNodeDesc; @@ -101,6 +111,7 @@ import org.apache.hadoop.hive.ql.plan.forwardDesc; import 
org.apache.hadoop.hive.ql.plan.groupByDesc; import org.apache.hadoop.hive.ql.plan.joinDesc; +import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc; import org.apache.hadoop.hive.ql.plan.limitDesc; import org.apache.hadoop.hive.ql.plan.loadFileDesc; import org.apache.hadoop.hive.ql.plan.loadTableDesc; @@ -114,20 +125,18 @@ import org.apache.hadoop.hive.ql.plan.tableScanDesc; import org.apache.hadoop.hive.ql.plan.udtfDesc; import org.apache.hadoop.hive.ql.plan.unionDesc; -import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -136,33 +145,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.ql.exec.TextRecordReader; -import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; - -import java.util.regex.Matcher; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.createTableDesc; -import org.apache.hadoop.hive.ql.plan.createTableLikeDesc; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.FetchOperator; -import java.util.Collection; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; - /** * Implementation of the semantic analyzer */ @@ -337,7 +320,13 @@ return expr; } - private void processTable(QB qb, ASTNode tabref) throws SemanticException { + /** + * Goes to the table ref tree and finds the alias for the table. Then, it + * makes an association from the alias to the table. 
Also, in the parse info, + it makes a mapping from the alias to the table name in + the AST + */ + private String processTable(QB qb, ASTNode tabref) throws SemanticException { // For each table reference get the table name // and the alias (if alias is not present, the table name // is used as an alias) @@ -390,9 +379,11 @@ qb.setTabAlias(alias, table_name); qb.getParseInfo().setSrcForAlias(alias, tableTree); + + return alias; } - private void processSubQuery(QB qb, ASTNode subq) throws SemanticException { + private String processSubQuery(QB qb, ASTNode subq) throws SemanticException { // This is a subquery and must have an alias if (subq.getChildCount() != 2) { @@ -412,6 +403,8 @@ } // Insert this map into the stats qb.setSubqAlias(alias, qbexpr); + + return alias; } private boolean isJoinToken(ASTNode node) @@ -427,6 +420,14 @@ return false; } + /** + * Given the AST with TOK_JOIN as the root, get all the aliases for the tables + * or subqueries in the join. + * + * @param qb + * @param join + * @throws SemanticException + */ @SuppressWarnings("nls") private void processJoin(QB qb, ASTNode join) throws SemanticException { int numChildren = join.getChildCount(); @@ -436,15 +437,73 @@ for (int num = 0; num < numChildren; num++) { ASTNode child = (ASTNode) join.getChild(num); - if (child.getToken().getType() == HiveParser.TOK_TABREF) + if (child.getToken().getType() == HiveParser.TOK_TABREF) { processTable(qb, child); - else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) + } + else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) { processSubQuery(qb, child); - else if (isJoinToken(child)) + } + else if (isJoinToken(child)) { processJoin(qb, child); + } } } + + /** + * Given the AST with TOK_LATERAL_VIEW as the root, get the alias for the + * table or subquery in the lateral view and also make a mapping from the + * alias to all the lateral view AST's + * + * @param qb + * @param lateralView + * @return the alias for the table/subquery + * @throws SemanticException + */ + + private String processLateralView(QB qb, ASTNode lateralView) throws SemanticException { + int numChildren = lateralView.getChildCount(); + if(numChildren != 2) { + throw new SemanticException("Lateral view with incorrect number of children"); + } + ASTNode next = (ASTNode) lateralView.getChild(1); + + String alias = null; + + switch(next.getToken().getType()) { + case HiveParser.TOK_TABREF: + alias = processTable(qb, next); + break; + case HiveParser.TOK_SUBQUERY: + alias = processSubQuery(qb, next); + break; + case HiveParser.TOK_LATERAL_VIEW: + alias = processLateralView(qb, next); + break; + default: + throw new SemanticException("Invalid 2nd child under TOK_LATERAL_VIEW"); + } + qb.getParseInfo().addLateralViewForAlias(alias, lateralView); + return alias; + } + /** + * Phase 1: (including, but not limited to): + * + * 1. Gets all the aliases for all the tables / subqueries and makes the + * appropriate mapping in aliasToTabs, aliasToSubq + * 2. Gets the location of the destination and names the clause "inclause" + i + * 3. Creates a map from a string representation of an aggregation tree to the + * actual aggregation AST + * 4. Creates a mapping from the clause name to the select expression AST in + * destToSelExpr + * 5. 
Creates a mapping from a table alias to the lateral view AST's in + * aliasToLateralViews + * + * @param ast + * @param qb + * @param ctx_1 + * @throws SemanticException + */ @SuppressWarnings({"fallthrough", "nls"}) public void doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1) throws SemanticException { @@ -495,14 +554,16 @@ if (child_count != 1) throw new SemanticException("Multiple Children " + child_count); - // Check if this is a subquery + // Check if this is a subquery / lateral view ASTNode frm = (ASTNode) ast.getChild(0); - if (frm.getToken().getType() == HiveParser.TOK_TABREF) + if (frm.getToken().getType() == HiveParser.TOK_TABREF) { processTable(qb, frm); - else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY) + } else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY) { processSubQuery(qb, frm); - else if (isJoinToken(frm)) - { + } else if(frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) { + processLateralView(qb, frm); + } + else if (isJoinToken(frm)) { processJoin(qb, frm); qbp.setJoinExpr(frm); } @@ -713,7 +774,7 @@ return false; } - + @SuppressWarnings("nls") private void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn, Vector leftAliases, Vector rightAliases, @@ -853,8 +914,8 @@ Vector rightCondAl1 = new Vector(); Vector rightCondAl2 = new Vector(); parseJoinCondPopulateAlias(joinTree, rightCondn, rightCondAl1, rightCondAl2, null); - - // is it a filter or a join condition + + // is it a filter or a join condition if (((leftCondAl1.size() != 0) && (leftCondAl2.size() != 0)) || ((rightCondAl1.size() != 0) && (rightCondAl2.size() != 0))) throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_1.getMsg(joinCond)); @@ -1357,12 +1418,18 @@ return false; } + private Operator genSelectPlan(String dest, QB qb, + Operator input) throws SemanticException { + ASTNode selExprList = qb.getParseInfo().getSelForClause(dest); + + Operator op = genSelectPlan(selExprList, qb, input); + LOG.debug("Created Select Plan for clause: " + dest); + return op; + } @SuppressWarnings("nls") - private Operator genSelectPlan(String dest, QB qb, - Operator input) throws SemanticException { + private Operator genSelectPlan(ASTNode selExprList, QB qb, + Operator input) throws SemanticException { - ASTNode selExprList = qb.getParseInfo().getSelForClause(dest); - ArrayList col_list = new ArrayList(); RowResolver out_rwsch = new RowResolver(); ASTNode trfm = null; @@ -1383,10 +1450,12 @@ trfm = (ASTNode) selExprList.getChild(posn).getChild(0); } - // Detect a UDTF by looking up the function name in the registry. - // Not as clean TRANSFORM due to the lack of a special token. + // Detect queries of the form SELECT udtf(col) AS ... + // by looking for a function as the first child, then then checking to see + // if the function is a Generic UDTF. It's not as clean as TRANSFORM due to + // the lack of a special token. 
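 + // (e.g. SELECT explode(array(1,2,3)) AS myCol FROM src, as exercised in
 + // udtf_explode.q above)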
boolean isUDTF = false; - String udtfOutputColAlias = null; + String udtfOutputTableAlias = null; ASTNode udtfExpr = (ASTNode) selExprList.getChild(posn).getChild(0); GenericUDTF genericUDTF = null; @@ -1410,7 +1479,8 @@ selExprList.getChild(posn).getChild(1).getType() != HiveParser.Identifier ){ throw new SemanticException(ErrorMsg.UDTF_REQUIRE_AS.getMsg()); } - udtfOutputColAlias = unescapeIdentifier(selExprList.getChild(posn).getChild(1).getText()); + udtfOutputTableAlias = unescapeIdentifier(selExprList.getChild(posn).getChild(1).getText()); + LOG.debug("udtf table alias is " + udtfOutputTableAlias); } // The list of expressions after SELECT or SELECT TRANSFORM. @@ -1425,7 +1495,7 @@ LOG.debug("genSelectPlan: input = " + inputRR.toString()); - // For UDTF's, skip the function name + // For UDTF's, skip the function name to get the expressions int startPosn = isUDTF ? posn + 1 : posn; // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM) @@ -1516,10 +1586,10 @@ } if(isUDTF) { - output = genUDTFPlan(genericUDTF, udtfOutputColAlias, qb, output); + output = genUDTFPlan(genericUDTF, udtfOutputTableAlias, qb, output); } - LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString()); - + //LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString()); + LOG.debug("row resolver for select: " + out_rwsch.toString()); return output; } @@ -2892,7 +2962,14 @@ return limitMap; } - private Operator genUDTFPlan(GenericUDTF genericUDTF, String udtfOutputColumnAlias, + static int udtfColIndex = 0; + private String getNewInternalNameForUDTFCol() { + String str = "_UDTFcol" + udtfColIndex; + udtfColIndex++; + return str; + } + + private Operator genUDTFPlan(GenericUDTF genericUDTF, String udtfOutputTableAlias, QB qb, Operator input) throws SemanticException { // No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY @@ -2925,24 +3002,43 @@ colOIs[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo( inputCols.get(i).getType()); } - ObjectInspector outputOI = genericUDTF.initialize(colOIs); + StructObjectInspector outputOI = genericUDTF.initialize(colOIs); + + // The output of this operator is the output struct from the GenericUDTF. + // Since there may be multiple UDTFOperators with the same GenericUDTF's in + // the DAG, make some unique internal names for the columns. - ColumnInfo outputCol = - new ColumnInfo(udtfOutputColumnAlias, - TypeInfoUtils.getTypeInfoFromObjectInspector(outputOI), null, false); + // And also generate the output column info's / row resolver using the + // new internal names. 
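 + // The generated internal names have the form _UDTFcol0, _UDTFcol1, ... (see
 + // getNewInternalNameForUDTFCol() above and the _UDTFcol columns in the
 + // lateral_view.q.out EXPLAIN plans).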
+ ArrayList udtfCols = new ArrayList(); + HashMap internalNameToColAlias = new HashMap(); + HashMap colAliasToInternalName = new HashMap(); + + for(StructField sf : outputOI.getAllStructFieldRefs()) { + + String colAlias = sf.getFieldName(); + String internalName = getNewInternalNameForUDTFCol(); + + ColumnInfo col = new ColumnInfo(internalName, + TypeInfoUtils.getTypeInfoFromObjectInspector(sf.getFieldObjectInspector()), + udtfOutputTableAlias, false); + internalNameToColAlias.put(internalName, colAlias); + colAliasToInternalName.put(colAlias, internalName); + udtfCols.add(col); + } + // Create the row resolver for this operator from the output columns RowResolver out_rwsch = new RowResolver(); - out_rwsch.put( - null, - outputCol.getInternalName(), - outputCol); + for(ColumnInfo c : udtfCols ) { + out_rwsch.put(udtfOutputTableAlias, internalNameToColAlias.get(c.getInternalName()), c); + } // Add the UDTFOperator to the operator DAG - Operator udtf = + Operator udtf = putOpInsertMap(OperatorFactory.getAndMakeChild( - new udtfDesc(genericUDTF, udtfOutputColumnAlias), + new udtfDesc(genericUDTF, udtfOutputTableAlias, colAliasToInternalName), new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch); return udtf; @@ -3671,7 +3767,7 @@ filters.add(new Vector()); joinTree.setFilters(filters); - ASTNode joinCond = (ASTNode) joinParseTree.getChild(2); + ASTNode joinCond = (ASTNode) joinParseTree.getChild(2); Vector leftSrc = new Vector(); parseJoinCondition(joinTree, joinCond, leftSrc); if (leftSrc.size() == 1) @@ -4533,6 +4629,9 @@ for (String alias : qb.getTabAliases()) { aliasToOpInfo.put(alias, genTablePlan(alias, qb)); } + + // For all the source tables, make the necessary lateral views + genLateralViews(aliasToOpInfo, qb); Operator srcOpInfo = null; @@ -4565,6 +4664,87 @@ return bodyOpInfo; } + /** + * Generates the operator DAG needed to implement lateral views and attaches + * it to the TS operator. + * + * @param aliasToOpInfo A mapping from a table alias to the TS operator. This + * function replaces the operator mapping as necessary + * @param qb + * @throws SemanticException + */ + + void genLateralViews(HashMap aliasToOpInfo, QB qb) throws SemanticException { + Map> aliasToLateralViews = + qb.getParseInfo().getAliasToLateralViews(); + for(Entry e : aliasToOpInfo.entrySet()) { + String alias = e.getKey(); + // See if the alias has a lateral view. If so, chain the lateral view + // operator on + ArrayList lateralViews = aliasToLateralViews.get(alias); + if( lateralViews != null) { + Operator op = e.getValue(); + + for(ASTNode lateralViewTree : aliasToLateralViews.get(alias)) { + // There are 2 paths from the TS operator to the same LateralViewJoinOperator. + // TS -> SelectOperator (gets all cols) -> LateralViewJoinOperator + // For every lateral view: + // TS -> SelectOperator (gets cols for UDTF) -> UDTFOperator -> LateralViewJoinOperator + + // Note: The order in which the two paths are added is important. The + // lateral view join operator depends on having the select operator + // give it the row first. 
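 + // Concretely, the all-columns Select is attached as parent 0 of the LVJ and
 + // the UDTF path as parent 1, matching LateralViewJoinOperator's SELECT_TAG
 + // and UDTF_TAG.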
+ + // Get the all path + RowResolver allPathRR = opParseCtx.get(op).getRR(); + Operator allPath = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new selectDesc(true), + new RowSchema(allPathRR.getColumnInfos()), + op), allPathRR); + + // Get the UDTF Path + QB blankQb = new QB(null, null, false); + Operator udtfPath = genSelectPlan((ASTNode)lateralViewTree.getChild(0), blankQb, op); + RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRR(); + + + // Merge the two into the lateral view join + + // The cols of the merged result will be the combination of both the + // cols of the UDTF path and the cols of the all path + + RowResolver lateralViewRR = new RowResolver(); + mergeRowResolvers(allPathRR, lateralViewRR); + mergeRowResolvers(udtfPathRR, lateralViewRR); + + Operator lateralViewJoin = + putOpInsertMap(OperatorFactory.getAndMakeChild( + new lateralViewJoinDesc(), + new RowSchema(lateralViewRR.getColumnInfos()), + allPath, udtfPath), lateralViewRR); + op = lateralViewJoin; + } + e.setValue(op); + } + } + } + + /** + * A helper function that gets all the columns and respective aliases in the + * source and puts them into dest + * @param source + * @param dest + */ + private void mergeRowResolvers(RowResolver source, RowResolver dest) { + for(String tableAlias : source.getTableNames()) { + HashMap colAliasToColInfo = source.getFieldMap(tableAlias); + for(Entry entry : colAliasToColInfo.entrySet()) { + dest.put(tableAlias, entry.getKey(), entry.getValue()); + } + } + } + private Operator getReduceSink(Operator top) { if (top.getClass() == ReduceSinkOperator.class) { // Get the operator following the reduce sink @@ -4949,6 +5129,7 @@ topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer); + Optimizer optm = new Optimizer(); optm.setPctx(pCtx); optm.initialize(conf); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (working copy) @@ -127,7 +127,11 @@ } public HashMap getFieldMap(String tab_alias) { - return rslvMap.get(tab_alias.toLowerCase()); + if(tab_alias == null) { + return rslvMap.get(null); + } else { + return rslvMap.get(tab_alias.toLowerCase()); + } } public int getPosition(String internalName) { Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 4386) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (working copy) @@ -21,12 +21,13 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; /** * A Generic User-defined Table Generating Function (UDTF) * - * Generates a variable number of output rows for a variable number of input - * rows. Useful for explode(array()), histograms, etc + * Generates a variable number of output rows for a single input row. Useful for + * explode(array)... */ public abstract class GenericUDTF { @@ -37,20 +38,24 @@ * instance. 
   *
   * @param args An array of ObjectInspectors for the arguments
-   * @return ObjectInspector for the output
+   * @return A StructObjectInspector for output. The output struct
+   *         represents a row of the table. The field names of the struct
+   *         are the column names.
   */
-  public abstract ObjectInspector initialize(ObjectInspector [] argOIs)
+  public abstract StructObjectInspector initialize(ObjectInspector [] argOIs)
     throws UDFArgumentException;
 
   /**
-   * Give a a set of arguments for the UDTF to process.
+   * Give a set of arguments for the UDTF to process.
   *
   * @param o object array of arguments
   */
  public abstract void process(Object [] args) throws HiveException;

  /**
-   * Notify the UDTF that there are no more rows to process.
+   * Called to notify the UDTF that there are no more rows to process. Note
+   * that forward() should not be called in this function. Only clean up code
+   * should be run.
   */
  public abstract void close() throws HiveException;
@@ -66,7 +71,7 @@
   }
 
   /**
-   * Passes output data to collector
+   * Passes an output row to the collector
   *
   * @param o
   * @throws HiveException
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java	(revision 4386)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java	(working copy)
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -25,6 +26,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 @description(
     name = "explode",
@@ -39,7 +42,7 @@
   }
 
   @Override
-  public ObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException {
+  public StructObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException {
 
     if (args.length != 1) {
       throw new UDFArgumentException("explode() takes only one argument");
@@ -50,15 +53,22 @@
     }
 
     listOI = (ListObjectInspector)args[0];
-    return listOI.getListElementObjectInspector();
+    ArrayList fieldNames = new ArrayList();
+    ArrayList fieldOIs = new ArrayList();
+    // elt -> element of array
+    fieldNames.add("elt");
+    fieldOIs.add(listOI.getListElementObjectInspector());
+    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
   }
 
+  Object forwardObj[] = new Object[1];
   @Override
   public void process(Object [] o) throws HiveException {
 
     List list = listOI.getList(o[0]);
     for (Object r : list) {
-      this.forward(r);
+      forwardObj[0] = r;
+      this.forward(forwardObj);
     }
   }
 
Index: ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
===================================================================
--- ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java	(revision 4386)
+++ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java	(working copy)
@@ -28,6 +28,7 @@
   public static final int REDUCESINK = 11;
   public static final int UNION = 12;
   public static final int UDTF = 13;
+  public static final int LATERALVIEWJOIN = 14;
 
   public static final IntRangeSet VALID_VALUES = new IntRangeSet(
     JOIN,
@@ -43,7 +44,8 @@
     FILESINK,
     REDUCESINK,
     UNION,
-    UDTF );
+    UDTF,
+    LATERALVIEWJOIN );
 
   public static final Map VALUES_TO_NAMES = new HashMap() {{
     put(JOIN, "JOIN");
@@ -60,5 +62,6 @@
     put(REDUCESINK, "REDUCESINK");
     put(UNION, "UNION");
     put(UDTF, "UDTF");
+    put(LATERALVIEWJOIN, "LATERALVIEWJOIN");
   }};
 }
Index: ql/src/gen-php/queryplan_types.php
===================================================================
--- ql/src/gen-php/queryplan_types.php	(revision 4386)
+++ ql/src/gen-php/queryplan_types.php	(working copy)
@@ -50,6 +50,7 @@
   'REDUCESINK' => 11,
   'UNION' => 12,
   'UDTF' => 13,
+  'LATERALVIEWJOIN' => 14,
 );
 
 final class OperatorType {
@@ -67,6 +68,7 @@
   const REDUCESINK = 11;
   const UNION = 12;
   const UDTF = 13;
+  const LATERALVIEWJOIN = 14;
 
   static public $__names = array(
     0 => 'JOIN',
     1 => 'MAPJOIN',
@@ -82,6 +84,7 @@
     11 => 'REDUCESINK',
     12 => 'UNION',
     13 => 'UDTF',
+    14 => 'LATERALVIEWJOIN',
   );
 }
Index: ql/if/queryplan.thrift
===================================================================
--- ql/if/queryplan.thrift	(revision 4386)
+++ ql/if/queryplan.thrift	(working copy)
@@ -16,7 +16,7 @@
 }
 
 #Represents a operator along with its counters
-enum OperatorType { JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF }
+enum OperatorType { JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN }
 
 struct Operator {
   1: string operatorId,
   2: OperatorType operatorType,
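
Editor's note (not part of the patch): the GenericUDTF changes above redefine the UDTF contract. initialize() now returns a StructObjectInspector describing one output row, with the struct's field names becoming the lateral view's column names; process() is called once per input row and may call forward() any number of times with an Object[] matching that struct; close() must only clean up and never forward. The sketch below is a minimal, hypothetical UDTF written against that contract. The class name GenericUDTFPosExplode and the column names "pos" and "elt" are invented for illustration, and the int column assumes the stock serde2 PrimitiveObjectInspectorFactory, which this patch does not touch.

// Editor's sketch (not part of this patch): a positional explode-style UDTF
// following the new initialize()/process()/close() contract.
package org.apache.hadoop.hive.ql.udf.generic;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class GenericUDTFPosExplode extends GenericUDTF {

  ListObjectInspector listOI = null;
  // Reused output row; one slot per struct field declared in initialize()
  Object forwardObj[] = new Object[2];

  @Override
  public StructObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("posexplode() takes only one argument");
    }
    if (args[0].getCategory() != ObjectInspector.Category.LIST) {
      throw new UDFArgumentException("posexplode() takes an array as a parameter");
    }
    listOI = (ListObjectInspector) args[0];

    // The returned struct describes one output row; its field names become
    // the column names of the lateral view table.
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("pos");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
    fieldNames.add("elt");
    fieldOIs.add(listOI.getListElementObjectInspector());
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }

  @Override
  public void process(Object [] o) throws HiveException {
    // Called once per input row; forward() may be called any number of times.
    List<?> list = listOI.getList(o[0]);
    int pos = 0;
    for (Object r : list) {
      forwardObj[0] = Integer.valueOf(pos++);
      forwardObj[1] = r;
      this.forward(forwardObj);
    }
  }

  @Override
  public void close() throws HiveException {
    // Nothing to clean up; per the contract above, forward() must not be called here.
  }
}

Registered as a function, such a UDTF would be invoked from a LATERAL VIEW clause just like explode, emitting one (pos, elt) row per array element, which the LateralViewJoinOperator then combines with the columns of the source row.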