diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 288270e..4ab9280 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -29,6 +29,7 @@ minimr.query.files=auto_sortmerge_join_16.q,\ list_bucket_dml_10.q,\ load_fs2.q,\ load_hdfs_file_with_space_in_the_name.q,\ + non_native_window_udf.q, \ optrstat_groupby.q,\ parallel_orderby.q,\ ql_rewrite_gbtoidx.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java index 0423466..d7817d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java @@ -20,10 +20,14 @@ import java.util.AbstractList; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -60,10 +64,42 @@ @SuppressWarnings("deprecation") public class WindowingTableFunction extends TableFunctionEvaluator { + public static final Log LOG =LogFactory.getLog(WindowingTableFunction.class.getName()); + static class WindowingFunctionInfoHelper { + private boolean supportsWindow; + + WindowingFunctionInfoHelper() { + } + + public WindowingFunctionInfoHelper(boolean supportsWindow) { + this.supportsWindow = supportsWindow; + } + + public boolean isSupportsWindow() { + return supportsWindow; + } + public void setSupportsWindow(boolean supportsWindow) { + this.supportsWindow = supportsWindow; + } + } StreamingState streamingState; RankLimit rnkLimitDef; + + // There is some information about the windowing functions that needs to be initialized + // during query compilation time, and made available to during the map/reduce tasks via + // plan serialization. + Map windowingFunctionHelpers = null; + public Map getWindowingFunctionHelpers() { + return windowingFunctionHelpers; + } + + public void setWindowingFunctionHelpers( + Map windowingFunctionHelpers) { + this.windowingFunctionHelpers = windowingFunctionHelpers; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void execute(PTFPartitionIterator pItr, PTFPartition outP) throws HiveException { @@ -147,9 +183,8 @@ private boolean processWindow(WindowFunctionDef wFn) { private boolean streamingPossible(Configuration cfg, WindowFunctionDef wFnDef) throws HiveException { WindowFrameDef wdwFrame = wFnDef.getWindowFrame(); - WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFnDef - .getName()); + WindowingFunctionInfoHelper wFnInfo = getWindowingFunctionInfoHelper(wFnDef.getName()); if (!wFnInfo.isSupportsWindow()) { return true; } @@ -259,6 +294,45 @@ private boolean streamingPossible(Configuration cfg, WindowFunctionDef wFnDef) return new int[] {precedingSpan, followingSpan}; } + private void initializeWindowingFunctionInfoHelpers() throws SemanticException { + // getWindowFunctionInfo() cannot be called during map/reduce tasks. So cache necessary + // values during query compilation, and rely on plan serialization to bring this info + // to the object during the map/reduce tasks. + if (windowingFunctionHelpers != null) { + return; + } + + windowingFunctionHelpers = new HashMap(); + WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef(); + for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) { + WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i); + GenericUDAFEvaluator fnEval = wFn.getWFnEval(); + WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn.getName()); + boolean supportsWindow = wFnInfo.isSupportsWindow(); + windowingFunctionHelpers.put(wFn.getName(), new WindowingFunctionInfoHelper(supportsWindow)); + } + } + + @Override + protected void setOutputOI(StructObjectInspector outputOI) { + super.setOutputOI(outputOI); + // Call here because at this point the WindowTableFunctionDef has been set + try { + initializeWindowingFunctionInfoHelpers(); + } catch (SemanticException err) { + throw new RuntimeException("Unexpected error while setting up windowing function", err); + } + } + + private WindowingFunctionInfoHelper getWindowingFunctionInfoHelper(String fnName) { + WindowingFunctionInfoHelper wFnInfoHelper = windowingFunctionHelpers.get(fnName); + if (wFnInfoHelper == null) { + // Should not happen + throw new RuntimeException("No cached WindowingFunctionInfoHelper for " + fnName); + } + return wFnInfoHelper; + } + @Override public void initializeStreaming(Configuration cfg, StructObjectInspector inputOI, boolean isMapSide) throws HiveException { @@ -412,8 +486,7 @@ public void startPartition() throws HiveException { if (fnEval instanceof ISupportStreamingModeForWindowing) { fnEval.terminate(streamingState.aggBuffers[i]); - WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn - .getName()); + WindowingFunctionInfoHelper wFnInfo = getWindowingFunctionInfoHelper(wFn.getName()); if (!wFnInfo.isSupportsWindow()) { numRowsRemaining = ((ISupportStreamingModeForWindowing) fnEval) .getRowsRemainingAfterTerminate(); diff --git a/ql/src/test/queries/clientpositive/non_native_window_udf.q b/ql/src/test/queries/clientpositive/non_native_window_udf.q new file mode 100644 index 0000000..c3b8845 --- /dev/null +++ b/ql/src/test/queries/clientpositive/non_native_window_udf.q @@ -0,0 +1,11 @@ + +create temporary function mylastval as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLastValue'; + +select p_mfgr,p_name, p_size, +sum(p_size) over (distribute by p_mfgr sort by p_name rows between current row and current row) as s2, +first_value(p_size) over w1 as f, +last_value(p_size, false) over w1 as l, +mylastval(p_size, false) over w1 as m +from part +window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and 2 following); + diff --git a/ql/src/test/results/clientpositive/non_native_window_udf.q.out b/ql/src/test/results/clientpositive/non_native_window_udf.q.out new file mode 100644 index 0000000..605e5b2 --- /dev/null +++ b/ql/src/test/results/clientpositive/non_native_window_udf.q.out @@ -0,0 +1,52 @@ +PREHOOK: query: create temporary function mylastval as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLastValue' +PREHOOK: type: CREATEFUNCTION +PREHOOK: Output: mylastval +POSTHOOK: query: create temporary function mylastval as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLastValue' +POSTHOOK: type: CREATEFUNCTION +POSTHOOK: Output: mylastval +PREHOOK: query: select p_mfgr,p_name, p_size, +sum(p_size) over (distribute by p_mfgr sort by p_name rows between current row and current row) as s2, +first_value(p_size) over w1 as f, +last_value(p_size, false) over w1 as l, +mylastval(p_size, false) over w1 as m +from part +window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and 2 following) +PREHOOK: type: QUERY +PREHOOK: Input: default@part +#### A masked pattern was here #### +POSTHOOK: query: select p_mfgr,p_name, p_size, +sum(p_size) over (distribute by p_mfgr sort by p_name rows between current row and current row) as s2, +first_value(p_size) over w1 as f, +last_value(p_size, false) over w1 as l, +mylastval(p_size, false) over w1 as m +from part +window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and 2 following) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@part +#### A masked pattern was here #### +Manufacturer#1 almond antique burnished rose metallic 2 2 2 34 34 +Manufacturer#1 almond antique burnished rose metallic 2 2 2 6 6 +Manufacturer#1 almond antique chartreuse lavender yellow 34 34 2 28 28 +Manufacturer#1 almond antique salmon chartreuse burlywood 6 6 2 42 42 +Manufacturer#1 almond aquamarine burnished black steel 28 28 34 42 42 +Manufacturer#1 almond aquamarine pink moccasin thistle 42 42 6 42 42 +Manufacturer#2 almond antique violet chocolate turquoise 14 14 14 2 2 +Manufacturer#2 almond antique violet turquoise frosted 40 40 14 25 25 +Manufacturer#2 almond aquamarine midnight light salmon 2 2 14 18 18 +Manufacturer#2 almond aquamarine rose maroon antique 25 25 40 18 18 +Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 18 2 18 18 +Manufacturer#3 almond antique chartreuse khaki white 17 17 17 19 19 +Manufacturer#3 almond antique forest lavender goldenrod 14 14 17 1 1 +Manufacturer#3 almond antique metallic orange dim 19 19 17 45 45 +Manufacturer#3 almond antique misty red olive 1 1 14 45 45 +Manufacturer#3 almond antique olive coral navajo 45 45 19 45 45 +Manufacturer#4 almond antique gainsboro frosted violet 10 10 10 27 27 +Manufacturer#4 almond antique violet mint lemon 39 39 10 7 7 +Manufacturer#4 almond aquamarine floral ivory bisque 27 27 10 12 12 +Manufacturer#4 almond aquamarine yellow dodger mint 7 7 39 12 12 +Manufacturer#4 almond azure aquamarine papaya violet 12 12 27 12 12 +Manufacturer#5 almond antique blue firebrick mint 31 31 31 2 2 +Manufacturer#5 almond antique medium spring khaki 6 6 31 46 46 +Manufacturer#5 almond antique sky peru orange 2 2 31 23 23 +Manufacturer#5 almond aquamarine dodger light gainsboro 46 46 6 23 23 +Manufacturer#5 almond azure blanched chiffon midnight 23 23 2 23 23