Index: ql/src/test/results/clientnegative/udtf_not_supported2.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported2.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported2.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: UDTF's require an AS clause Index: ql/src/test/results/clientnegative/udtf_not_supported1.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported1.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported1.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: Only a single expression in SELECT is supported with UDTF's Index: ql/src/test/results/clientnegative/udtf_not_supported3.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported3.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported3.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: GROUP BY is not supported with UDTF's Index: ql/src/test/results/clientpositive/show_functions.q.out =================================================================== --- ql/src/test/results/clientpositive/show_functions.q.out (revision 4330) +++ ql/src/test/results/clientpositive/show_functions.q.out (working copy) @@ -46,6 +46,7 @@ double elt exp +explode find_in_set float floor @@ -145,6 +146,7 @@ case coalesce double +explode from_unixtime lcase like Index: ql/src/test/results/clientpositive/udtf_explode.q.out =================================================================== --- ql/src/test/results/clientpositive/udtf_explode.q.out (revision 0) +++ ql/src/test/results/clientpositive/udtf_explode.q.out (revision 0) @@ -0,0 +1,238 @@ +PREHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + inputColumnNames: _col0 + outputColumnName: myCol + function name: explode + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1834683504/10001 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns myCol + serialization.format 1 + columns.types int + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [src] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + Partition + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + name src + columns.types string:string + serialization.ddl struct src { string key, string 
value} + serialization.format 1 + columns key,value + bucket_count -1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + file.inputformat org.apache.hadoop.mapred.TextInputFormat + file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1258418159 + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: src + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL myCol)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL myCol)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a:src + TableScan + alias: src + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + inputColumnNames: _col0 + outputColumnName: myCol + function name: explode + Limit + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: myCol + type: int + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [a:src] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + Partition + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + name src + columns.types string:string + serialization.ddl struct src { string key, string value} + serialization.format 1 + columns key,value + bucket_count -1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + file.inputformat org.apache.hadoop.mapred.TextInputFormat + file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1258418159 + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: src + Reduce Operator Tree: + Extract + Limit + Select Operator + expressions: + expr: _col0 + type: int + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + keys: + expr: _col0 + type: int + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10002 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types int,bigint + escape.delim \ + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + 
file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10002 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: -1 + value expressions: + expr: _col1 + type: bigint + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10002 [file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10002] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10002 + Partition + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types int,bigint + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + keys: + expr: KEY._col0 + type: int + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/572784041/10001 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + serialization.format 1 + columns.types int:bigint + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1680443128/10000 +POSTHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1680443128/10000 +1 +2 +3 +PREHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/494717493/10000 +POSTHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/494717493/10000 +1 1 +2 1 +3 1 Index: ql/src/test/queries/clientnegative/udtf_not_supported1.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported1.q (revision 0) +++ ql/src/test/queries/clientnegative/udtf_not_supported1.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) as myCol, key FROM src; Index: ql/src/test/queries/clientnegative/udtf_not_supported3.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported3.q (revision 0) +++ ql/src/test/queries/clientnegative/udtf_not_supported3.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) as myCol FROM src GROUP BY key; Index: ql/src/test/queries/clientnegative/udtf_not_supported2.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported2.q (revision 0) +++ 
ql/src/test/queries/clientnegative/udtf_not_supported2.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) FROM src; Index: ql/src/test/queries/clientpositive/udtf_explode.q =================================================================== --- ql/src/test/queries/clientpositive/udtf_explode.q (revision 0) +++ ql/src/test/queries/clientpositive/udtf_explode.q (revision 0) @@ -0,0 +1,5 @@ +EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; +EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; + +SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; +SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.UDTFOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; @@ -253,7 +254,7 @@ // Without this break, a bug in ReduceSink to Extract edge column pruning will manifest // which should be fixed before remove this if ((child instanceof FileSinkOperator) - || (child instanceof ScriptOperator) + || (child instanceof ScriptOperator) ||(child instanceof UDTFOperator) || (child instanceof LimitOperator) || (child instanceof UnionOperator)) { cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op)); return null; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (working copy) @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; public class FunctionInfo { @@ -27,24 +28,30 @@ private String displayName; - private GenericUDF genericUDF; + private GenericUDF genericUDF = null; - private GenericUDAFResolver genericUDAFResolver; - + private GenericUDTF genericUDTF = null; + + private GenericUDAFResolver genericUDAFResolver = null; + public FunctionInfo(boolean isNative, String displayName, GenericUDF genericUDF) { this.isNative = isNative; this.displayName = displayName; this.genericUDF = genericUDF; - this.genericUDAFResolver = null; } public FunctionInfo(boolean isNative, String displayName, GenericUDAFResolver genericUDAFResolver) { this.isNative = isNative; this.displayName = displayName; - this.genericUDF = null; this.genericUDAFResolver = genericUDAFResolver; } + public FunctionInfo(boolean isNative, String displayName, GenericUDTF genericUDTF) { + this.isNative = isNative; + this.displayName = displayName; + this.genericUDTF = genericUDTF; + } + /** * Get a new GenericUDF object for the function. */ @@ -54,6 +61,16 @@ } /** + * Get a new GenericUDTF object for the function. 
+ */ + public GenericUDTF getGenericUDTF() { + // GenericUDTF is stateful - we have to make a copy here + if(genericUDTF == null) + return null; + return FunctionRegistry.cloneGenericUDTF(genericUDTF); + } + + /** * Get the GenericUDAFResolver object for the function. */ public GenericUDAFResolver getGenericUDAFResolver() { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 4339) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -216,6 +216,9 @@ registerGenericUDF("locate", GenericUDFLocate.class); registerGenericUDF("elt", GenericUDFElt.class); registerGenericUDF("concat_ws", GenericUDFConcatWS.class); + + // Generic UDTF's + registerGenericUDTF("explode", GenericUDTFExplode.class); } public static void registerTemporaryUDF(String functionName, Class UDFClass, @@ -268,6 +271,21 @@ } } + static void registerGenericUDTF(String functionName, Class genericUDTFClass) { + registerGenericUDTF(true, functionName, genericUDTFClass); + } + + public static void registerGenericUDTF(boolean isNative, String functionName, Class genericUDTFClass) { + if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) { + FunctionInfo fI = new FunctionInfo(isNative, functionName, + (GenericUDTF)ReflectionUtils.newInstance(genericUDTFClass, null)); + mFunctions.put(functionName.toLowerCase(), fI); + } else { + throw new RuntimeException("Registering GenericUDTF Class " + genericUDTFClass + + " which does not extend " + GenericUDTF.class); + } + } + public static FunctionInfo getFunctionInfo(String functionName) { return mFunctions.get(functionName.toLowerCase()); } @@ -630,6 +648,13 @@ } /** + * Create a copy of an existing GenericUDTF. + */ + public static GenericUDTF cloneGenericUDTF(GenericUDTF genericUDTF) { + return (GenericUDTF)ReflectionUtils.newInstance(genericUDTF.getClass(), null); + } + + /** * Get the UDF class from an exprNodeDesc. * Returns null if the exprNodeDesc does not contain a UDF class.
*/ Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 0) @@ -0,0 +1,161 @@ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.udtfDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +public class UDTFOperator extends Operator implements Serializable { + private static final long serialVersionUID = 1L; + + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + List udtfInputColNames = null; + Set udtfInputColNamesSet = null; + ObjectInspector [] udtfInputOIs = null; + Object [] objToSendToUDTF = null; + + Object previousRow = null; + int previousTag = 0; + + protected void initializeOp(Configuration hconf) throws HiveException { + conf.getUDTF().setCollector(new UDTFCollector(this)); + // The input object inspector for the UDTFOperator is a for row that + // includes all the columns in the query. However, the UDTF only takes + // a subset of the available columns, specified in udtfDesc.inputColNames, + // as parameter inputs. Consequently, a new object inspector needs to be + // created that only includes columns that the UDTF expects + + // Make a map from the column names to the respective OI's for easier + // processing + udtfInputColNames = conf.getInputColNames(); + List inputFields = + ((StandardStructObjectInspector)inputObjInspectors[0]).getAllStructFieldRefs(); + Map fieldToOI = new HashMap(); + for(StructField sf : inputFields) { + fieldToOI.put(sf.getFieldName(), sf.getFieldObjectInspector()); + } + // Create the object inspector for the UDTF + udtfInputOIs = new ObjectInspector[udtfInputColNames.size()]; + for(int i=0; i(); + udtfInputColNamesSet.addAll(udtfInputColNames); + + ArrayList colNames = new ArrayList(); + ArrayList colOIs = new ArrayList(); + for(StructField sf : inputFields) { + String fieldName = sf.getFieldName(); + ObjectInspector fieldOI = sf.getFieldObjectInspector(); + // If the column was an argument to the UDTF, it shouldn't be in the + // output. 
Instead, the output column of the UDTF should be inserted into + // its place, only once) + if(udtfInputColNamesSet.contains(fieldName)) { + if(!insertedUDTFCol) { + colNames.add(conf.getOutputColName()); + colOIs.add(udtfOutputOI); + insertedUDTFCol = true; + } + } else { + colNames.add(fieldName); + colOIs.add(fieldOI); + } + } + + outputObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs); + + // Initialize the rest of the operator DAG + super.initializeOp(hconf); + } + + public void processOp(Object row, int tag) throws HiveException { + // The UDTF only expects a subset of the data in the row. Repackage the + // row into one containing only the columns that the udtf expects to see. + StandardStructObjectInspector soi = + (StandardStructObjectInspector) inputObjInspectors[tag]; + List rowList = soi.getStructFieldsDataAsList(row); + + for(int i=0; i newRow = new ArrayList(); + List fields = soi.getAllStructFieldRefs(); + boolean insertedUDTFCol = false; + + for(StructField sf : fields) { + if(udtfInputColNamesSet.contains(sf.getFieldName())) { + if(!insertedUDTFCol) { + newRow.add(o); + insertedUDTFCol = true; + } + } else { + newRow.add(soi.getStructFieldData(previousRow, sf)); + } + } + forward(newRow, outputObjInspector); + } + + public String getName() { + return "UDTF"; + } + + public int getType() { + return OperatorType.UDTF; + } + + protected void closeOp(boolean abort) throws HiveException { + conf.getUDTF().close(); + } +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (working copy) @@ -21,6 +21,8 @@ import java.util.*; import java.io.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.plan.*; public class OperatorFactory { @@ -52,6 +54,7 @@ opvec.add(new opTuple (limitDesc.class, LimitOperator.class)); opvec.add(new opTuple (tableScanDesc.class, TableScanOperator.class)); opvec.add(new opTuple (unionDesc.class, UnionOperator.class)); + opvec.add(new opTuple (udtfDesc.class, UDTFOperator.class)); } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 0) @@ -0,0 +1,56 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; + +@explain(displayName="UDTF Operator") +public class udtfDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private GenericUDTF genericUDTF; + private String outputColName; + //The UDTF operator gets the entire row from the select operator but + // the GenericUDTF only acts on a subset of the columns. Hence, this desc + // needs to store which columns are inputs. 
+ private List inputColNames = new ArrayList(); + + public udtfDesc() { } + public udtfDesc(final GenericUDTF genericUDTF, String outputColName, + List inputCols) { + this.genericUDTF = genericUDTF; + this.outputColName = outputColName; + this.inputColNames.addAll(inputCols); + } + + public GenericUDTF getUDTF() { + return this.genericUDTF; + } + public void setUDTF(final GenericUDTF genericUDTF) { + this.genericUDTF=genericUDTF; + } + @explain(displayName="function name") + public String getUDTFName() { + return this.genericUDTF.toString(); + } + + @explain(displayName="outputColumnName") + public String getOutputColName() { + return this.outputColName; + } + + public void setOutputColName(String outputColName) { + this.outputColName = outputColName; + } + @explain(displayName="inputColumnNames") + public List getInputColNames() { + return this.inputColNames; + } + + public void setInputColNames(List inputColNames) { + this.inputColNames.clear(); + this.inputColNames.addAll(inputColNames); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (working copy) @@ -180,6 +180,9 @@ public ASTNode getGroupByForClause(String clause) { return this.destToGroupby.get(clause); } + public HashMap getDestToGroupBy() { + return this.destToGroupby; + } public ASTNode getSelForClause(String clause) { return this.destToSelExpr.get(clause); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (working copy) @@ -532,6 +532,10 @@ else throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((ASTNode)expr)); } + // Detect UDTF's in nested SELECT, GROUP BY, etc as they aren't supported + if (fi.getGenericUDTF() != null) { + throw new SemanticException("UDTF's are not supported outside the SELECT clause, nor nested in expressions"); + } try { desc = getFuncExprNodeDesc(funcText, children); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -20,13 +20,10 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Formatter; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -61,10 +59,8 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.UDF; 
import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -116,17 +112,22 @@ import org.apache.hadoop.hive.ql.plan.selectDesc; import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.ql.plan.tableScanDesc; +import org.apache.hadoop.hive.ql.plan.udtfDesc; import org.apache.hadoop.hive.ql.plan.unionDesc; import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -136,19 +137,17 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.ql.exec.TextRecordReader; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import java.util.regex.Pattern; import java.util.regex.Matcher; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; @@ -163,7 +162,6 @@ import org.apache.hadoop.hive.ql.exec.FetchOperator; import java.util.Collection; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; /** * Implementation of the semantic analyzer @@ -1367,6 +1365,48 @@ return false; } + /** + * A simple data structure to hold information about UDTF's that must be passed + * from genSelectPlan to genUDTFPlan + */ + private class UDTFInfo { + // The name/alias of the output column + String outputColName; + // The internal column names for the inputs to the UDTF. 
List order is + // same as the parameter order + + public UDTFInfo() { + outputColName = null; + inputColNames = new ArrayList(); + } + + public void setOutputColName(String outputColName) { + this.outputColName = outputColName; + } + public String getOutputColName() { + return outputColName; + } + public void addInputColName(String inputColName) { + inputColNames.add(inputColName); + } + public List getInputColNames() { + return inputColNames; + } + + public String toString() { + String ret = ""; + ret += "{output: (" + outputColName + "), input: ("; + for(int i=0; i 1) { + throw new SemanticException("Only a single expression in SELECT is supported with UDTF's"); + } + //Require an AS for UDTFs + if(((ASTNode) selExprList.getChild(posn)).getChildCount() != 2 || + selExprList.getChild(posn).getChild(1).getType() != HiveParser.Identifier ){ + throw new SemanticException("UDTF's require an AS clause"); + } + udtfInfo.setOutputColName(unescapeIdentifier(selExprList.getChild(posn).getChild(1).getText())); + } + // The list of expressions after SELECT or SELECT TRANSFORM. - ASTNode exprList = (isInTransform ? (ASTNode) trfm.getChild(0) : selExprList); + ASTNode exprList; + if(isInTransform) { + exprList = (ASTNode) trfm.getChild(0); + } else if(isUDTF) { + exprList = (ASTNode) udtfExpr; + } else { + exprList = selExprList; + } + LOG.debug("genSelectPlan: input = " + inputRR.toString()); + + // For UDTF's, skip the function name + int startPosn = isUDTF ? posn + 1 : posn; + // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM) - for (int i = posn; i < exprList.getChildCount(); ++i) { + for (int i = startPosn; i < exprList.getChildCount(); ++i) { // child can be EXPR AS ALIAS, or EXPR. ASTNode child = (ASTNode) exprList.getChild(i); @@ -1408,7 +1488,7 @@ String tabAlias; String colAlias; - if (isInTransform) { + if (isInTransform || isUDTF) { tabAlias = null; colAlias = "_C" + i; expr = child; @@ -1457,6 +1537,10 @@ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(getColumnInternalName(pos), exp.getTypeInfo(), tabAlias, false)); + + if(isUDTF) { + udtfInfo.addInputColName(getColumnInternalName(pos)); + } pos = Integer.valueOf(pos.intValue() + 1); } } @@ -1483,11 +1567,14 @@ output = genScriptPlan(trfm, qb, output); } + if(isUDTF) { + output = genUDTFPlan(genericUDTF, udtfInfo, qb, output); + } LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString()); return output; } - + /** * Class to store GenericUDAF related information. */ @@ -2856,7 +2943,100 @@ return limitMap; } + + private Operator genUDTFPlan(GenericUDTF genericUDTF, UDTFInfo udtfInfo, + QB qb, Operator input) throws SemanticException { + + // No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY + QBParseInfo qbp = qb.getParseInfo(); + String notSupported = " is not supported with UDTF's"; + if(!qbp.getDestToGroupBy().isEmpty()) { + throw new SemanticException("GROUP BY" + notSupported); + } + if(!qbp.getDestToDistributeBy().isEmpty()) { + throw new SemanticException("DISTRIBUTE BY" + notSupported); + } + if(!qbp.getDestToSortBy().isEmpty()) { + throw new SemanticException("SORT BY" + notSupported); + } + if(!qbp.getDestToClusterBy().isEmpty()) { + throw new SemanticException("CLUSTER BY" + notSupported); + } + + // Use the RowResolver from the input operator to generate an input + // ObjectInspector that can be used to initialize the UDTF.
Then, the + // resulting output object inspector can be used to make the RowResolver + // for the UDTF operator + RowResolver selectRR = opParseCtx.get(input).getRR(); + Vector inputCols = selectRR.getColumnInfos(); + // UDTF's must always have an input col as it is replaced with + // the output col + List inputColNames = udtfInfo.getInputColNames(); + if(inputColNames.size() == 0) { + throw new SemanticException("UDTF's must take an input argument"); + } + // Print out input col names + LOG.debug("UDTF " + genericUDTF + " input columns: "); + for(int i=0; i colNames = new ArrayList(); + ObjectInspector [] colOIs = new ObjectInspector[inputColNames.size()]; + for(int i=0; i outputCols = new ArrayList(); + + // Create the output columns, using the fact the the UDTF will take multiple + // input cols and output a single output col. + Set inputColSet = new HashSet(); + inputColSet.addAll(inputColNames); + + boolean insertedUDTFCol = false; + for(int i=0; i opRules = new LinkedHashMap(); - StringBuilder sb = new StringBuilder(); - Formatter fm = new Formatter(sb); + opRules.put(new RuleRegExp("R1", HiveParser.TOK_NULL + "%"), TypeCheckProcFactory.getNullExprProcessor()); opRules.put(new RuleRegExp("R2", HiveParser.Number + "%"), TypeCheckProcFactory.getNumExprProcessor()); opRules.put(new RuleRegExp("R3", HiveParser.Identifier + "%|" + Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 0) @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A Generic User-defined Table Generating Function (UDTF) + * + * Generates a variable number of output rows for a variable number of input + * rows. Useful for explode(array()), histograms, etc + */ + +public abstract class GenericUDTF { + Collector collector = null; + + /** + * Initialize this GenericUDTF. This will be called only once per + * instance. + * + * @param args An array of ObjectInspectors for the arguments + * @return ObjectInspector for the output + */ + public abstract ObjectInspector initialize(ObjectInspector [] argOIs) + throws UDFArgumentException; + + /** + * Give a a set of arguments for the UDTF to process. 
+ * + * @param o object array of arguments + */ + public abstract void process(Object [] args) throws HiveException; + + /** + * Notify the UDTF that there are no more rows to process. The UDTF may + * emit rows + */ + public abstract void close() throws HiveException; + + /** + * Associates a collector with this UDTF. Can't be specified in the + * constructor as the UDTF may be initialized before the collector has been + * constructed. + * + * @param collector + */ + public final void setCollector(Collector collector) { + this.collector = collector; + } + + /** + * Passes output data to collector + * + * @param o + * @throws HiveException + */ + final void forward(Object o) throws HiveException { + collector.collect(o); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java (revision 0) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.UDTFOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * UDTFCollector collects data from a GenericUDTF and passes the data to a + * UDTFOperator + */ +public class UDTFCollector implements Collector { + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.udf.generic.Collector#collect(java.lang.Object) + */ + UDTFOperator op = null; + + public UDTFCollector(UDTFOperator op) { + this.op = op; + } + @Override + public void collect(Object input) throws HiveException{ + op.forwardUDTFOutput(input); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (revision 0) @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.description; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +@description( + name = "explode", + value = "_FUNC_(a) - separates the elements of array a into multiple rows " +) +public class GenericUDTFExplode extends GenericUDTF { + + ListObjectInspector listOI = null; + + @Override + public void close() throws HiveException{ + } + + @Override + public ObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException { + + if(args.length != 1) { + throw new UDFArgumentException("explode() takes only one argument"); + } + + if(args[0].getCategory() != ObjectInspector.Category.LIST) { + throw new UDFArgumentException("explode() takes an array as a parameter"); + } + listOI = (ListObjectInspector)args[0]; + + return listOI.getListElementObjectInspector(); + } + + @Override + public void process(Object [] o) throws HiveException { + + List list = listOI.getList(o[0]); + for(Object r : list) { + this.forward(r); + } + } + + @Override + public String toString() { + return "explode"; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Collector gets data from a source. + */ +public interface Collector { + /** + * Other classes will call collect() with the data that it has. 
+ * @param input + */ + void collect(Object input) throws HiveException; +} Index: ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java =================================================================== --- ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (revision 4330) +++ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (working copy) @@ -27,6 +27,7 @@ public static final int FILESINK = 10; public static final int REDUCESINK = 11; public static final int UNION = 12; + public static final int UDTF = 13; public static final IntRangeSet VALID_VALUES = new IntRangeSet( JOIN, @@ -41,7 +42,8 @@ TABLESCAN, FILESINK, REDUCESINK, - UNION ); + UNION, + UDTF); public static final Map VALUES_TO_NAMES = new HashMap() {{ put(JOIN, "JOIN"); @@ -57,5 +59,6 @@ put(FILESINK, "FILESINK"); put(REDUCESINK, "REDUCESINK"); put(UNION, "UNION"); + put(UDTF, "UDTF"); }}; }
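The patch registers only explode() as a built-in UDTF, but the new GenericUDTF abstract class and FunctionRegistry.registerGenericUDTF() are the extension points for user-defined table-generating functions. The following minimal sketch (illustrative only, not part of the patch) shows what a custom UDTF built against this API could look like: it emits one output row per comma-separated token of its single string argument. The class name GenericUDTFSplitRows and the function name split_rows are assumptions, and the class is placed in org.apache.hadoop.hive.ql.udf.generic because forward() is package-private in this revision.

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// Illustrative sketch only -- not included in this patch.
public class GenericUDTFSplitRows extends GenericUDTF {

  StringObjectInspector stringOI = null;

  @Override
  public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1 || !(args[0] instanceof StringObjectInspector)) {
      throw new UDFArgumentException("split_rows() takes a single string argument");
    }
    stringOI = (StringObjectInspector) args[0];
    // As with explode(), the returned ObjectInspector describes the single
    // output column that the mandatory AS clause names.
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  }

  @Override
  public void process(Object[] args) throws HiveException {
    String value = stringOI.getPrimitiveJavaObject(args[0]);
    if (value == null) {
      return;
    }
    // One forward() call per output row.
    for (String token : value.split(",")) {
      forward(token);
    }
  }

  @Override
  public void close() throws HiveException {
    // Nothing is buffered in this example, so there is nothing to flush.
  }
}

Registration would follow the same path as explode(), for example FunctionRegistry.registerGenericUDTF(false, "split_rows", GenericUDTFSplitRows.class), after which a query such as SELECT split_rows(value) AS token FROM src would run subject to the single-expression and AS-clause restrictions enforced in genSelectPlan.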