Index: ql/src/test/results/clientpositive/show_functions.q.out
===================================================================
--- ql/src/test/results/clientpositive/show_functions.q.out	(revision 4330)
+++ ql/src/test/results/clientpositive/show_functions.q.out	(working copy)
@@ -46,6 +46,7 @@
 double
 elt
 exp
+explode
 find_in_set
 float
 floor
@@ -145,6 +146,7 @@
 case
 coalesce
 double
+explode
 from_unixtime
 lcase
 like
Index: ql/src/test/results/clientpositive/udtf_explode.q.out
===================================================================
--- ql/src/test/results/clientpositive/udtf_explode.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/udtf_explode.q.out	(revision 0)
@@ -0,0 +1,90 @@
+PREHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: array(1,2,3)
+                    type: array<int>
+              outputColumnNames: _col0
+              UDTF Operator
+                inputColumnNames: _col0
+                outputColumnName: myCol
+                function name: explode
+                Limit
+                  File Output Operator
+                    compressed: false
+                    GlobalTableId: 0
+                    directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/220468441/10001
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        properties:
+                          columns myCol
+                          serialization.format 1
+                          columns.types int
+      Needs Tagging: false
+      Path -> Alias:
+        file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [src]
+      Path -> Partition:
+        file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src
+          Partition
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                name src
+                columns.types string:string
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                columns key,value
+                bucket_count -1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src
+                transient_lastDdlTime 1258081154
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: src
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 3
+
+
+PREHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/123040219/10000
+POSTHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/123040219/10000
+1
+2
+3
+PREHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1297819065/10000
+POSTHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1297819065/10000
+1	1
+2	1
+3	1
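The EXPLAIN output above shows the shape of the plan: the Select Operator exposes array(1,2,3) as the internal column _col0, the UDTF Operator consumes _col0 and emits the single column myCol in its place, and Limit cuts the stream at three rows. As a reading aid only (plain Java, not part of the patch; the 500-row size of the src test table is an assumption), the computation the plan describes is:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ExplodeSketch {
      public static void main(String[] args) {
        List<Integer> results = new ArrayList<Integer>();
        int srcRowCount = 500; // assumed size of the src test table
        outer:
        for (int row = 0; row < srcRowCount; row++) {
          for (int v : Arrays.asList(1, 2, 3)) { // explode(array(1,2,3))
            results.add(v);
            if (results.size() == 3) { // LIMIT 3
              break outer;
            }
          }
        }
        System.out.println(results); // [1, 2, 3]
      }
    }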
Index: ql/src/test/queries/clientpositive/udtf_explode.q
===================================================================
--- ql/src/test/queries/clientpositive/udtf_explode.q	(revision 0)
+++ ql/src/test/queries/clientpositive/udtf_explode.q	(revision 0)
@@ -0,0 +1,4 @@
+EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3;
+
+SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3;
+SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java	(working copy)
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -253,7 +254,7 @@
       // Without this break, a bug in ReduceSink to Extract edge column pruning will manifest
       // which should be fixed before remove this
       if ((child instanceof FileSinkOperator)
-          || (child instanceof ScriptOperator)
+          || (child instanceof ScriptOperator) || (child instanceof UDTFOperator)
           || (child instanceof LimitOperator) || (child instanceof UnionOperator)) {
         cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op));
         return null;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java	(working copy)
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 
 public class FunctionInfo {
 
@@ -27,24 +28,30 @@
 
   private String displayName;
 
-  private GenericUDF genericUDF;
+  private GenericUDF genericUDF = null;
 
-  private GenericUDAFResolver genericUDAFResolver;
-
+  private GenericUDTF genericUDTF = null;
+
+  private GenericUDAFResolver genericUDAFResolver = null;
+
   public FunctionInfo(boolean isNative, String displayName, GenericUDF genericUDF) {
     this.isNative = isNative;
     this.displayName = displayName;
     this.genericUDF = genericUDF;
-    this.genericUDAFResolver = null;
   }
 
   public FunctionInfo(boolean isNative, String displayName, GenericUDAFResolver genericUDAFResolver) {
     this.isNative = isNative;
     this.displayName = displayName;
-    this.genericUDF = null;
     this.genericUDAFResolver = genericUDAFResolver;
   }
 
+  public FunctionInfo(boolean isNative, String displayName, GenericUDTF genericUDTF) {
+    this.isNative = isNative;
+    this.displayName = displayName;
+    this.genericUDTF = genericUDTF;
+  }
+
   /**
    * Get a new GenericUDF object for the function.
    */
@@ -54,6 +61,16 @@
   }
 
   /**
+   * Get a new GenericUDTF object for the function.
+   */
+  public GenericUDTF getGenericUDTF() {
+    // GenericUDTF is stateful - we have to make a copy here
+    if (genericUDTF == null)
+      return null;
+    return FunctionRegistry.cloneGenericUDTF(genericUDTF);
+  }
+
+  /**
    * Get the GenericUDAFResolver object for the function.
    */
   public GenericUDAFResolver getGenericUDAFResolver() {
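A note on getGenericUDTF() above: unlike a GenericUDF, a GenericUDTF instance carries per-query state (its Collector, plus whatever initialize() caches), so FunctionInfo hands out a fresh clone on every lookup. A minimal sketch of the observable behavior (assumes the registry's static initialization has run, which registering "explode" below provides):

    import org.apache.hadoop.hive.ql.exec.FunctionInfo;
    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;

    public class CloneSketch {
      public static void main(String[] args) {
        FunctionInfo fi = FunctionRegistry.getFunctionInfo("explode");
        // Two lookups return two independent instances, so concurrent
        // queries can never share (and corrupt) each other's UDTF state.
        System.out.println(fi.getGenericUDTF() == fi.getGenericUDTF()); // false
      }
    }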
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(working copy)
@@ -216,6 +216,11 @@
     registerGenericUDF("locate", GenericUDFLocate.class);
     registerGenericUDF("elt", GenericUDFElt.class);
     registerGenericUDF("concat_ws", GenericUDFConcatWS.class);
+    registerGenericUDF("array", GenericUDFArray.class);
+    registerGenericUDF("map", GenericUDFMap.class);
+
+    // Generic UDTF's
+    registerGenericUDTF("explode", GenericUDTFExplode.class);
   }
 
   public static void registerTemporaryUDF(String functionName, Class<? extends UDF> UDFClass,
@@ -268,6 +273,21 @@
     }
   }
 
+  static void registerGenericUDTF(String functionName, Class<? extends GenericUDTF> genericUDTFClass) {
+    registerGenericUDTF(true, functionName, genericUDTFClass);
+  }
+
+  public static void registerGenericUDTF(boolean isNative, String functionName, Class<? extends GenericUDTF> genericUDTFClass) {
+    if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) {
+      FunctionInfo fI = new FunctionInfo(isNative, functionName,
+          (GenericUDTF) ReflectionUtils.newInstance(genericUDTFClass, null));
+      mFunctions.put(functionName.toLowerCase(), fI);
+    } else {
+      throw new RuntimeException("Registering GenericUDTF Class " + genericUDTFClass +
+          " which does not extend " + GenericUDTF.class);
+    }
+  }
+
   public static FunctionInfo getFunctionInfo(String functionName) {
     return mFunctions.get(functionName.toLowerCase());
   }
@@ -630,6 +650,13 @@
   }
 
   /**
+   * Create a copy of an existing GenericUDTF.
+   */
+  public static GenericUDTF cloneGenericUDTF(GenericUDTF genericUDTF) {
+    return (GenericUDTF) ReflectionUtils.newInstance(genericUDTF.getClass(), null);
+  }
+
+  /**
    * Get the UDF class from an exprNodeDesc.
    * Returns null if the exprNodeDesc does not contain a UDF class.
    */
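The registry changes mirror the existing UDF/UDAF entry points. A hedged sketch of how a non-native (session-level) UDTF would be registered and resolved through the methods added above, reusing the built-in explode implementation so the snippet stays concrete (the "my_explode" name is invented):

    import org.apache.hadoop.hive.ql.exec.FunctionInfo;
    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode;

    public class RegisterUDTFSketch {
      public static void main(String[] args) {
        // isNative == false marks a session-level function; names are stored
        // lowercased, so lookups are case-insensitive.
        FunctionRegistry.registerGenericUDTF(false, "my_explode", GenericUDTFExplode.class);

        FunctionInfo fi = FunctionRegistry.getFunctionInfo("MY_EXPLODE");
        GenericUDTF udtf = fi.getGenericUDTF(); // a fresh clone, per FunctionInfo
        System.out.println(udtf);               // "explode", via toString()
      }
    }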
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java	(revision 0)
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.udtfDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class UDTFOperator extends Operator<udtfDesc> implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  List<String> udtfInputColNames = null;
+  Set<String> udtfInputColNamesSet = null;
+  ObjectInspector[] udtfInputOIs = null;
+  Object[] objToSendToUDTF = null;
+
+  Object previousRow = null;
+  int previousTag = 0;
+
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    conf.getUDTF().setCollector(new UDTFCollector(this));
+    // The input object inspector for the UDTFOperator is for a row that
+    // includes all the columns in the query. However, the UDTF only takes
+    // a subset of the available columns, specified in udtfDesc.inputColNames,
+    // as parameter inputs. Hence, a new 'row' object inspector needs to be
+    // created that only includes the columns that the UDTF expects.
+
+    // Make a map from the column names to the respective OI's for easier
+    // processing
+    udtfInputColNames = conf.getInputColNames();
+    List<? extends StructField> inputFields =
+        ((StandardStructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
+    Map<String, ObjectInspector> fieldToOI = new HashMap<String, ObjectInspector>();
+    for (StructField sf : inputFields) {
+      fieldToOI.put(sf.getFieldName(), sf.getFieldObjectInspector());
+    }
+    // Create the object inspectors for the UDTF's arguments
+    udtfInputOIs = new ObjectInspector[udtfInputColNames.size()];
+    for (int i = 0; i < udtfInputColNames.size(); i++) {
+      udtfInputOIs[i] = fieldToOI.get(udtfInputColNames.get(i));
+    }
+    objToSendToUDTF = new Object[udtfInputColNames.size()];
+
+    // Initialize the UDTF with its argument OI's to get back the OI for the
+    // UDTF's single output column
+    ObjectInspector udtfOutputOI = conf.getUDTF().initialize(udtfInputOIs);
+
+    // Create a set of the UDTF argument column names for quick lookups
+    boolean insertedUDTFCol = false;
+    udtfInputColNamesSet = new HashSet<String>();
+    udtfInputColNamesSet.addAll(udtfInputColNames);
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> colOIs = new ArrayList<ObjectInspector>();
+    for (StructField sf : inputFields) {
+      String fieldName = sf.getFieldName();
+      ObjectInspector fieldOI = sf.getFieldObjectInspector();
+      // If the column was an argument to the UDTF, it shouldn't be in the
+      // output. Instead, the output column of the UDTF should be inserted in
+      // its place, only once.
+      if (udtfInputColNamesSet.contains(fieldName)) {
+        if (!insertedUDTFCol) {
+          colNames.add(conf.getOutputColName());
+          colOIs.add(udtfOutputOI);
+          insertedUDTFCol = true;
+        }
+      } else {
+        colNames.add(fieldName);
+        colOIs.add(fieldOI);
+      }
+    }
+
+    outputObjInspector =
+        ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs);
+
+    // Initialize the rest of the operator DAG
+    super.initializeOp(hconf);
+  }
+
+  public void processOp(Object row, int tag) throws HiveException {
+    // The UDTF only expects a subset of the data in the row. Repackage the
+    // row into one containing only the columns that the UDTF expects to see
+    // TODO: Inefficient?
+    StandardStructObjectInspector soi =
+        (StandardStructObjectInspector) inputObjInspectors[tag];
+    List<Object> rowList = soi.getStructFieldsDataAsList(row);
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+    for (int i = 0; i < fields.size(); i++) {
+      int udtfArgIdx = udtfInputColNames.indexOf(fields.get(i).getFieldName());
+      if (udtfArgIdx != -1) {
+        objToSendToUDTF[udtfArgIdx] = rowList.get(i);
+      }
+    }
+
+    // Buffer the current row so that forwardUDTFOutput() can copy the
+    // pass-through columns when the UDTF emits output for this row
+    previousRow = row;
+    previousTag = tag;
+
+    conf.getUDTF().process(objToSendToUDTF);
+  }
+
+  /**
+   * forwardUDTFOutput is called by the UDTFCollector whenever the UDTF emits
+   * an output row. The emitted value replaces the UDTF's argument columns in
+   * the buffered input row, and the spliced row is forwarded.
+   */
+  public void forwardUDTFOutput(Object o) throws HiveException {
+    StandardStructObjectInspector soi =
+        (StandardStructObjectInspector) inputObjInspectors[previousTag];
+    ArrayList<Object> newRow = new ArrayList<Object>();
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    boolean insertedUDTFCol = false;
+
+    for (StructField sf : fields) {
+      if (udtfInputColNamesSet.contains(sf.getFieldName())) {
+        if (!insertedUDTFCol) {
+          newRow.add(o);
+          insertedUDTFCol = true;
+        }
+      } else {
+        newRow.add(soi.getStructFieldData(previousRow, sf));
+      }
+    }
+    forward(newRow, outputObjInspector);
+  }
+
+  public String getName() {
+    return "UDTF";
+  }
+
+  public int getType() {
+    return OperatorType.UDTF;
+  }
+
+  protected void closeOp(boolean abort) throws HiveException {
+    conf.getUDTF().close();
+  }
+}
\ No newline at end of file
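The column splice in forwardUDTFOutput() is the subtle part of the operator: pass-through columns are copied from the buffered input row, while the whole run of UDTF argument columns collapses, once, into the single UDTF output value. The same loop, extracted into a dependency-free sketch (all names invented for illustration, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class SpliceSketch {
      // Mirror of the forwardUDTFOutput() loop: for each input field, either
      // copy it through or (once) substitute the UDTF's output in its place.
      static List<Object> splice(List<String> fieldNames, List<Object> row,
          Set<String> udtfArgCols, Object udtfOutput) {
        List<Object> newRow = new ArrayList<Object>();
        boolean insertedUDTFCol = false;
        for (int i = 0; i < fieldNames.size(); i++) {
          if (udtfArgCols.contains(fieldNames.get(i))) {
            if (!insertedUDTFCol) {
              newRow.add(udtfOutput);
              insertedUDTFCol = true;
            }
          } else {
            newRow.add(row.get(i));
          }
        }
        return newRow;
      }

      public static void main(String[] args) {
        List<String> fields = Arrays.asList("key", "_col1", "value");
        List<Object> row = Arrays.<Object>asList("k0", Arrays.asList(1, 2), "v0");
        Set<String> udtfArgs = new HashSet<String>(Arrays.asList("_col1"));
        // One call per value the UDTF forwards for this input row:
        System.out.println(splice(fields, row, udtfArgs, 1)); // [k0, 1, v0]
        System.out.println(splice(fields, row, udtfArgs, 2)); // [k0, 2, v0]
      }
    }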
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java	(working copy)
@@ -21,6 +21,8 @@
 import java.util.*;
 import java.io.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.plan.*;
 
 public class OperatorFactory {
@@ -52,6 +54,7 @@
     opvec.add(new opTuple<limitDesc>(limitDesc.class, LimitOperator.class));
     opvec.add(new opTuple<tableScanDesc>(tableScanDesc.class, TableScanOperator.class));
     opvec.add(new opTuple<unionDesc>(unionDesc.class, UnionOperator.class));
+    opvec.add(new opTuple<udtfDesc>(udtfDesc.class, UDTFOperator.class));
   }
 
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java	(revision 0)
@@ -0,0 +1,53 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+
+@explain(displayName = "UDTF Operator")
+public class udtfDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private GenericUDTF genericUDTF;
+  private String outputColName;
+  private List<String> inputColNames = new ArrayList<String>();
+
+  public udtfDesc() { }
+
+  public udtfDesc(final GenericUDTF genericUDTF, String outputColName,
+      List<String> inputCols) {
+    this.genericUDTF = genericUDTF;
+    this.outputColName = outputColName;
+    this.inputColNames.addAll(inputCols);
+  }
+
+  public GenericUDTF getUDTF() {
+    return this.genericUDTF;
+  }
+
+  public void setUDTF(final GenericUDTF genericUDTF) {
+    this.genericUDTF = genericUDTF;
+  }
+
+  @explain(displayName = "function name")
+  public String getUDTFName() {
+    return this.genericUDTF.toString();
+  }
+
+  @explain(displayName = "outputColumnName")
+  public String getOutputColName() {
+    return this.outputColName;
+  }
+
+  public void setOutputColName(String outputColName) {
+    this.outputColName = outputColName;
+  }
+
+  @explain(displayName = "inputColumnNames")
+  public List<String> getInputColNames() {
+    return this.inputColNames;
+  }
+
+  public void setInputColNames(List<String> inputColNames) {
+    this.inputColNames.clear();
+    this.inputColNames.addAll(inputColNames);
+  }
+}
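udtfDesc is the piece of the plan that survives serialization; UDTFOperator reads everything it needs back out of it via getUDTF(), getInputColNames() and getOutputColName(). A small sketch of the wiring the analyzer sets up for the test query, using only classes from this patch (the internal name _col0 matches the EXPLAIN output earlier):

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.plan.udtfDesc;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode;

    public class UdtfDescSketch {
      public static void main(String[] args) {
        // SELECT explode(array(1,2,3)) AS myCol: the Select Operator exposes
        // the array as internal column _col0; the UDTF replaces it with myCol.
        udtfDesc desc = new udtfDesc(new GenericUDTFExplode(), "myCol",
            Arrays.asList("_col0"));
        System.out.println(desc.getUDTFName());      // explode
        System.out.println(desc.getOutputColName()); // myCol
        System.out.println(desc.getInputColNames()); // [_col0]
      }
    }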
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java	(working copy)
@@ -532,6 +532,10 @@
         else
           throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((ASTNode)expr));
       }
+      // Detect nested UDTF's, as they aren't supported
+      if (fi.getGenericUDTF() != null) {
+        throw new SemanticException("Nested UDTF's are currently not supported.");
+      }
 
       try {
         desc = getFuncExprNodeDesc(funcText, children);
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 4330)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -20,18 +20,16 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import java.lang.ClassNotFoundException;
@@ -46,6 +44,7 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -61,10 +60,8 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -91,6 +88,7 @@
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink4;
+import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory.DefaultExprProcessor;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.aggregationDesc;
 import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
@@ -116,17 +114,22 @@
 import org.apache.hadoop.hive.ql.plan.selectDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.ql.plan.udtfDesc;
 import org.apache.hadoop.hive.ql.plan.unionDesc;
 import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -136,19 +139,17 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.exec.TextRecordReader;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
@@ -163,7 +164,6 @@
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import java.util.Collection;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 
 /**
  * Implementation of the semantic analyzer
@@ -1367,6 +1367,48 @@
     return false;
   }
 
+  /**
+   * A simple data structure to hold information about UDTF's that must be
+   * passed from genSelectPlan to genUDTFPlan.
+   */
+  private class UDTFInfo {
+    // The name/alias of the output column
+    String outputColName;
+    // The internal column names for the inputs to the UDTF. List order is
+    // the same as the parameter order.
+    List<String> inputColNames;
+
+    public UDTFInfo() {
+      outputColName = null;
+      inputColNames = new ArrayList<String>();
+    }
+
+    public void setOutputColName(String outputColName) {
+      this.outputColName = outputColName;
+    }
+    public String getOutputColName() {
+      return outputColName;
+    }
+    public void addInputColName(String inputColName) {
+      inputColNames.add(inputColName);
+    }
+    public List<String> getInputColNames() {
+      return inputColNames;
+    }
+
+    public String toString() {
+      String ret = "";
+      ret += "{output: (" + outputColName + "), input: (";
+      for (int i = 0; i < inputColNames.size(); i++) {
+        ret += (i == 0 ? "" : ", ") + inputColNames.get(i);
+      }
+      ret += ")}";
+      return ret;
+    }
+  }
+
   private Operator genSelectPlan(String dest, QB qb,
     Operator input) throws SemanticException {
 
+    // Whether a UDTF has been seen in the SELECT clause, and the info
+    // needed to plan each UDTF that is seen
+    boolean selectHasUDTF = false;
+    Map<GenericUDTF, UDTFInfo> udtfMap = new HashMap<GenericUDTF, UDTFInfo>();
+
     // The list of expressions after SELECT or SELECT TRANSFORM.
     ASTNode exprList = (isInTransform ?
       (ASTNode) trfm.getChild(0) : selExprList);
@@ -1448,16 +1494,68 @@
         alias, expr, col_list, inputRR, pos, out_rwsch);
       } else {
         // Case when this is an expression
-        exprNodeDesc exp = genExprNodeDesc(expr, inputRR);
-        col_list.add(exp);
-        if (!StringUtils.isEmpty(alias) &&
-            (out_rwsch.get(null, colAlias) != null)) {
-          throw new SemanticException(ErrorMsg.AMBIGUOUS_COLUMN.getMsg(expr.getChild(1)));
-        }
-        out_rwsch.put(tabAlias, colAlias,
-            new ColumnInfo(getColumnInternalName(pos),
-                exp.getTypeInfo(), tabAlias, false));
-        pos = Integer.valueOf(pos.intValue() + 1);
+        LOG.debug("expression is: " + expr.toStringTree());
+
+        GenericUDTF genericUDTF = null;
+        boolean funcIsUDTF = false;
+        UDTFInfo udtfInfo = new UDTFInfo();
+
+        // Since UDTF's are parsed the same way as other functions, a UDTF can
+        // be detected by looking up the function name in the registry.
+        if (expr.getType() == HiveParser.TOK_FUNCTION) {
+          String funcName =
+            TypeCheckProcFactory.DefaultExprProcessor.getFunctionText(expr, true);
+          FunctionInfo fi = FunctionRegistry.getFunctionInfo(funcName);
+
+          genericUDTF = fi.getGenericUDTF();
+          funcIsUDTF = (genericUDTF != null);
+
+          // Currently disable functionality for multiple UDTF's in the SELECT
+          // expression. Remove this check to enable.
+          if (selectHasUDTF && funcIsUDTF) {
+            throw new SemanticException("Only a single UDTF is supported in " +
+                "the SELECT expression.");
+          }
+
+          selectHasUDTF = funcIsUDTF || selectHasUDTF;
+        }
+
+        ArrayList<ASTNode> exprsToProcess = new ArrayList<ASTNode>();
+        // For UDTF's, expand the parameters to be columns in the row.
+        // So a query like SELECT key, explode(value)... will look something
+        // like SELECT key, value ... to this section of code
+        if (funcIsUDTF) {
+          // index starts at 1 to skip the function name
+          for (int j = 1; j < expr.getChildCount(); j++) {
+            exprsToProcess.add((ASTNode) expr.getChild(j));
+          }
+          udtfInfo.setOutputColName(colAlias);
+          udtfMap.put(genericUDTF, udtfInfo);
+        } else {
+          exprsToProcess.add(expr);
+        }
+
+        // Process each expression: for a UDTF this is each of its parameters,
+        // otherwise it is just the original expression
+        for (ASTNode exprToProcess : exprsToProcess) {
+          exprNodeDesc exp = genExprNodeDesc(exprToProcess, inputRR);
+          col_list.add(exp);
+          if (!StringUtils.isEmpty(alias) &&
+              (out_rwsch.get(null, colAlias) != null)) {
+            throw new SemanticException(ErrorMsg.AMBIGUOUS_COLUMN.getMsg(expr.getChild(1)));
+          }
+
+          String internalName = getColumnInternalName(pos);
+          out_rwsch.put(tabAlias, colAlias,
+              new ColumnInfo(internalName, exp.getTypeInfo(), tabAlias, false));
+          pos = Integer.valueOf(pos.intValue() + 1);
+
+          // Remember which internal column names feed the UDTF; list order
+          // matches the parameter order
+          if (funcIsUDTF) {
+            udtfInfo.addInputColName(internalName);
+          }
+        }
       }
     }
 
+    for (Map.Entry<GenericUDTF, UDTFInfo> e : udtfMap.entrySet()) {
+      LOG.debug("UDTF: " + e.getKey() + " Info: " + e.getValue());
+    }
+    if (selectHasUDTF) {
+      output = genUDTFPlan(udtfMap, qb, output);
+    }
 
     LOG.debug("Created Select Plan for clause: " + dest + " row schema: "
         + out_rwsch.toString());
     return output;
@@ -2857,6 +2962,96 @@
     return limitMap;
   }
 
+  private Operator genUDTFPlan(Map<GenericUDTF, UDTFInfo> udtfMap,
+      QB qb, Operator input) throws SemanticException {
+    // Chain UDTF operators for multiple UDTF's in the SELECT expression
+    Operator output = input;
+    for (Map.Entry<GenericUDTF, UDTFInfo> e : udtfMap.entrySet()) {
+      GenericUDTF genericUDTF = e.getKey();
+      UDTFInfo udtfInfo = e.getValue();
+      output = insertSingleUDTF(genericUDTF, udtfInfo, qb, output);
+    }
+    return output;
+  }
+
+  private Operator insertSingleUDTF(GenericUDTF genericUDTF, UDTFInfo udtfInfo,
+      QB qb, Operator input) throws SemanticException {
+
+    // Use the row resolver from the input operator to generate an
+    // ObjectInspector that can be used to initialize the UDTF. Then, the
+    // resulting output object inspector can be used to make the RowResolver
+    RowResolver selectRR = opParseCtx.get(input).getRR();
+    Vector<ColumnInfo> inputCols = selectRR.getColumnInfos();
+
+    // Verify that the input columns for the UDTF are sequential.
+    // This check is for debugging purposes
+    List<String> inputColNames = udtfInfo.getInputColNames();
+    if (inputColNames.size() == 0) {
+      throw new SemanticException("UDTF's must take an input argument");
+    }
+    int startPos = selectRR.getPosition(inputColNames.get(0));
+    for (int i = 1; i < inputColNames.size(); i++) {
+      if (selectRR.getPosition(inputColNames.get(i)) != startPos + i) {
+        throw new SemanticException("UDTF input columns are not sequential");
+      }
+    }
+
+    // Generate the object inspectors for the UDTF's input columns and
+    // initialize the UDTF to get the object inspector for its output column
+    ArrayList<String> colNames = new ArrayList<String>();
+    ObjectInspector[] colOIs = new ObjectInspector[inputColNames.size()];
+    for (int i = 0; i < inputColNames.size(); i++) {
+      ColumnInfo colInfo = inputCols.get(startPos + i);
+      colNames.add(colInfo.getInternalName());
+      colOIs[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(colInfo.getType());
+    }
+
+    ObjectInspector udtfOutputOI = null;
+    try {
+      udtfOutputOI = genericUDTF.initialize(colOIs);
+    } catch (Exception e) {
+      throw new SemanticException(e.getMessage());
+    }
+
+    ArrayList<ColumnInfo> outputCols = new ArrayList<ColumnInfo>();
+
+    // Create the output columns, using the fact that the UDTF will take
+    // multiple input cols and output a single output col
+    Set<String> inputColSet = new HashSet<String>();
+    inputColSet.addAll(inputColNames);
+
+    boolean insertedUDTFCol = false;
+    for (int i = 0; i < inputCols.size(); i++) {
+      ColumnInfo colInfo = inputCols.get(i);
+      if (inputColSet.contains(colInfo.getInternalName())) {
+        // Replace the run of UDTF argument columns with the UDTF's single
+        // output column, only once
+        if (!insertedUDTFCol) {
+          outputCols.add(new ColumnInfo(udtfInfo.getOutputColName(),
+              TypeInfoUtils.getTypeInfoFromObjectInspector(udtfOutputOI),
+              null, false));
+          insertedUDTFCol = true;
+        }
+      } else {
+        outputCols.add(colInfo);
+      }
+    }
+
+    // Make the row resolver for the UDTF's output and create the operator
+    RowResolver out_rwsch = new RowResolver();
+    for (int i = 0; i < outputCols.size(); i++) {
+      out_rwsch.put(null, outputCols.get(i).getInternalName(), outputCols.get(i));
+    }
+
+    Operator udtf = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new udtfDesc(genericUDTF, udtfInfo.getOutputColName(), inputColNames),
+        new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
+
+    return udtf;
+  }
+
@@ -4129,6 +4324,5 @@
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    StringBuilder sb = new StringBuilder();
-    Formatter fm = new Formatter(sb);
+
     opRules.put(new RuleRegExp("R1", HiveParser.TOK_NULL + "%"), TypeCheckProcFactory.getNullExprProcessor());
     opRules.put(new RuleRegExp("R2", HiveParser.Number + "%"), TypeCheckProcFactory.getNumExprProcessor());
     opRules.put(new RuleRegExp("R3", HiveParser.Identifier + "%|" +
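The GenericUDTF class that follows defines a three-phase lifecycle, and UDTFOperator drives it exactly this way: initialize() once at operator setup, process() per input row, close() when input is exhausted. A compact sketch of that driver contract (names assumed; illustrative only, not part of the patch):

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class LifecycleSketch {
      static void drive(GenericUDTF udtf, ObjectInspector[] argOIs,
          Iterable<Object[]> rows) throws Exception {
        ObjectInspector outOI = udtf.initialize(argOIs); // once; fixes the output shape
        for (Object[] row : rows) {
          udtf.process(row); // may forward() zero or more rows per input row
        }
        udtf.close();        // may forward() trailing rows (e.g. summaries)
      }
    }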
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java	(revision 0)
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.udtfDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A Generic User-defined Table Generating Function (UDTF)
+ *
+ * Generates a variable number of output rows for a variable number of input
+ * rows. Useful for explode(array()), histograms, etc.
+ */
+public abstract class GenericUDTF {
+  Collector collector = null;
+
+  /**
+   * Initialize this GenericUDTF. This will be called only once per instance.
+   *
+   * @param argOIs An array of ObjectInspectors for the arguments
+   * @return An ObjectInspector for the output column
+   */
+  public abstract ObjectInspector initialize(ObjectInspector[] argOIs)
+      throws UDFArgumentException;
+
+  /**
+   * Give a row of arguments to the UDTF to process.
+   *
+   * @param args the values of the UDTF's argument columns for the current row
+   */
+  public abstract void process(Object[] args) throws HiveException;
+
+  /**
+   * Notify the UDTF that there are no more rows to process. The UDTF may
+   * still emit output rows from here.
+   */
+  public abstract void close() throws HiveException;
+
+  /**
+   * Associates a collector with this UDTF. Can't be specified in the
+   * constructor as the UDTF may be initialized before the collector has been
+   * constructed.
+   *
+   * @param collector the collector that receives forwarded rows
+   */
+  public final void setCollector(Collector collector) {
+    this.collector = collector;
+  }
+
+  /**
+   * Passes an output row to the collector.
+   *
+   * @param o the output row
+   * @throws HiveException
+   */
+  final void forward(Object o) throws HiveException {
+    collector.collect(o);
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java	(revision 0)
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * UDTFCollector collects data from a GenericUDTF and passes the data to a
+ * UDTFOperator.
+ */
+public class UDTFCollector implements Collector {
+
+  UDTFOperator op = null;
+
+  public UDTFCollector(UDTFOperator op) {
+    this.op = op;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.ql.udf.generic.Collector#collect(java.lang.Object)
+   */
+  @Override
+  public void collect(Object input) throws HiveException {
+    op.forwardUDTFOutput(input);
+  }
+
+}
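Everything a table function needs is on the two classes above: subclass GenericUDTF and call forward() for each output row. Before the built-in explode below, a hypothetical example following the same conventions (declared in the same package because forward() is package-private; not part of the patch):

    package org.apache.hadoop.hive.ql.udf.generic; // forward() is package-private

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    // Hypothetical pass-through UDTF: emits every argument value unchanged
    // and keeps a running count. The count is instance state, which is
    // exactly why FunctionInfo.getGenericUDTF() clones rather than shares.
    public class GenericUDTFPassThrough extends GenericUDTF {

      ObjectInspector inputOI = null;
      long rowsSeen = 0;

      @Override
      public ObjectInspector initialize(ObjectInspector[] argOIs)
          throws UDFArgumentException {
        if (argOIs.length != 1) {
          throw new UDFArgumentException("pass_through() takes only one argument");
        }
        inputOI = argOIs[0];
        rowsSeen = 0;
        // Output rows have the same shape as the single input column
        return inputOI;
      }

      @Override
      public void process(Object[] args) throws HiveException {
        rowsSeen++;
        forward(args[0]); // one output row per input row
      }

      @Override
      public void close() throws HiveException {
        // Runs after the last process() call; a UDTF may forward() trailing
        // rows here, provided they match the OI returned by initialize().
      }

      @Override
      public String toString() {
        return "pass_through";
      }
    }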
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java	(revision 0)
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.description;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+@description(
+    name = "explode",
+    value = "_FUNC_(a) - separates the elements of array a into multiple rows"
+)
+public class GenericUDTFExplode extends GenericUDTF {
+
+  ListObjectInspector listOI = null;
+
+  @Override
+  public void close() throws HiveException {
+  }
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] args)
+      throws UDFArgumentException {
+
+    if (args.length != 1) {
+      throw new UDFArgumentException("explode() takes only one argument");
+    }
+
+    if (args[0].getCategory() != ObjectInspector.Category.LIST) {
+      throw new UDFArgumentException("explode() takes an array as a parameter");
+    }
+    listOI = (ListObjectInspector) args[0];
+
+    return listOI.getListElementObjectInspector();
+  }
+
+  @Override
+  public void process(Object[] o) throws HiveException {
+    List<?> list = listOI.getList(o[0]);
+    for (Object r : list) {
+      this.forward(r);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "explode";
+  }
+}
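explode can be exercised off-cluster by standing in for the UDTFCollector with the bare Collector interface that follows. A hedged harness sketch (the javaIntObjectInspector constant and the standard-list inspector factory are assumed from the existing serde2 objectinspector classes; not part of the patch):

    package org.apache.hadoop.hive.ql.udf.generic; // same package as Collector/explode

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ExplodeHarnessSketch {
      public static void main(String[] args) throws Exception {
        GenericUDTFExplode udtf = new GenericUDTFExplode();

        // The argument is an array<int>; initialize returns the element OI
        ObjectInspector argOI = ObjectInspectorFactory.getStandardListObjectInspector(
            PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        udtf.initialize(new ObjectInspector[] { argOI });

        // Stand-in for UDTFCollector: print instead of forwarding to an operator
        udtf.setCollector(new Collector() {
          public void collect(Object input) throws HiveException {
            System.out.println(input);
          }
        });

        udtf.process(new Object[] { Arrays.asList(1, 2, 3) }); // prints 1, 2, 3
        udtf.close();
      }
    }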
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java	(revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Collector gets data from a source. What it does with the data is
+ * unspecified.
+ */
+public interface Collector {
+  /**
+   * Called by the data source with each piece of data it produces.
+   *
+   * @param input the collected datum
+   */
+  void collect(Object input) throws HiveException;
+}
Index: ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
===================================================================
--- ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java	(revision 4330)
+++ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java	(working copy)
@@ -27,6 +27,7 @@
   public static final int FILESINK = 10;
   public static final int REDUCESINK = 11;
   public static final int UNION = 12;
+  public static final int UDTF = 13;
 
   public static final IntRangeSet VALID_VALUES = new IntRangeSet(
     JOIN,
@@ -41,7 +42,8 @@
     TABLESCAN,
     FILESINK,
     REDUCESINK,
-    UNION );
+    UNION,
+    UDTF);
 
   public static final Map<Integer, String> VALUES_TO_NAMES = new HashMap<Integer, String>() {{
     put(JOIN, "JOIN");
@@ -57,5 +59,6 @@
     put(FILESINK, "FILESINK");
     put(REDUCESINK, "REDUCESINK");
     put(UNION, "UNION");
+    put(UDTF, "UDTF");
   }};
 }