Index: ql/src/gen-py/queryplan/ttypes.py =================================================================== --- ql/src/gen-py/queryplan/ttypes.py (revision 4426) +++ ql/src/gen-py/queryplan/ttypes.py (working copy) @@ -37,6 +37,7 @@ REDUCESINK = 11 UNION = 12 UDTF = 13 + LATERALVIEWJOIN = 14 class TaskType: MAP = 0 Index: ql/src/test/results/clientnegative/lateral_view_join.q.out =================================================================== --- ql/src/test/results/clientnegative/lateral_view_join.q.out (revision 0) +++ ql/src/test/results/clientnegative/lateral_view_join.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: Join source with a lateral view is not supported Index: ql/src/test/results/clientpositive/lateral_view.q.out =================================================================== --- ql/src/test/results/clientpositive/lateral_view.q.out (revision 0) +++ ql/src/test/results/clientpositive/lateral_view.q.out (revision 0) @@ -0,0 +1,519 @@ +PREHOOK: query: EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, myCol ASC LIMIT 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, myCol ASC LIMIT 1 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol (TOK_TABALIAS myTable))) (TOK_TABREF src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_SORTBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL key)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL myCol))) (TOK_LIMIT 1))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + outputColumnNames: _col0, _col1, _col2 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col2 + type: int + sort order: ++ + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + outputColumnNames: _col0, _col1, _col2 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col2 + type: int + sort order: ++ + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + Reduce Operator Tree: + Extract + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/440369354/10002 + Reduce Output Operator + key expressions: + expr: 
_col0 + type: string + expr: _col2 + type: int + sort order: ++ + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + Reduce Operator Tree: + Extract + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 1 + + +PREHOOK: query: EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol (TOK_TABALIAS myTable))) (TOK_TABREF src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF myTable))) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col2 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col2 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: EXPLAIN SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 'a' 'b' 'c')) myCol2 (TOK_TABALIAS myTable2))) (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol (TOK_TABALIAS myTable))) (TOK_TABREF src)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL myTable) myCol)) (TOK_SELEXPR (. 
(TOK_TABLE_OR_COL myTable2) myCol2))) (TOK_LIMIT 9))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + outputColumnNames: _col0, _col1, _col2 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array('a','b','c') + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: int + outputColumnNames: _col0, _col1, _col2 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array('a','b','c') + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col2 + type: int + expr: _col3 + type: string + outputColumnNames: _col0, _col1 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 9 + + +PREHOOK: query: EXPLAIN SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (. 
(TOK_TABLE_OR_COL myTable) myCol)) myCol2 (TOK_TABALIAS myTable2))) (TOK_LATERAL_VIEW (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array (TOK_FUNCTION array 1 2 3))) myCol (TOK_TABALIAS myTable))) (TOK_TABREF src)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF myTable2))) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: key + type: string + expr: value + type: string + outputColumnNames: _col0, _col1 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: array + outputColumnNames: _col0, _col1, _col2 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col3 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: _col2 + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col3 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: array(array(1,2,3)) + type: array> + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: array + outputColumnNames: _col0, _col1, _col2 + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col3 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + Select Operator + expressions: + expr: _col2 + type: array + outputColumnNames: _col0 + UDTF Operator + function name: explode + Lateral View Join Operator + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col3 + type: int + outputColumnNames: _col0 + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: -- Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, myCol ASC LIMIT 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1522486935/10000 +POSTHOOK: query: -- Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, 
myCol ASC LIMIT 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1522486935/10000 +0 val_0 1 +PREHOOK: query: -- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/299005410/10000 +POSTHOOK: query: -- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/299005410/10000 +1 +2 +3 +PREHOOK: query: -- Multiple lateral views should result in a Cartesian product +SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1806900989/10000 +POSTHOOK: query: -- Multiple lateral views should result in a Cartesian product +SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1806900989/10000 +1 a +1 b +1 c +2 a +2 b +2 c +3 a +3 b +3 c +PREHOOK: query: -- Should be able to reference tables generated earlier +SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/287122168/10000 +POSTHOOK: query: -- Should be able to reference tables generated earlier +SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/287122168/10000 +1 +2 +3 Index: ql/src/test/results/clientpositive/udtf_explode.q.out =================================================================== --- ql/src/test/results/clientpositive/udtf_explode.q.out (revision 4426) +++ ql/src/test/results/clientpositive/udtf_explode.q.out (working copy) @@ -22,25 +22,24 @@ type: array outputColumnNames: _col0 UDTF Operator - outputColumnName: myCol function name: explode Limit File Output Operator compressed: false GlobalTableId: 0 - directory: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/2089365960/10001 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1915461006/10001 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat properties: - columns myCol + columns col serialization.format 1 columns.types int Needs Tagging: false Path -> Alias: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src [src] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [src] Path -> Partition: - 
file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src Partition input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -54,7 +53,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1260405571 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe input format: org.apache.hadoop.mapred.TextInputFormat @@ -69,8 +69,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src - transient_lastDdlTime 1258713613 + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1260405571 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: src name: src @@ -80,12 +80,12 @@ limit: 3 -PREHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: query: EXPLAIN EXTENDED SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol PREHOOK: type: QUERY -POSTHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: query: EXPLAIN EXTENDED SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol POSTHOOK: type: QUERY ABSTRACT SYNTAX TREE: - (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL myCol)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL myCol)))) + (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) myCol)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. 
(TOK_TABLE_OR_COL a) myCol)))) STAGE DEPENDENCIES: Stage-1 is a root stage @@ -105,20 +105,19 @@ type: array outputColumnNames: _col0 UDTF Operator - outputColumnName: myCol function name: explode Limit Reduce Output Operator sort order: tag: -1 value expressions: - expr: myCol + expr: col type: int Needs Tagging: false Path -> Alias: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src [a:src] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [a:src] Path -> Partition: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src Partition input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -132,7 +131,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1260405571 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe input format: org.apache.hadoop.mapred.TextInputFormat @@ -147,8 +147,8 @@ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe file.inputformat org.apache.hadoop.mapred.TextInputFormat file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - location file:/data/users/heyongqiang/hive-trunk-commit/build/ql/test/data/warehouse/src - transient_lastDdlTime 1258713613 + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1260405571 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe name: src name: src @@ -172,7 +172,7 @@ File Output Operator compressed: false GlobalTableId: 0 - directory: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10002 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10002 table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -184,7 +184,7 @@ Stage: Stage-2 Map Reduce Alias -> Map Operator Tree: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10002 + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10002 Reduce Output Operator key expressions: expr: _col0 @@ -199,9 +199,9 @@ type: bigint Needs Tagging: false Path -> Alias: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10002 [file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10002] + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10002 [file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10002] Path -> Partition: - file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10002 + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10002 Partition input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -236,7 +236,7 @@ File Output Operator compressed: false GlobalTableId: 0 - directory: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1309593257/10001 + 
directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/910965936/10001 table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -253,22 +253,33 @@ PREHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 PREHOOK: type: QUERY PREHOOK: Input: default@src -PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/99085211/10000 +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1857255630/10000 POSTHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 POSTHOOK: type: QUERY POSTHOOK: Input: default@src -POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/99085211/10000 +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1857255630/10000 1 2 3 -PREHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: query: SELECT explode(array(1,2,3)) AS (myCol) FROM src LIMIT 3 PREHOOK: type: QUERY PREHOOK: Input: default@src -PREHOOK: Output: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1717351634/10000 -POSTHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/127817775/10000 +POSTHOOK: query: SELECT explode(array(1,2,3)) AS (myCol) FROM src LIMIT 3 POSTHOOK: type: QUERY POSTHOOK: Input: default@src -POSTHOOK: Output: file:/data/users/heyongqiang/hive-trunk-commit/build/ql/tmp/1717351634/10000 +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/127817775/10000 +1 +2 +3 +PREHOOK: query: SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1954512145/10000 +POSTHOOK: query: SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1954512145/10000 1 1 2 1 3 1 Index: ql/src/test/queries/clientnegative/lateral_view_join.q =================================================================== --- ql/src/test/queries/clientnegative/lateral_view_join.q (revision 0) +++ ql/src/test/queries/clientnegative/lateral_view_join.q (revision 0) @@ -0,0 +1 @@ +SELECT src.key FROM src LATERAL VIEW explode(array(1,2,3)) AS myTable JOIN src b ON src.key; Index: ql/src/test/queries/clientpositive/lateral_view.q =================================================================== --- ql/src/test/queries/clientpositive/lateral_view.q (revision 0) +++ ql/src/test/queries/clientpositive/lateral_view.q (revision 0) @@ -0,0 +1,13 @@ +EXPLAIN SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, myCol ASC LIMIT 1; +EXPLAIN SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3; +EXPLAIN SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9; +EXPLAIN SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3; + +-- 
Verify that * selects columns from both tables +SELECT * FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol SORT BY key ASC, myCol ASC LIMIT 1; +-- TABLE.* should be supported +SELECT myTable.* FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LIMIT 3; +-- Multiple lateral views should result in a Cartesian product +SELECT myTable.myCol, myTable2.myCol2 FROM src LATERAL VIEW explode(array(1,2,3)) myTable AS myCol LATERAL VIEW explode(array('a', 'b', 'c')) myTable2 AS myCol2 LIMIT 9; +-- Should be able to reference tables generated earlier +SELECT myTable2.* FROM src LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2 LIMIT 3; Index: ql/src/test/queries/clientpositive/udtf_explode.q =================================================================== --- ql/src/test/queries/clientpositive/udtf_explode.q (revision 4426) +++ ql/src/test/queries/clientpositive/udtf_explode.q (working copy) @@ -1,5 +1,6 @@ EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; -EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; +EXPLAIN EXTENDED SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol; SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; -SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; +SELECT explode(array(1,2,3)) AS (myCol) FROM src LIMIT 3; +SELECT a.myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY a.myCol; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (revision 4426) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -253,8 +254,8 @@ // If one of my children is a FileSink or Script, return all columns. // Without this break, a bug in ReduceSink to Extract edge column pruning will manifest // which should be fixed before remove this - if ((child instanceof FileSinkOperator) - || (child instanceof ScriptOperator) || (child instanceof UDTFOperator) + if ((child instanceof FileSinkOperator) || (child instanceof UDTFOperator) + || (child instanceof ScriptOperator) || (child instanceof LateralViewJoinOperator) || (child instanceof LimitOperator) || (child instanceof UnionOperator)) { cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op)); return null; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 0) @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+
+/**
+ * The lateral view join operator is used to implement the lateral view
+ * functionality. This operator was implemented with the following
+ * operator DAG in mind. For a query such as
+ *
+ * SELECT pageid, adid.* FROM example_table LATERAL VIEW explode(adid_list) AS adid
+ *
+ * the top of the operator tree will look similar to
+ *
+ *          [Table Scan]
+ *            /      \
+ *   [Select](*)    [Select](adid_list)
+ *           |        |
+ *           |      [UDTF] (explode)
+ *            \      /
+ *     [Lateral View Join]
+ *            |
+ *            |
+ *   [Select] (pageid, adid.*)
+ *            |
+ *           ....
+ *
+ * Rows from the table scan operator are first sent to two select operators.
+ * The select operator on the left picks all the columns, while the select
+ * operator on the right picks only the columns needed by the UDTF.
+ *
+ * The output of the select in the left branch and the output of the UDTF in
+ * the right branch are then sent to the lateral view join (LVJ). In most
+ * cases, the UDTF will generate > 1 row for every row received from the TS,
+ * while the left select operator will generate only one. For each row output
+ * from the TS, the LVJ outputs all possible rows that can be created by
+ * joining the row from the left select and one of the rows output from the
+ * UDTF.
+ *
+ * Additional lateral views can be supported by adding a similar DAG after the
+ * previous LVJ operator.
+ */
+
+public class LateralViewJoinOperator extends Operator<lateralViewJoinDesc> {
+
+  private static final long serialVersionUID = 1L;
+
+  // The expected tags from the parent operators. See processOp() before
+  // changing the tags.
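+  // For example (an illustrative trace, not part of the original patch):
+  // given a source row (p1, [a1, a2]), processOp() first receives the full
+  // row under SELECT_TAG, then one call per UDTF output row under UDTF_TAG,
+  // forwarding (p1, [a1, a2], a1) and then (p1, [a1, a2], a2).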
+  static final int SELECT_TAG = 0;
+  static final int UDTF_TAG = 1;
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    ArrayList<String> fieldNames = conf.getOutputInternalColNames();
+
+    // The output of the lateral view join will be the columns from the select
+    // parent, followed by the column from the UDTF parent
+    StructObjectInspector soi =
+      (StructObjectInspector) inputObjInspectors[SELECT_TAG];
+    List<? extends StructField> sfs = soi.getAllStructFieldRefs();
+    for (StructField sf : sfs) {
+      ois.add(sf.getFieldObjectInspector());
+    }
+
+    soi = (StructObjectInspector) inputObjInspectors[UDTF_TAG];
+    sfs = soi.getAllStructFieldRefs();
+    for (StructField sf : sfs) {
+      ois.add(sf.getFieldObjectInspector());
+    }
+
+    outputObjInspector =
+      ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, ois);
+
+    // Initialize the rest of the operator DAG
+    super.initializeOp(hconf);
+  }
+
+  // acc is short for accumulator. It's used to build the row before forwarding
+  ArrayList<Object> acc = new ArrayList<Object>();
+  // selectObjs holds the row from the select op until a row is received from
+  // the udtf op
+  ArrayList<Object> selectObjs = new ArrayList<Object>();
+  /**
+   * An important assumption for processOp() is that for a given row from the
+   * TS, the LVJ will first get the row from the left select operator, followed
+   * by all the corresponding rows from the UDTF operator. And so on.
+   */
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    StructObjectInspector soi = (StructObjectInspector)inputObjInspectors[tag];
+    if (tag == SELECT_TAG) {
+      selectObjs.clear();
+      selectObjs.addAll(soi.getStructFieldsDataAsList(row));
+    } else if (tag == UDTF_TAG) {
+      acc.clear();
+      acc.addAll(selectObjs);
+      acc.addAll(soi.getStructFieldsDataAsList(row));
+      forward(acc, outputObjInspector);
+    } else {
+      throw new HiveException("Invalid tag");
+    }
+
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class UDTFOperator extends Operator<udtfDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -52,9 +53,10 @@
    * sends periodic reports back to the tracker.
    */
   transient AutoProgressor autoProgressor;
-
+  transient boolean closeCalled = false;
+
   protected void initializeOp(Configuration hconf) throws HiveException {
-    conf.getUDTF().setCollector(new UDTFCollector(this));
+    conf.getGenericUDTF().setCollector(new UDTFCollector(this));
 
     // Make an object inspector [] of the arguments to the UDTF
     List<? extends StructField> inputFields =
@@ -62,18 +64,14 @@
     udtfInputOIs = new ObjectInspector[inputFields.size()];
     for (int i=0; i<inputFields.size(); i++) {
       udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
     }
-    udtfOutputOI = conf.getUDTF().initialize(udtfInputOIs);
-
-    ArrayList<String> colNames = new ArrayList<String>();
-    ArrayList<ObjectInspector> colOIs = new ArrayList<ObjectInspector>();
-    colNames.add(conf.getOutputColName());
-    colOIs.add(udtfOutputOI);
-    outputObjInspector =
-      ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs);
+    udtfOutputOI = conf.getGenericUDTF().initialize(udtfInputOIs);
+
+    // Since we're passing the object output by the UDTF directly to the next
+    // operator, we can use the same OI.
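+    // (Illustrative note, not in the original patch:) for explode(array(1,2,3))
+    // initialize() returns a struct OI with a single int field, so every object
+    // the UDTF emits is already struct-shaped and can be forwarded as-is.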
+    outputObjInspector = udtfOutputOI;
 
     // Set up periodic progress reporting in case the UDTF doesn't output rows
     // for a while
@@ -97,22 +95,23 @@
       objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
     }
 
-    conf.getUDTF().process(objToSendToUDTF);
+    conf.getGenericUDTF().process(objToSendToUDTF);
   }
 
   /**
    * forwardUDTFOutput is typically called indirectly by the GenericUDTF when
-   * the GenericUDTF has generated output data that should be passed on to the
+   * the GenericUDTF has generated output rows that should be passed on to the
    * next operator(s) in the DAG.
    *
    * @param o
   * @throws HiveException
   */
  public void forwardUDTFOutput(Object o) throws HiveException {
-    // Now that we have the data from the UDTF, repack it into an object[] as
-    // the output should be inspectable by a struct OI
-    forwardObj[0] = o;
-    forward(forwardObj, outputObjInspector);
+    if (closeCalled) {
+      throw new HiveException("UDTF's should not output rows on close");
+    }
+    // Since the output of the UDTF is a struct, we can just forward that
+    forward(o, outputObjInspector);
  }
 
  public String getName() {
@@ -124,6 +123,7 @@
  }
 
  protected void closeOp(boolean abort) throws HiveException {
-    conf.getUDTF().close();
+    closeCalled = true;
+    conf.getGenericUDTF().close();
  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy)
@@ -55,6 +55,7 @@
     opvec.add(new opTuple<tableScanDesc> (tableScanDesc.class, TableScanOperator.class));
     opvec.add(new opTuple<unionDesc> (unionDesc.class, UnionOperator.class));
     opvec.add(new opTuple<udtfDesc> (udtfDesc.class, UDTFOperator.class));
+    opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class, LateralViewJoinOperator.class));
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (working copy)
@@ -20,40 +20,35 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 
+/**
+ * All member variables should have setters and getters of the form
+ * get<memberName> and set<memberName> or else they won't be recreated
+ * properly at run time.
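+ * For example, the genericUDTF member below is exposed through
+ * getGenericUDTF()/setGenericUDTF() so that the plan can be serialized and
+ * rebuilt correctly at run time.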
+ *
+ */
 @explain(displayName="UDTF Operator")
 public class udtfDesc implements Serializable {
   private static final long serialVersionUID = 1L;
   private GenericUDTF genericUDTF;
-  private String outputColName;
-
+
   public udtfDesc() { }
-  public udtfDesc(final GenericUDTF genericUDTF, String outputColName) {
+  public udtfDesc(final GenericUDTF genericUDTF) {
     this.genericUDTF = genericUDTF;
-    this.outputColName = outputColName;
   }
 
-  public GenericUDTF getUDTF() {
+  public GenericUDTF getGenericUDTF() {
     return this.genericUDTF;
   }
-  public void setUDTF(final GenericUDTF genericUDTF) {
+  public void setGenericUDTF(final GenericUDTF genericUDTF) {
     this.genericUDTF=genericUDTF;
   }
 
   @explain(displayName="function name")
   public String getUDTFName() {
     return this.genericUDTF.toString();
   }
-
-  @explain(displayName="outputColumnName")
-  public String getOutputColName() {
-    return this.outputColName;
-  }
-
-  public void setOutputColName(String outputColName) {
-    this.outputColName = outputColName;
-  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/lateralViewJoinDesc.java (revision 0)
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+@explain(displayName="Lateral View Join Operator")
+public class lateralViewJoinDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private ArrayList<String> outputInternalColNames;
+
+  public lateralViewJoinDesc() {
+  }
+  public lateralViewJoinDesc(ArrayList<String> outputInternalColNames) {
+    this.outputInternalColNames = outputInternalColNames;
+  }
+
+  public void setOutputInternalColNames(
+      ArrayList<String> outputInternalColNames) {
+    this.outputInternalColNames = outputInternalColNames;
+  }
+  @explain(displayName="outputColumnNames")
+  public ArrayList<String> getOutputInternalColNames() {
+    return this.outputInternalColNames;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (working copy)
@@ -57,6 +57,12 @@
 
   private HashMap<String, ASTNode> destToSortby;
 
+  /**
+   * Mapping from table/subquery aliases to all the associated lateral view
+   * nodes
+   */
+  private HashMap<String, ArrayList<ASTNode>> aliasToLateralViews;
+
   /* Order by clause */
   private HashMap<String, ASTNode> destToOrderby;
   private HashMap<String, Integer> destToLimit;
@@ -88,6 +94,8 @@
     this.alias = alias;
     this.isSubQ = isSubQ;
     this.outerQueryLimit = -1;
+
+    this.aliasToLateralViews = new HashMap<String, ArrayList<ASTNode>>();
   }
 
   public void setAggregationExprsForClause(String clause, LinkedHashMap<String, ASTNode> aggregationTrees) {
@@ -290,7 +298,8 @@
         (joinExpr != null) ||
         (!nameToSample.isEmpty()) ||
         (!destToGroupby.isEmpty()) ||
-        (!destToClusterby.isEmpty()))
+        (!destToClusterby.isEmpty()) ||
+        (!aliasToLateralViews.isEmpty()))
       return false;
 
     Iterator<Map.Entry<String, LinkedHashMap<String, ASTNode>>> aggrIter = destToAggregationExprs.entrySet().iterator();
@@ -343,4 +352,21 @@
   public ASTNode getHints() {
     return hints;
   }
+
+  public Map<String, ArrayList<ASTNode>> getAliasToLateralViews() {
+    return this.aliasToLateralViews;
+  }
+  public List<ASTNode> getLateralViewsForAlias(String alias) {
+    return aliasToLateralViews.get(alias.toLowerCase());
+  }
+
+  public void addLateralViewForAlias(String alias, ASTNode lateralView) {
+    String lowerAlias = alias.toLowerCase();
+    ArrayList<ASTNode> lateralViews = aliasToLateralViews.get(lowerAlias);
+    if (lateralViews == null) {
+      lateralViews = new ArrayList<ASTNode>();
+      aliasToLateralViews.put(lowerAlias, lateralViews);
+    }
+    lateralViews.add(lateralView);
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -146,6 +146,8 @@
 TOK_RECORDREADER;
 TOK_RECORDWRITER;
 TOK_LEFTSEMIJOIN;
+TOK_LATERAL_VIEW;
+TOK_TABALIAS;
 }
 
@@ -819,7 +821,7 @@
 @init { msgs.push("selection target"); }
 @after { msgs.pop(); }
     :
-    ( selectExpression (KW_AS? Identifier)?) -> ^(TOK_SELEXPR selectExpression Identifier?)
+    ( selectExpression ((KW_AS? Identifier) | (KW_AS LPAREN Identifier (COMMA Identifier)* RPAREN))?) -> ^(TOK_SELEXPR selectExpression Identifier*)
     ;
 
 trfmClause
@@ -835,7 +837,7 @@
     outSerde=rowFormat outRec=recordReader
     -> ^(TOK_TRANSFORM selectExpressionList $inSerde $inRec StringLiteral $outSerde $outRec aliasList? columnNameTypeList?)
; - + selectExpression @init { msgs.push("select expression"); } @after { msgs.pop(); } @@ -926,11 +928,25 @@ | KW_LEFT KW_SEMI KW_JOIN -> TOK_LEFTSEMIJOIN ; +lateralView +@init {msgs.push("lateral view"); } +@after {msgs.pop(); } + : + KW_LATERAL KW_VIEW function tableAlias KW_AS Identifier (COMMA Identifier)* -> ^(TOK_LATERAL_VIEW ^(TOK_SELECT ^(TOK_SELEXPR function Identifier+ tableAlias))) + ; + +tableAlias +@init {msgs.push("table alias"); } +@after {msgs.pop(); } + : + Identifier -> ^(TOK_TABALIAS Identifier) + ; + fromSource @init { msgs.push("from source"); } @after { msgs.pop(); } : - (tableSource | subQuerySource) + (tableSource | subQuerySource) (lateralView^)* ; tableSample @@ -1469,8 +1485,8 @@ KW_RECORDREADER: 'RECORDREADER'; KW_RECORDWRITER: 'RECORDWRITER'; KW_SEMI: 'SEMI'; +KW_LATERAL: 'LATERAL'; - // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 4426) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy) @@ -113,8 +113,12 @@ UDTF_NO_SORT_BY("SORT BY is not supported with a UDTF in the SELECT clause"), UDTF_NO_CLUSTER_BY("CLUSTER BY is not supported with a UDTF in the SELECT clause"), UDTF_NO_DISTRIBUTE_BY("DISTRUBTE BY is not supported with a UDTF in the SELECT clause"), - UDTF_INVALID_LOCATION("UDTF's are not supported outside the SELECT clause, nor nested in expressions"); - + UDTF_INVALID_LOCATION("UDTF's are not supported outside the SELECT clause, nor nested in expressions"), + UDTF_LATERAL_VIEW("UDTF's cannot be in a select expression when there is a lateral view"), + UDTF_ALIAS_MISMATCH("The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF"), + LATERAL_VIEW_WITH_JOIN("Join with a lateral view is not supported"), + LATERAL_VIEW_INVALID_CHILD("Lateral view AST with invalid child"), + INVALID_AS("AS clause has an invalid number of aliases"); private String mesg; private String SQLState; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 4426) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -29,9 +29,9 @@ import java.util.Set; import java.util.TreeSet; import java.util.Vector; +import java.util.Map.Entry; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import java.lang.ClassNotFoundException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; @@ -60,7 +61,11 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import 
org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -73,22 +78,26 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; -import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.GenMROperator; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4; import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1; import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.Optimizer; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4; +import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.aggregationDesc; +import org.apache.hadoop.hive.ql.plan.createTableDesc; +import org.apache.hadoop.hive.ql.plan.createTableLikeDesc; import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.exprNodeDesc; @@ -101,6 +110,7 @@ import org.apache.hadoop.hive.ql.plan.forwardDesc; import org.apache.hadoop.hive.ql.plan.groupByDesc; import org.apache.hadoop.hive.ql.plan.joinDesc; +import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc; import org.apache.hadoop.hive.ql.plan.limitDesc; import org.apache.hadoop.hive.ql.plan.loadFileDesc; import org.apache.hadoop.hive.ql.plan.loadTableDesc; @@ -114,20 +124,18 @@ import org.apache.hadoop.hive.ql.plan.tableScanDesc; import org.apache.hadoop.hive.ql.plan.udtfDesc; import org.apache.hadoop.hive.ql.plan.unionDesc; -import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import 
org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -136,33 +144,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.ql.exec.TextRecordReader;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-
-import java.util.regex.Matcher;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.createTableDesc;
-import org.apache.hadoop.hive.ql.plan.createTableLikeDesc;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import java.util.Collection;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-
 /**
  * Implementation of the semantic analyzer
  */
@@ -345,7 +327,14 @@
     return expr;
   }
 
-  private void processTable(QB qb, ASTNode tabref) throws SemanticException {
+  /**
+   * Goes through the tabref tree and finds the alias for the table. Once found,
+   * it records the table name -> alias association in aliasToTabs. It also makes
+   * an association from the alias to the table AST in parse info.
+   *
+   * @return the alias of the table
+   */
+  private String processTable(QB qb, ASTNode tabref) throws SemanticException {
     // For each table reference get the table name
     // and the alias (if alias is not present, the table name
     // is used as an alias)
@@ -398,9 +387,11 @@
 
     qb.setTabAlias(alias, table_name);
 
     qb.getParseInfo().setSrcForAlias(alias, tableTree);
+
+    return alias;
   }
 
-  private void processSubQuery(QB qb, ASTNode subq) throws SemanticException {
+  private String processSubQuery(QB qb, ASTNode subq) throws SemanticException {
 
     // This is a subquery and must have an alias
     if (subq.getChildCount() != 2) {
@@ -420,6 +411,8 @@
     }
     // Insert this map into the stats
     qb.setSubqAlias(alias, qbexpr);
+
+    return alias;
   }
 
   private boolean isJoinToken(ASTNode node)
@@ -435,6 +428,14 @@
     return false;
   }
 
+  /**
+   * Given the AST with TOK_JOIN as the root, get all the aliases for the tables
+   * or subqueries in the join.
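+   * For example (illustrative), FROM t1 a JOIN t2 b yields the aliases a and
+   * b, each registered via processTable() or processSubQuery().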
+ * + * @param qb + * @param join + * @throws SemanticException + */ @SuppressWarnings("nls") private void processJoin(QB qb, ASTNode join) throws SemanticException { int numChildren = join.getChildCount(); @@ -444,15 +445,79 @@ for (int num = 0; num < numChildren; num++) { ASTNode child = (ASTNode) join.getChild(num); - if (child.getToken().getType() == HiveParser.TOK_TABREF) + if (child.getToken().getType() == HiveParser.TOK_TABREF) { processTable(qb, child); - else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) + } else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) { processSubQuery(qb, child); - else if (isJoinToken(child)) + } else if (child.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) { + // SELECT * FROM src1 LATERAL VIEW udtf() AS myTable JOIN src2 ... + // is not supported. Instead, the lateral view must be in a subquery + // SELECT * FROM (SELECT * FROM src1 LATERAL VIEW udtf() AS myTable) a + // JOIN src2 ... + throw new + SemanticException(ErrorMsg.LATERAL_VIEW_WITH_JOIN.getMsg(join)); + } else if (isJoinToken(child)) { processJoin(qb, child); + } } } + + /** + * Given the AST with TOK_LATERAL_VIEW as the root, get the alias for the + * table or subquery in the lateral view and also make a mapping from the + * alias to all the lateral view AST's + * + * @param qb + * @param lateralView + * @return the alias for the table/subquery + * @throws SemanticException + */ + + private String processLateralView(QB qb, ASTNode lateralView) + throws SemanticException { + int numChildren = lateralView.getChildCount(); + + assert(numChildren == 2); + ASTNode next = (ASTNode) lateralView.getChild(1); + + String alias = null; + + switch(next.getToken().getType()) { + case HiveParser.TOK_TABREF: + alias = processTable(qb, next); + break; + case HiveParser.TOK_SUBQUERY: + alias = processSubQuery(qb, next); + break; + case HiveParser.TOK_LATERAL_VIEW: + alias = processLateralView(qb, next); + break; + default: + throw new SemanticException( + ErrorMsg.LATERAL_VIEW_INVALID_CHILD.getMsg(lateralView)); + } + qb.getParseInfo().addLateralViewForAlias(alias, lateralView); + return alias; + } + /** + * Phase 1: (including, but not limited to): + * + * 1. Gets all the aliases for all the tables / subqueries and makes the + * appropriate mapping in aliasToTabs, aliasToSubq + * 2. Gets the location of the destination and names the clase "inclause" + i + * 3. Creates a map from a string representation of an aggregation tree to the + * actual aggregation AST + * 4. Creates a mapping from the clause name to the select expression AST in + * destToSelExpr + * 5. 
+  /**
+   * Phase 1: (including, but not limited to):
+   *
+   * 1. Gets all the aliases for all the tables / subqueries and makes the
+   *    appropriate mapping in aliasToTabs, aliasToSubq
+   * 2. Gets the location of the destination and names the clause
+   *    "inclause" + i
+   * 3. Creates a map from a string representation of an aggregation tree to
+   *    the actual aggregation AST
+   * 4. Creates a mapping from the clause name to the select expression AST
+   *    in destToSelExpr
+   * 5. Creates a mapping from a table alias to the lateral view ASTs in
+   *    aliasToLateralViews
+   *
+   * @param ast
+   * @param qb
+   * @param ctx_1
+   * @throws SemanticException
+   */
   @SuppressWarnings({"fallthrough", "nls"})
   public void doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1)
       throws SemanticException {
@@ -503,14 +568,15 @@
         if (child_count != 1)
           throw new SemanticException("Multiple Children " + child_count);

-        // Check if this is a subquery
+        // Check if this is a subquery / lateral view
         ASTNode frm = (ASTNode) ast.getChild(0);
-        if (frm.getToken().getType() == HiveParser.TOK_TABREF)
+        if (frm.getToken().getType() == HiveParser.TOK_TABREF) {
           processTable(qb, frm);
-        else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY)
+        } else if (frm.getToken().getType() == HiveParser.TOK_SUBQUERY) {
           processSubQuery(qb, frm);
-        else if (isJoinToken(frm))
-        {
+        } else if (frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) {
+          processLateralView(qb, frm);
+        } else if (isJoinToken(frm)) {
           processJoin(qb, frm);
           qbp.setJoinExpr(frm);
         }
@@ -721,7 +787,7 @@
     return false;
   }

-  
+
   @SuppressWarnings("nls")
   private void parseJoinCondPopulateAlias(QBJoinTree joinTree, ASTNode condn,
       Vector<String> leftAliases, Vector<String> rightAliases,
@@ -774,7 +840,7 @@
       ArrayList<String> fields1 = null;
       // if it is a dot operator, remember the field name of the rhs of the
       // left semijoin
       if (joinTree.getNoSemiJoin() == false &&
-          condn.getToken().getText().equals("." )) {
+          condn.getToken().getType() == HiveParser.DOT) {
         // get the semijoin rhs table name and field name
         fields1 = new ArrayList<String>();
         int rhssize = rightAliases.size();
@@ -1365,12 +1431,19 @@
     return false;
   }

+  private Operator genSelectPlan(String dest, QB qb,
+      Operator input) throws SemanticException {
+    ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
+
+    Operator op = genSelectPlan(selExprList, qb, input);
+    LOG.debug("Created Select Plan for clause: " + dest);
+    return op;
+  }

   @SuppressWarnings("nls")
-  private Operator genSelectPlan(String dest, QB qb,
-      Operator input) throws SemanticException {
-
-    ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
-
+  private Operator genSelectPlan(ASTNode selExprList, QB qb,
+      Operator input) throws SemanticException {
+    LOG.debug("tree: " + selExprList.toStringTree());
+
     ArrayList<exprNodeDesc> col_list = new ArrayList<exprNodeDesc>();
     RowResolver out_rwsch = new RowResolver();
     ASTNode trfm = null;
@@ -1391,16 +1464,20 @@
       trfm = (ASTNode) selExprList.getChild(posn).getChild(0);
     }

-    // Detect a UDTF by looking up the function name in the registry.
-    // Not as clean TRANSFORM due to the lack of a special token.
+    // Detect queries of the form SELECT udtf(col) AS ...
+    // by looking for a function as the first child, and then checking to see
+    // if the function is a Generic UDTF. It's not as clean as TRANSFORM due
+    // to the lack of a special token.
     boolean isUDTF = false;
-    String udtfOutputColAlias = null;
+    String udtfTableAlias = null;
+    ArrayList<String> udtfColAliases = new ArrayList<String>();
     ASTNode udtfExpr = (ASTNode) selExprList.getChild(posn).getChild(0);
     GenericUDTF genericUDTF = null;

     if (udtfExpr.getType() == HiveParser.TOK_FUNCTION) {
       String funcName =
-        TypeCheckProcFactory.DefaultExprProcessor.getFunctionText(udtfExpr, true);
+        TypeCheckProcFactory.DefaultExprProcessor.getFunctionText(
+            udtfExpr, true);
       FunctionInfo fi = FunctionRegistry.getFunctionInfo(funcName);
       if (fi != null) {
         genericUDTF = fi.getGenericUDTF();
@@ -1413,13 +1490,31 @@
       if (selExprList.getChildCount() > 1) {
         throw new SemanticException(ErrorMsg.UDTF_MULTIPLE_EXPR.getMsg());
       }
-      // Require an AS for UDTFs
-      if (((ASTNode) selExprList.getChild(posn)).getChildCount() != 2 ||
-          selExprList.getChild(posn).getChild(1).getType() != HiveParser.Identifier) {
+      // Require an AS for UDTFs for column aliases
+      ASTNode selExpr = (ASTNode) selExprList.getChild(posn);
+      if (selExpr.getChildCount() < 2) {
         throw new SemanticException(ErrorMsg.UDTF_REQUIRE_AS.getMsg());
       }
-      udtfOutputColAlias = unescapeIdentifier(selExprList.getChild(posn).getChild(1).getText());
-    }
+      // Get the column / table aliases from the expression. Start from 1 as
+      // 0 is the TOK_FUNCTION
+      for (int i=1; i<selExpr.getChildCount(); i++) {
+        ASTNode selExprChild = (ASTNode) selExpr.getChild(i);
+        switch (selExprChild.getType()) {
+        case HiveParser.Identifier:
+          udtfColAliases.add(unescapeIdentifier(selExprChild.getText()));
+          break;
+        case HiveParser.TOK_TABALIAS:
+          udtfTableAlias = unescapeIdentifier(
+              selExprChild.getChild(0).getText());
+          break;
+        }
+      }
+    }

-  private Operator genUDTFPlan(GenericUDTF genericUDTF,
-      String udtfOutputColumnAlias, QB qb, Operator input)
+  private Operator genUDTFPlan(GenericUDTF genericUDTF,
+      String outputTableAlias, ArrayList<String> colAliases, QB qb,
+      Operator input)
       throws SemanticException {

     // No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY
     QBParseInfo qbp = qb.getParseInfo();
@@ -2921,7 +3023,12 @@
     if (!qbp.getDestToClusterBy().isEmpty()) {
       throw new SemanticException(ErrorMsg.UDTF_NO_CLUSTER_BY.getMsg());
     }
+    if (!qbp.getAliasToLateralViews().isEmpty()) {
+      throw new SemanticException(ErrorMsg.UDTF_LATERAL_VIEW.getMsg());
+    }
+
+    LOG.debug("Table alias: " + outputTableAlias
+        + " Col aliases: " + colAliases);
+
     // Use the RowResolver from the input operator to generate a input
     // ObjectInspector that can be used to initialize the UDTF. Then, the
     // resulting output object inspector can be used to make the RowResolver
@@ -2937,24 +3044,47 @@
       colOIs[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
           inputCols.get(i).getType());
     }
-    ObjectInspector outputOI = genericUDTF.initialize(colOIs);
+    StructObjectInspector outputOI = genericUDTF.initialize(colOIs);
+
+    // Make sure that the number of column aliases in the AS clause matches
+    // the number of columns output by the UDTF
+    int numUdtfCols = outputOI.getAllStructFieldRefs().size();
+    int numSuppliedAliases = colAliases.size();
+    if (numUdtfCols != numSuppliedAliases) {
+      throw new SemanticException(ErrorMsg.UDTF_ALIAS_MISMATCH.getMsg(
+          "expected " + numUdtfCols + " aliases but got "
+          + numSuppliedAliases));
+    }

-    ColumnInfo outputCol =
-      new ColumnInfo(udtfOutputColumnAlias,
-          TypeInfoUtils.getTypeInfoFromObjectInspector(outputOI), null, false);
+    // Generate the output column infos / row resolver using internal names.
+    ArrayList<ColumnInfo> udtfCols = new ArrayList<ColumnInfo>();
+    Iterator<String> colAliasesIter = colAliases.iterator();
+    for (StructField sf : outputOI.getAllStructFieldRefs()) {
+
+      String colAlias = colAliasesIter.next();
+      assert(colAlias != null);
+
+      // Since the UDTF operator feeds into a LVJ operator that will rename
+      // all the internal names, we can just use the field name from the
+      // UDTF's OI as the internal name
+      ColumnInfo col = new ColumnInfo(sf.getFieldName(),
+          TypeInfoUtils.getTypeInfoFromObjectInspector(
+              sf.getFieldObjectInspector()),
+          outputTableAlias, false);
+      udtfCols.add(col);
+    }
+
+    // Create the row resolver for this operator from the output columns
     RowResolver out_rwsch = new RowResolver();
-
-    out_rwsch.put(
-        null,
-        outputCol.getInternalName(),
-        outputCol);
+    for (int i=0; i<udtfCols.size(); i++) {
+      out_rwsch.put(outputTableAlias,
+                    colAliases.get(i),
+                    udtfCols.get(i));
+    }

     Operator<? extends Serializable> udtf =
       putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new udtfDesc(genericUDTF, udtfOutputColumnAlias),
+        new udtfDesc(genericUDTF),
         new RowSchema(out_rwsch.getColumnInfos()),
         input), out_rwsch);
     return udtf;
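The alias-count check in genUDTFPlan can be exercised in isolation. A small sketch
(the class name and the sample aliases are invented for illustration; the
ObjectInspector factory calls are the same serde2 APIs this patch uses elsewhere,
and it assumes the Hive serde2 classes are on the classpath):

    import java.util.ArrayList;
    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class AliasCheckSketch {
      public static void main(String[] args) {
        // Build a one-column output OI, the way a UDTF's initialize() would.
        ArrayList<String> names = new ArrayList<String>(Arrays.asList("col"));
        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(
            Arrays.asList((ObjectInspector)
                PrimitiveObjectInspectorFactory.javaIntObjectInspector));
        StructObjectInspector outputOI =
          ObjectInspectorFactory.getStandardStructObjectInspector(names, ois);

        // Two AS aliases against a one-column UDTF: the mismatch the
        // semantic analyzer rejects with UDTF_ALIAS_MISMATCH.
        ArrayList<String> colAliases =
          new ArrayList<String>(Arrays.asList("myCol", "extra"));
        int numUdtfCols = outputOI.getAllStructFieldRefs().size(); // 1
        if (numUdtfCols != colAliases.size()) {
          System.out.println("expected " + numUdtfCols
              + " aliases but got " + colAliases.size());
        }
      }
    }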
@@ -3683,7 +3813,7 @@
     filters.add(new Vector<ASTNode>());
     joinTree.setFilters(filters);

-    ASTNode joinCond = (ASTNode) joinParseTree.getChild(2); 
+    ASTNode joinCond = (ASTNode) joinParseTree.getChild(2);
     Vector<String> leftSrc = new Vector<String>();
     parseJoinCondition(joinTree, joinCond, leftSrc);
     if (leftSrc.size() == 1)
@@ -4545,6 +4675,10 @@
     for (String alias : qb.getTabAliases()) {
       aliasToOpInfo.put(alias, genTablePlan(alias, qb));
     }
+
+    // For all the source tables that have a lateral view, attach the
+    // appropriate operators to the TS
+    genLateralViewPlans(aliasToOpInfo, qb);

     Operator srcOpInfo = null;
@@ -4577,6 +4711,119 @@
     return bodyOpInfo;
   }

+  /**
+   * Generates the operator DAG needed to implement lateral views and attaches
+   * it to the TS operator.
+   *
+   * @param aliasToOpInfo A mapping from a table alias to the TS operator.
+   *          This function replaces the operator mapping as necessary
+   * @param qb
+   * @throws SemanticException
+   */
+
+  void genLateralViewPlans(HashMap<String, Operator> aliasToOpInfo, QB qb)
+      throws SemanticException {
+    Map<String, ArrayList<ASTNode>> aliasToLateralViews =
+      qb.getParseInfo().getAliasToLateralViews();
+    for (Entry<String, Operator> e : aliasToOpInfo.entrySet()) {
+      String alias = e.getKey();
+      // See if the alias has a lateral view. If so, chain the lateral view
+      // operator on
+      ArrayList<ASTNode> lateralViews = aliasToLateralViews.get(alias);
+      if (lateralViews != null) {
+        Operator op = e.getValue();
+
+        for (ASTNode lateralViewTree : aliasToLateralViews.get(alias)) {
+          // There are 2 paths from the TS operator (or a previous LVJ
+          // operator) to the same LateralViewJoinOperator.
+          // TS -> SelectOperator(*) -> LateralViewJoinOperator
+          // TS -> SelectOperator (gets cols for UDTF) -> UDTFOperator
+          //    -> LateralViewJoinOperator
+
+          // The order in which the two paths are added is important. The
+          // lateral view join operator depends on having the select operator
+          // give it the row first.
+
+          // Get the all path by making a select(*)
+          ArrayList<exprNodeDesc> colList = new ArrayList<exprNodeDesc>();
+          RowResolver inputRR = opParseCtx.get(op).getRR();
+          RowResolver allPathRR = new RowResolver();
+          genColListRegex(".*", null, null, null, colList, inputRR,
+              Integer.valueOf(0), allPathRR);
+          Vector<ColumnInfo> cols = allPathRR.getColumnInfos();
+          ArrayList<String> outputColumnNames = new ArrayList<String>();
+          for (ColumnInfo c : cols) {
+            outputColumnNames.add(c.getInternalName());
+          }
+          Operator allPath =
+            putOpInsertMap(OperatorFactory.getAndMakeChild(
+                new selectDesc(colList, outputColumnNames, true),
+                new RowSchema(allPathRR.getColumnInfos()),
+                op), allPathRR);
+
+          // Get the UDTF Path
+          QB blankQb = new QB(null, null, false);
+          Operator udtfPath =
+            genSelectPlan((ASTNode) lateralViewTree.getChild(0), blankQb, op);
+          RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRR();
+
+          // Merge the two into the lateral view join
+          // The cols of the merged result will be the combination of both the
+          // cols of the UDTF path and the cols of the all path. The internal
+          // names have to be changed to avoid conflicts
+
+          RowResolver lateralViewRR = new RowResolver();
+          ArrayList<String> outputInternalColNames = new ArrayList<String>();
+
+          LVmergeRowResolvers(allPathRR, lateralViewRR,
+              outputInternalColNames);
+          LVmergeRowResolvers(udtfPathRR, lateralViewRR,
+              outputInternalColNames);
+
+          Operator lateralViewJoin =
+            putOpInsertMap(OperatorFactory.getAndMakeChild(
+                new lateralViewJoinDesc(outputInternalColNames),
+                new RowSchema(lateralViewRR.getColumnInfos()),
+                allPath, udtfPath), lateralViewRR);
+          op = lateralViewJoin;
+        }
+        e.setValue(op);
+      }
+    }
+  }
+
+  /**
+   * A helper function that gets all the columns and respective aliases in the
+   * source and puts them into dest. It renames the internal names of the
+   * columns based on getColumnInternalName(position).
+   *
+   * Note that this helper method relies on RowResolver.getColumnInfos()
+   * returning the columns in the same order as they will be passed in the
+   * operator DAG.
+   *
+   * @param source
+   * @param dest
+   * @param outputInternalColNames - a list to which the new internal column
+   *          names will be added, in the same order as in the dest row
+   *          resolver
+   */
+  private void LVmergeRowResolvers(RowResolver source, RowResolver dest,
+      ArrayList<String> outputInternalColNames) {
+    Vector<ColumnInfo> cols = source.getColumnInfos();
+    for (ColumnInfo c : cols) {
+      String internalName =
+        getColumnInternalName(outputInternalColNames.size());
+      outputInternalColNames.add(internalName);
+      ColumnInfo newCol = new ColumnInfo(internalName, c.getType(),
+          c.getTabAlias(), c.getIsPartitionCol());
+      String [] tableCol = source.reverseLookup(c.getInternalName());
+      String tableAlias = tableCol[0];
+      String colAlias = tableCol[1];
+      dest.put(tableAlias, colAlias, newCol);
+    }
+  }
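A toy illustration of the renaming contract (invented class and hard-coded column
lists, not part of the patch): two columns arriving from the all path followed by
one column from the UDTF path come out as _col0, _col1, _col2, matching the
outputColumnNames the Lateral View Join Operator shows in the lateral_view.q.out
plans.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class LVRenameSketch {
      // Mirrors Hive's getColumnInternalName(position) naming convention.
      static String getColumnInternalName(int pos) {
        return "_col" + pos;
      }

      public static void main(String[] args) {
        // All path: key, value from src. UDTF path: the exploded column.
        List<String> allPathCols = Arrays.asList("key", "value");
        List<String> udtfPathCols = Arrays.asList("col");

        ArrayList<String> outputInternalColNames = new ArrayList<String>();
        for (String ignored : allPathCols) {
          outputInternalColNames.add(
              getColumnInternalName(outputInternalColNames.size()));
        }
        for (String ignored : udtfPathCols) {
          outputInternalColNames.add(
              getColumnInternalName(outputInternalColNames.size()));
        }
        System.out.println(outputInternalColNames); // [_col0, _col1, _col2]
      }
    }

Because both paths are renamed into one position-based namespace, the merged row
resolver never collides even when the all path and the UDTF path reuse the same
internal names upstream.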
  private Operator getReduceSink(Operator top) {
     if (top.getClass() == ReduceSinkOperator.class) {
       // Get the operator following the reduce sink
@@ -4991,7 +5238,7 @@
    * @throws SemanticException
    */
   @SuppressWarnings("nls")
-  public static exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
+  public exprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
       throws SemanticException {
     // We recursively create the exprNodeDesc. Base cases: when we encounter
     // a column ref, we convert that into an exprNodeColumnDesc; when we encounter
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (working copy)
@@ -127,7 +127,11 @@
   }

   public HashMap<String, ColumnInfo> getFieldMap(String tab_alias) {
-    return rslvMap.get(tab_alias.toLowerCase());
+    if (tab_alias == null) {
+      return rslvMap.get(null);
+    } else {
+      return rslvMap.get(tab_alias.toLowerCase());
+    }
   }

   public int getPosition(String internalName) {
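For context on why the null check matters: the old genUDTFPlan registered the
UDTF's output column under a null table alias (the removed out_rwsch.put(null, ...)
lines above), and columns without a table alias can still reach getFieldMap. A toy
model of the guarded behavior (invented class, String in place of ColumnInfo):

    import java.util.HashMap;

    public class NullAliasSketch {
      // Toy stand-in for RowResolver's rslvMap: table alias -> field map.
      static HashMap<String, HashMap<String, String>> rslvMap =
        new HashMap<String, HashMap<String, String>>();

      static HashMap<String, String> getFieldMap(String tabAlias) {
        if (tabAlias == null) {
          return rslvMap.get(null);          // patched: tolerate a null alias
        }
        return rslvMap.get(tabAlias.toLowerCase());
      }

      public static void main(String[] args) {
        HashMap<String, String> cols = new HashMap<String, String>();
        cols.put("mycol", "_col0");
        rslvMap.put(null, cols);             // a column with no table alias
        System.out.println(getFieldMap(null)); // {mycol=_col0}, no NPE
      }
    }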
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (working copy)
@@ -21,12 +21,13 @@
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

 /**
  * A Generic User-defined Table Generating Function (UDTF)
  *
- * Generates a variable number of output rows for a variable number of input
- * rows. Useful for explode(array()), histograms, etc
+ * Generates a variable number of output rows for a single input row. Useful
+ * for explode(array)...
  */
 public abstract class GenericUDTF {
@@ -37,20 +38,25 @@
    * instance.
    *
    * @param args An array of ObjectInspectors for the arguments
-   * @return ObjectInspector for the output
+   * @return A StructObjectInspector for the output. The output struct
+   *         represents a row of the table where the fields of the struct
+   *         are the columns. The field names are unimportant as they
+   *         will be overridden by user-supplied column aliases.
    */
-  public abstract ObjectInspector initialize(ObjectInspector [] argOIs)
+  public abstract StructObjectInspector initialize(ObjectInspector [] argOIs)
       throws UDFArgumentException;

   /**
-   * Give a a set of arguments for the UDTF to process.
+   * Give a set of arguments for the UDTF to process.
    *
    * @param o object array of arguments
    */
   public abstract void process(Object [] args) throws HiveException;

   /**
-   * Notify the UDTF that there are no more rows to process.
+   * Called to notify the UDTF that there are no more rows to process. Note
+   * that forward() should not be called in this function. Only clean-up code
+   * should be run.
    */
   public abstract void close() throws HiveException;
@@ -66,7 +72,7 @@
   }

   /**
-   * Passes output data to collector
+   * Passes an output row to the collector
    *
    * @param o
    * @throws HiveException
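Under the revised contract, initialize() returns a StructObjectInspector whose
fields define the output columns, and process() forwards whole rows as Object
arrays. A minimal hypothetical implementation (class and package name invented; it
assumes a single int-typed Java argument and simply echoes it back as one output
row per input row), following the same pattern as GenericUDTFExplode below:

    package org.example.udtf; // hypothetical

    import java.util.ArrayList;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class GenericUDTFIdentityInt extends GenericUDTF {
      // Reused output row, same object-reuse trick as explode's forwardObj.
      private final Object[] forwardObj = new Object[1];

      @Override
      public StructObjectInspector initialize(ObjectInspector[] argOIs)
          throws UDFArgumentException {
        if (argOIs.length != 1) {
          throw new UDFArgumentException("identity_int() takes one argument");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        // Placeholder field name; the user's AS aliases override it.
        fieldNames.add("col");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(
            fieldNames, fieldOIs);
      }

      @Override
      public void process(Object[] args) throws HiveException {
        forwardObj[0] = args[0]; // one output row per input row
        forward(forwardObj);
      }

      @Override
      public void close() throws HiveException {
        // Nothing buffered; per the contract, forward() must not be called here.
      }
    }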
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (revision 4426)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (working copy)
@@ -18,6 +18,7 @@

 package org.apache.hadoop.hive.ql.udf.generic;

+import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -25,6 +26,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

 @description(
     name = "explode",
@@ -39,26 +42,34 @@
   }

   @Override
-  public ObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException {
+  public StructObjectInspector initialize(ObjectInspector [] args)
+      throws UDFArgumentException {

-    if(args.length != 1) {
+    if (args.length != 1) {
       throw new UDFArgumentException("explode() takes only one argument");
     }

-    if(args[0].getCategory() != ObjectInspector.Category.LIST) {
+    if (args[0].getCategory() != ObjectInspector.Category.LIST) {
       throw new UDFArgumentException("explode() takes an array as a parameter");
     }

     listOI = (ListObjectInspector)args[0];
-    return listOI.getListElementObjectInspector();
+
+    ArrayList<String> fieldNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    fieldNames.add("col");
+    fieldOIs.add(listOI.getListElementObjectInspector());
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        fieldNames, fieldOIs);
   }

+  Object forwardObj[] = new Object[1];
+
   @Override
   public void process(Object [] o) throws HiveException {
     List<?> list = listOI.getList(o[0]);
-    for(Object r : list) {
-      this.forward(r);
+    for (Object r : list) {
+      forwardObj[0] = r;
+      this.forward(forwardObj);
     }
   }
Index: ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
===================================================================
--- ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (revision 4426)
+++ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (working copy)
@@ -28,6 +28,7 @@
   public static final int REDUCESINK = 11;
   public static final int UNION = 12;
   public static final int UDTF = 13;
+  public static final int LATERALVIEWJOIN = 14;

   public static final IntRangeSet VALID_VALUES = new IntRangeSet(
     JOIN,
@@ -43,7 +44,8 @@
     FILESINK,
     REDUCESINK,
     UNION,
-    UDTF );
+    UDTF,
+    LATERALVIEWJOIN );

   public static final Map<Integer, String> VALUES_TO_NAMES = new HashMap<Integer, String>() {{
     put(JOIN, "JOIN");
@@ -60,5 +62,6 @@
     put(REDUCESINK, "REDUCESINK");
     put(UNION, "UNION");
     put(UDTF, "UDTF");
+    put(LATERALVIEWJOIN, "LATERALVIEWJOIN");
   }};
 }
Index: ql/src/gen-php/queryplan_types.php
===================================================================
--- ql/src/gen-php/queryplan_types.php (revision 4426)
+++ ql/src/gen-php/queryplan_types.php (working copy)
@@ -50,6 +50,7 @@
   'REDUCESINK' => 11,
   'UNION' => 12,
   'UDTF' => 13,
+  'LATERALVIEWJOIN' => 14,
 );

 final class OperatorType {
@@ -67,6 +68,7 @@
   const REDUCESINK = 11;
   const UNION = 12;
   const UDTF = 13;
+  const LATERALVIEWJOIN = 14;
   static public $__names = array(
     0 => 'JOIN',
     1 => 'MAPJOIN',
@@ -82,6 +84,7 @@
     11 => 'REDUCESINK',
     12 => 'UNION',
     13 => 'UDTF',
+    14 => 'LATERALVIEWJOIN',
   );
 }
Index: ql/if/queryplan.thrift
===================================================================
--- ql/if/queryplan.thrift (revision 4426)
+++ ql/if/queryplan.thrift (working copy)
@@ -16,7 +16,7 @@
 }

 #Represents a operator along with its counters
-enum OperatorType { JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF }
+enum OperatorType { JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN }

 struct Operator {
 1: string operatorId,
 2: OperatorType operatorType,