Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 4330) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -129,6 +129,9 @@ HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float)0.5), HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float)0.5), + // for hive udtf operator + HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false), + // Default file format for CREATE TABLE statement // Options: TextFile, SequenceFile HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"), Index: ql/src/test/results/clientnegative/udtf_not_supported2.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported2.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported2.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: UDTF's require an AS clause Index: ql/src/test/results/clientnegative/udtf_not_supported1.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported1.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported1.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: Only a single expression in SELECT is supported with UDTF's Index: ql/src/test/results/clientnegative/udtf_not_supported3.q.out =================================================================== --- ql/src/test/results/clientnegative/udtf_not_supported3.q.out (revision 0) +++ ql/src/test/results/clientnegative/udtf_not_supported3.q.out (revision 0) @@ -0,0 +1 @@ +FAILED: Error in semantic analysis: GROUP BY is not supported with UDTF's Index: ql/src/test/results/clientpositive/show_functions.q.out =================================================================== --- ql/src/test/results/clientpositive/show_functions.q.out (revision 4330) +++ ql/src/test/results/clientpositive/show_functions.q.out (working copy) @@ -46,6 +46,7 @@ double elt exp +explode find_in_set float floor @@ -145,6 +146,7 @@ case coalesce double +explode from_unixtime lcase like Index: ql/src/test/results/clientpositive/udtf_explode.q.out =================================================================== --- ql/src/test/results/clientpositive/udtf_explode.q.out (revision 0) +++ ql/src/test/results/clientpositive/udtf_explode.q.out (revision 0) @@ -0,0 +1,236 @@ +PREHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + outputColumnName: myCol + function name: explode + Limit + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1475393544/10001 + table: + input format: 
org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns myCol + serialization.format 1 + columns.types int + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [src] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + Partition + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + name src + columns.types string:string + serialization.ddl struct src { string key, string value} + serialization.format 1 + columns key,value + bucket_count -1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + file.inputformat org.apache.hadoop.mapred.TextInputFormat + file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1258511320 + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: src + + Stage: Stage-0 + Fetch Operator + limit: 3 + + +PREHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION explode (TOK_FUNCTION array 1 2 3)) myCol)) (TOK_LIMIT 3))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL myCol)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (TOK_TABLE_OR_COL myCol)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a:src + TableScan + alias: src + Select Operator + expressions: + expr: array(1,2,3) + type: array + outputColumnNames: _col0 + UDTF Operator + outputColumnName: myCol + function name: explode + Limit + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: myCol + type: int + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src [a:src] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + Partition + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + name src + columns.types string:string + serialization.ddl struct src { string key, string value} + serialization.format 1 + columns key,value + bucket_count -1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + file.inputformat org.apache.hadoop.mapred.TextInputFormat + file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + location file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/test/data/warehouse/src + transient_lastDdlTime 1258511320 + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: src + Reduce Operator Tree: + Extract + Limit + Select Operator + expressions: + expr: _col0 + type: int + 
outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + keys: + expr: _col0 + type: int + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10002 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types int,bigint + escape.delim \ + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10002 + Reduce Output Operator + key expressions: + expr: _col0 + type: int + sort order: + + Map-reduce partition columns: + expr: _col0 + type: int + tag: -1 + value expressions: + expr: _col1 + type: bigint + Needs Tagging: false + Path -> Alias: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10002 [file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10002] + Path -> Partition: + file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10002 + Partition + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types int,bigint + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + keys: + expr: KEY._col0 + type: int + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: int + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + directory: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/2132331167/10001 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + serialization.format 1 + columns.types int:bigint + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1307066345/10000 +POSTHOOK: query: SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/1307066345/10000 +1 +2 +3 +PREHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/523956254/10000 +POSTHOOK: query: SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/pyang/udtf/trunk/VENDOR.hive/trunk/build/ql/tmp/523956254/10000 +1 1 +2 1 +3 1 Index: ql/src/test/queries/clientnegative/udtf_not_supported1.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported1.q (revision 0) +++ ql/src/test/queries/clientnegative/udtf_not_supported1.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) as myCol, key FROM src; Index: 
ql/src/test/queries/clientnegative/udtf_not_supported3.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported3.q (revision 0) +++ ql/src/test/queries/clientnegative/udtf_not_supported3.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) as myCol FROM src GROUP BY key; Index: ql/src/test/queries/clientnegative/udtf_not_supported2.q =================================================================== --- ql/src/test/queries/clientnegative/udtf_not_supported2.q (revision 0) +++ ql/src/test/queries/clientnegative/udtf_not_supported2.q (revision 0) @@ -0,0 +1 @@ +SELECT explode(array(1,2,3)) FROM src; Index: ql/src/test/queries/clientpositive/udtf_explode.q =================================================================== --- ql/src/test/queries/clientpositive/udtf_explode.q (revision 0) +++ ql/src/test/queries/clientpositive/udtf_explode.q (revision 0) @@ -0,0 +1,5 @@ +EXPLAIN EXTENDED SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; +EXPLAIN EXTENDED SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; + +SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3; +SELECT myCol, count(1) FROM (SELECT explode(array(1,2,3)) AS myCol FROM src LIMIT 3) a GROUP BY myCol; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (working copy) @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.ScriptOperator; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.UDTFOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; @@ -253,7 +254,7 @@ // Without this break, a bug in ReduceSink to Extract edge column pruning will manifest // which should be fixed before remove this if ((child instanceof FileSinkOperator) - || (child instanceof ScriptOperator) + || (child instanceof ScriptOperator) || (child instanceof UDTFOperator) || (child instanceof LimitOperator) || (child instanceof UnionOperator)) { cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op)); return null; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionInfo.java (working copy) @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; public class FunctionInfo { @@ -27,24 +28,30 @@ private String displayName; - private GenericUDF genericUDF; + private GenericUDF genericUDF = null; - private GenericUDAFResolver genericUDAFResolver; - + private GenericUDTF genericUDTF = null; + + private GenericUDAFResolver genericUDAFResolver = null; + public FunctionInfo(boolean isNative, String displayName, GenericUDF genericUDF) { this.isNative = isNative; this.displayName = displayName; this.genericUDF = genericUDF; - this.genericUDAFResolver = null; } public FunctionInfo(boolean isNative, 
String displayName, GenericUDAFResolver genericUDAFResolver) { this.isNative = isNative; this.displayName = displayName; - this.genericUDF = null; this.genericUDAFResolver = genericUDAFResolver; } + public FunctionInfo(boolean isNative, String displayName, GenericUDTF genericUDTF) { + this.isNative = isNative; + this.displayName = displayName; + this.genericUDTF = genericUDTF; + } + /** * Get a new GenericUDF object for the function. */ @@ -54,6 +61,16 @@ } /** + * Get a new GenericUDTF object for the function. + */ + public GenericUDTF getGenericUDTF() { + // GenericUDTF is stateful too, copy + if (genericUDTF == null) + return null; + return FunctionRegistry.cloneGenericUDTF(genericUDTF); + } + + /** * Get the GenericUDAFResolver object for the function. */ public GenericUDAFResolver getGenericUDAFResolver() { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 4339) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -216,6 +216,9 @@ registerGenericUDF("locate", GenericUDFLocate.class); registerGenericUDF("elt", GenericUDFElt.class); registerGenericUDF("concat_ws", GenericUDFConcatWS.class); + + // Generic UDTF's + registerGenericUDTF("explode", GenericUDTFExplode.class); } public static void registerTemporaryUDF(String functionName, Class UDFClass, @@ -245,7 +248,7 @@ new GenericUDFBridge(displayName, isOperator, UDFClass)); mFunctions.put(functionName.toLowerCase(), fI); } else { - throw new RuntimeException("Registering UDF Class " + UDFClass + " which does not extends " + UDF.class); + throw new RuntimeException("Registering UDF Class " + UDFClass + " which does not extend " + UDF.class); } } @@ -264,10 +267,25 @@ mFunctions.put(functionName.toLowerCase(), fI); } else { throw new RuntimeException("Registering GenericUDF Class " + genericUDFClass - + " which does not extends " + GenericUDF.class); + + " which does not extend " + GenericUDF.class); } } + static void registerGenericUDTF(String functionName, Class genericUDTFClass) { + registerGenericUDTF(true, functionName, genericUDTFClass); + } + + public static void registerGenericUDTF(boolean isNative, String functionName, Class genericUDTFClass) { + if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) { + FunctionInfo fI = new FunctionInfo(isNative, functionName, + (GenericUDTF)ReflectionUtils.newInstance(genericUDTFClass, null)); + mFunctions.put(functionName.toLowerCase(), fI); + } else { + throw new RuntimeException("Registering GenericUDTF Class " + genericUDTFClass + + " which does not extend " + GenericUDTF.class); + } + } + public static FunctionInfo getFunctionInfo(String functionName) { return mFunctions.get(functionName.toLowerCase()); } @@ -630,6 +648,13 @@ } /** + * Create a copy of an existing GenericUDTF. + */ + public static GenericUDTF cloneGenericUDTF(GenericUDTF genericUDTF) { + return (GenericUDTF)ReflectionUtils.newInstance(genericUDTF.getClass(), null); + } + + /** * Get the UDF class from an exprNodeDesc. * Returns null if the exprNodeDesc does not contain a UDF class. 
*/ Index: ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (revision 0) @@ -0,0 +1,111 @@ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.udtfDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; + +public class UDTFOperator extends Operator implements Serializable { + private static final long serialVersionUID = 1L; + + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + ObjectInspector [] udtfInputOIs = null; + Object [] objToSendToUDTF = null; + Object [] forwardObj = new Object[1]; + + /** + * sends periodic reports back to the tracker. + */ + transient AutoProgressor autoProgressor; + + protected void initializeOp(Configuration hconf) throws HiveException { + conf.getUDTF().setCollector(new UDTFCollector(this)); + + // Make an object inspector [] of the arguments to the UDTF + List inputFields = + ((StandardStructObjectInspector)inputObjInspectors[0]).getAllStructFieldRefs(); + + udtfInputOIs = new ObjectInspector[inputFields.size()]; + for (int i=0; i colNames = new ArrayList(); + ArrayList colOIs = new ArrayList(); + colNames.add(conf.getOutputColName()); + colOIs.add(udtfOutputOI); + outputObjInspector = + ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colOIs); + + // Set up periodic progress reporting in case the UDTF doesn't output rows + // for a while + if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) { + autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, + Utilities.getDefaultNotificationInterval(hconf)); + autoProgressor.go(); + } + + // Initialize the rest of the operator DAG + super.initializeOp(hconf); + } + + public void processOp(Object row, int tag) throws HiveException { + // The UDTF expects arguments in an object[] + StandardStructObjectInspector soi = + (StandardStructObjectInspector) inputObjInspectors[tag]; + List fields = soi.getAllStructFieldRefs(); + + for (int i=0; i (limitDesc.class, LimitOperator.class)); opvec.add(new opTuple (tableScanDesc.class, TableScanOperator.class)); opvec.add(new opTuple (unionDesc.class, UnionOperator.class)); + opvec.add(new opTuple (udtfDesc.class, UDTFOperator.class)); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -785,4 +785,23 @@ } return sb.toString(); } + /** + * 
Gets the default notification interval to send progress updates to the + * tracker. Useful for operators that may not output data for a while. + * + * @param hconf + * @return the interval in miliseconds + */ + public static int getDefaultNotificationInterval(Configuration hconf) { + int notificationInterval; + Integer expInterval = Integer.decode(hconf.get("mapred.tasktracker.expiry.interval")); + + if (expInterval != null) { + notificationInterval = expInterval.intValue() / 2; + } else { + // 5 minutes + notificationInterval = 5 * 60 * 1000; + } + return notificationInterval; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AutoProgressor.java (revision 0) @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.Reporter; + +/** + * AutoProgressor periodically sends updates to the job tracker so that + * it doesn't consider this task attempt dead if there is a long period of + * inactivity. + * + * @author pyang + * + */ +public class AutoProgressor { + protected Log LOG = LogFactory.getLog(this.getClass().getName()); + + // Timer that reports every 5 minutes to the jobtracker. This ensures that + // even if the operator returning rows for greater than that + // duration, a progress report is sent to the tracker so that the tracker + // does not think that the job is dead. + Timer rpTimer = null; + // Name of the class to report for + String logClassName = null; + int notificationInterval; + Reporter reporter; + + class ReporterTask extends TimerTask { + + /** + * Reporter to report progress to the jobtracker. + */ + private Reporter rp; + + /** + * Constructor. 
+ */ + public ReporterTask(Reporter rp) { + if (rp != null) + this.rp = rp; + } + + @Override + public void run() { + if (rp != null) { + LOG.info("ReporterTask calling reporter.progress() for " + logClassName); + rp.progress(); + } + } + } + + AutoProgressor(String logClassName, Reporter reporter, int notificationInterval) { + this.logClassName = logClassName; + this.reporter = reporter; + } + + public void go() { + LOG.info("Running ReporterTask every " + notificationInterval + " miliseconds."); + rpTimer = new Timer(true); + rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0, notificationInterval); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/udtfDesc.java (revision 0) @@ -0,0 +1,41 @@ +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; + +@explain(displayName="UDTF Operator") +public class udtfDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private GenericUDTF genericUDTF; + private String outputColName; + + public udtfDesc() { } + public udtfDesc(final GenericUDTF genericUDTF, String outputColName) { + this.genericUDTF = genericUDTF; + this.outputColName = outputColName; + } + + public GenericUDTF getUDTF() { + return this.genericUDTF; + } + public void setUDTF(final GenericUDTF genericUDTF) { + this.genericUDTF=genericUDTF; + } + @explain(displayName="function name") + public String getUDTFName() { + return this.genericUDTF.toString(); + } + + @explain(displayName="outputColumnName") + public String getOutputColName() { + return this.outputColName; + } + + public void setOutputColName(String outputColName) { + this.outputColName = outputColName; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (working copy) @@ -180,6 +180,9 @@ public ASTNode getGroupByForClause(String clause) { return this.destToGroupby.get(clause); } + public HashMap getDestToGroupBy() { + return this.destToGroupby; + } public ASTNode getSelForClause(String clause) { return this.destToSelExpr.get(clause); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (revision 4330) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (working copy) @@ -532,6 +532,10 @@ else throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((ASTNode)expr)); } + // Detect UDTF's in nested SELECT, GROUP BY, etc as they aren't supported + if (fi.getGenericUDTF() != null) { + throw new SemanticException("UDTF's are not supported outside the SELECT clause, nor nested in expressions"); + } try { desc = getFuncExprNodeDesc(funcText, children); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 4330) +++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -20,13 +20,10 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Formatter; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -61,10 +59,8 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -116,17 +112,22 @@ import org.apache.hadoop.hive.ql.plan.selectDesc; import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.ql.plan.tableScanDesc; +import org.apache.hadoop.hive.ql.plan.udtfDesc; import org.apache.hadoop.hive.ql.plan.unionDesc; import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -136,19 +137,17 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.ql.exec.TextRecordReader; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import java.util.regex.Pattern; import java.util.regex.Matcher; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import 
org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; @@ -163,7 +162,6 @@ import org.apache.hadoop.hive.ql.exec.FetchOperator; import java.util.Collection; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; /** * Implementation of the semantic analyzer @@ -1392,13 +1390,55 @@ if (isInTransform) { trfm = (ASTNode) selExprList.getChild(posn).getChild(0); } - + + // Detect a UDTF by looking up the function name in the registry. + // Not as clean TRANSFORM due to the lack of a special token. + boolean isUDTF = false; + String udtfOutputColAlias = null; + ASTNode udtfExpr = (ASTNode) selExprList.getChild(posn).getChild(0); + GenericUDTF genericUDTF = null; + + if (udtfExpr.getType() == HiveParser.TOK_FUNCTION) { + String funcName = + TypeCheckProcFactory.DefaultExprProcessor.getFunctionText(udtfExpr, true); + FunctionInfo fi = FunctionRegistry.getFunctionInfo(funcName); + if (fi != null) { + genericUDTF = fi.getGenericUDTF(); + } + isUDTF = (genericUDTF != null); + } + + if (isUDTF) { + // Only support a single expression when it's a UDTF + if (selExprList.getChildCount() > 1) { + throw new SemanticException("Only a single expression in SELECT is supported with UDTF's"); + } + //Require an AS for UDTFs + if (((ASTNode) selExprList.getChild(posn)).getChildCount() != 2 || + selExprList.getChild(posn).getChild(1).getType() != HiveParser.Identifier ){ + throw new SemanticException("UDTF's require an AS clause"); + } + udtfOutputColAlias = unescapeIdentifier(selExprList.getChild(posn).getChild(1).getText()); + } + // The list of expressions after SELECT or SELECT TRANSFORM. - ASTNode exprList = (isInTransform ? (ASTNode) trfm.getChild(0) : selExprList); + ASTNode exprList; + if (isInTransform) { + exprList = (ASTNode) trfm.getChild(0); + } else if (isUDTF) { + exprList = (ASTNode) udtfExpr; + } else { + exprList = selExprList; + } + LOG.debug("genSelectPlan: input = " + inputRR.toString()); + + // For UDTF's, skip the function name + int startPosn = isUDTF ? posn + 1 : posn; + // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM) - for (int i = posn; i < exprList.getChildCount(); ++i) { + for (int i = startPosn; i < exprList.getChildCount(); ++i) { // child can be EXPR AS ALIAS, or EXPR. ASTNode child = (ASTNode) exprList.getChild(i); @@ -1408,7 +1448,7 @@ String tabAlias; String colAlias; - if (isInTransform) { + if (isInTransform || isUDTF) { tabAlias = null; colAlias = "_C" + i; expr = child; @@ -1457,6 +1497,7 @@ out_rwsch.put(tabAlias, colAlias, new ColumnInfo(getColumnInternalName(pos), exp.getTypeInfo(), tabAlias, false)); + pos = Integer.valueOf(pos.intValue() + 1); } } @@ -1483,11 +1524,14 @@ output = genScriptPlan(trfm, qb, output); } + if(isUDTF) { + output = genUDTFPlan(genericUDTF, udtfOutputColAlias, qb, output); + } LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString()); return output; } - + /** * Class to store GenericUDAF related information. 
*/ @@ -2856,7 +2900,64 @@ return limitMap; } + + private Operator genUDTFPlan(GenericUDTF genericUDTF, String udtfOutputColumnAlias, + QB qb, Operator input) throws SemanticException { + + // No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY + QBParseInfo qbp = qb.getParseInfo(); + String notSupported = " is not supported with UDTF's"; + if (!qbp.getDestToGroupBy().isEmpty()) { + throw new SemanticException("GROUP BY" + notSupported); + } + if (!qbp.getDestToDistributeBy().isEmpty()) { + throw new SemanticException("DISTRYBUTE BY" + notSupported); + } + if (!qbp.getDestToSortBy().isEmpty()) { + throw new SemanticException("SORT BY" + notSupported); + } + if (!qbp.getDestToClusterBy().isEmpty()) { + throw new SemanticException("CLUSTER BY" + notSupported); + } + + // Use the RowResolver from the input operator to generate a input + // ObjectInspector that can be used to initialize the UDTF. Then, the + // resulting output object inspector can be used to make the RowResolver + // for the UDTF operator + RowResolver selectRR = opParseCtx.get(input).getRR(); + Vector inputCols = selectRR.getColumnInfos(); + + // Create the object inspector for the input columns and initialize the UDTF + ArrayList colNames = new ArrayList(); + ObjectInspector [] colOIs = new ObjectInspector[inputCols.size()]; + for (int i=0; i opRules = new LinkedHashMap(); - StringBuilder sb = new StringBuilder(); - Formatter fm = new Formatter(sb); + opRules.put(new RuleRegExp("R1", HiveParser.TOK_NULL + "%"), TypeCheckProcFactory.getNullExprProcessor()); opRules.put(new RuleRegExp("R2", HiveParser.Number + "%"), TypeCheckProcFactory.getNumExprProcessor()); opRules.put(new RuleRegExp("R3", HiveParser.Identifier + "%|" + Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java (revision 0) @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A Generic User-defined Table Generating Function (UDTF) + * + * Generates a variable number of output rows for a variable number of input + * rows. Useful for explode(array()), histograms, etc + */ + +public abstract class GenericUDTF { + Collector collector = null; + + /** + * Initialize this GenericUDTF. This will be called only once per + * instance. 
+ * + * @param args An array of ObjectInspectors for the arguments + * @return ObjectInspector for the output + */ + public abstract ObjectInspector initialize(ObjectInspector [] argOIs) + throws UDFArgumentException; + + /** + * Give a a set of arguments for the UDTF to process. + * + * @param o object array of arguments + */ + public abstract void process(Object [] args) throws HiveException; + + /** + * Notify the UDTF that there are no more rows to process. + */ + public abstract void close() throws HiveException; + + /** + * Associates a collector with this UDTF. Can't be specified in the + * constructor as the UDTF may be initialized before the collector has been + * constructed. + * + * @param collector + */ + public final void setCollector(Collector collector) { + this.collector = collector; + } + + /** + * Passes output data to collector + * + * @param o + * @throws HiveException + */ + final void forward(Object o) throws HiveException { + collector.collect(o); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/UDTFCollector.java (revision 0) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.exec.UDTFOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * UDTFCollector collects data from a GenericUDTF and passes the data to a + * UDTFOperator + */ +public class UDTFCollector implements Collector { + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.udf.generic.Collector#collect(java.lang.Object) + */ + UDTFOperator op = null; + + public UDTFCollector(UDTFOperator op) { + this.op = op; + } + @Override + public void collect(Object input) throws HiveException{ + op.forwardUDTFOutput(input); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java (revision 0) @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.description; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +@description( + name = "explode", + value = "_FUNC_(a) - separates the elements of array a into multiple rows " +) +public class GenericUDTFExplode extends GenericUDTF { + + ListObjectInspector listOI = null; + + @Override + public void close() throws HiveException{ + } + + @Override + public ObjectInspector initialize(ObjectInspector [] args) throws UDFArgumentException { + + if(args.length != 1) { + throw new UDFArgumentException("explode() takes only one argument"); + } + + if(args[0].getCategory() != ObjectInspector.Category.LIST) { + throw new UDFArgumentException("explode() takes an array as a parameter"); + } + listOI = (ListObjectInspector)args[0]; + + return listOI.getListElementObjectInspector(); + } + + @Override + public void process(Object [] o) throws HiveException { + + List list = listOI.getList(o[0]); + for(Object r : list) { + this.forward(r); + } + } + + @Override + public String toString() { + return "explode"; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/Collector.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Collector gets data from a source. + */ +public interface Collector { + /** + * Other classes will call collect() with the data that it has. 
+ * @param input + */ + void collect(Object input) throws HiveException; +} Index: ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java =================================================================== --- ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (revision 4330) +++ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (working copy) @@ -27,6 +27,7 @@ public static final int FILESINK = 10; public static final int REDUCESINK = 11; public static final int UNION = 12; + public static final int UDTF = 13; public static final IntRangeSet VALID_VALUES = new IntRangeSet( JOIN, @@ -41,7 +42,8 @@ TABLESCAN, FILESINK, REDUCESINK, - UNION ); + UNION, + UDTF); public static final Map VALUES_TO_NAMES = new HashMap() {{ put(JOIN, "JOIN"); @@ -57,5 +59,6 @@ put(FILESINK, "FILESINK"); put(REDUCESINK, "REDUCESINK"); put(UNION, "UNION"); + put(UDTF, "UDTF"); }}; }
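
Note (not part of the patch): the GenericUDTF contract introduced above — initialize() returning a single output ObjectInspector, process() receiving the argument row as an Object[], close(), and the package-private forward() that hands rows to UDTFCollector and on to UDTFOperator — is enough to write further table-generating functions. Below is a minimal, hypothetical sketch against that contract; the class GenericUDTFRange and its semantics are illustrative assumptions, and it sits in org.apache.hadoop.hive.ql.udf.generic only because forward() carries package-private visibility in this revision.

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

@description(
    name = "range",
    value = "_FUNC_(n) - emits one row for each integer 0..n-1 (illustrative sketch only)"
)
public class GenericUDTFRange extends GenericUDTF {

  // ObjectInspector for the single int argument, captured in initialize()
  IntObjectInspector intOI = null;

  @Override
  public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("range() takes only one argument");
    }
    if (!(args[0] instanceof IntObjectInspector)) {
      throw new UDFArgumentException("range() takes an int as a parameter");
    }
    intOI = (IntObjectInspector) args[0];
    // As with explode(), a single ObjectInspector describes the one output column;
    // UDTFOperator wraps it into a struct named by the mandatory AS alias.
    return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
  }

  @Override
  public void process(Object[] o) throws HiveException {
    int n = intOI.get(o[0]);
    for (int i = 0; i < n; i++) {
      // forward() routes each produced row through the collector to UDTFOperator
      forward(Integer.valueOf(i));
    }
  }

  @Override
  public void close() throws HiveException {
    // nothing buffered, nothing to flush
  }

  @Override
  public String toString() {
    return "range";
  }
}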
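
A second hedged sketch, wiring the hypothetical class above into the registry and enabling the new progress-reporting flag. Only the public registerGenericUDTF(boolean, String, Class) overload added by this patch is visible outside the package (the two-argument form used to register explode() is package-private), and hive.udtf.auto.progress defaults to false; when set, UDTFOperator starts an AutoProgressor so the task tracker keeps receiving progress while a UDTF emits no rows.

import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFRange;

// Hypothetical helper, not part of the patch
public class RegisterRangeUDTF {
  public static void main(String[] args) {
    // Mirrors the patch's own registerGenericUDTF("explode", GenericUDTFExplode.class)
    // call in FunctionRegistry, here via the public non-native overload:
    FunctionRegistry.registerGenericUDTF(false, "range", GenericUDTFRange.class);

    // Session-level switch for the AutoProgressor path in UDTFOperator:
    //   SET hive.udtf.auto.progress=true;
  }
}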