diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9caffb6..e37cc5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11761,9 +11761,9 @@ private BoundarySpec processBoundary(int frameType, ASTNode node) throws Semant
     else {
       int amt = Integer.parseInt(amtNode.getText());
-      if ( amt < 0 ) {
+      if ( amt <= 0 ) {
         throw new SemanticException(
-          "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+          "Window Frame Boundary Amount must be a positive integer, provided amount is: " + amt);
       }
       bs.setAmt(amt);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index 160ce91..7ad4eb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -262,7 +262,7 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep
       }
     }

-    if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+    if (s.hasResultReady()) {
       /*
        * if skipNulls is true and there are no rows in valueChain => all rows
        * in partition are null so far; so add null in o/p
@@ -293,12 +293,14 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     // For the case: X following and Y following, process first Y-X results and then insert X nulls.
     // For the case X preceding and Y following, process Y results.
     for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-      s.results.add(r == null ? null : r.val);
+      if (s.hasResultReady()) {
+        s.results.add(r == null ? null : r.val);
+      }
       s.numRows++;
       if (r != null) {
         int fIdx = (Integer) r.idx;
         if (!wFrameDef.isStartUnbounded()
-            && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+            && s.numRows >= fIdx + wFrameDef.getWindowSize()
             && !s.valueChain.isEmpty()) {
           s.valueChain.removeFirst();
           r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
@@ -307,7 +309,9 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     }

     for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-      s.results.add(null);
+      if (s.hasResultReady()) {
+        s.results.add(null);
+      }
       s.numRows++;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index f917621..4989a0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -251,10 +251,15 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     // For the case: X following and Y following, process first Y-X results and then insert X nulls.
     // For the case X preceding and Y following, process Y results.
     for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-      s.results.add(s.lastValue);
+      if (s.hasResultReady()) {
+        s.results.add(s.lastValue);
+      }
+      s.numRows++;
     }

     for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-      s.results.add(null);
+      if (s.hasResultReady()) {
+        s.results.add(null);
+      }
       s.numRows++;
     }
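All four evaluators touched by this patch share the same streaming contract: iterate() may emit the result for an earlier row only once the FOLLOWING edge of that row's frame has been seen, and terminate() flushes whatever is still pending. Replacing the inline comparison with hasResultReady() and applying it in the flush loops is what stops a frame larger than the partition from producing extra output rows. A minimal standalone sketch of that emission pattern (plain Java, not Hive code; StreamingGuardSketch and its constants are illustrative stand-ins for the evaluator state):

    import java.util.ArrayList;
    import java.util.List;

    public class StreamingGuardSketch {
      // Frame: ROWS BETWEEN 1 PRECEDING AND 7 FOLLOWING, as in the new q-file.
      static final int START_OFFSET = -1; // 1 preceding
      static final int END_OFFSET = 7;    // 7 following

      public static void main(String[] args) {
        int[] partition = {1, 2, 3};      // only 3 rows, but the frame wants 7 more
        List<Integer> results = new ArrayList<>();
        int numRows = 0;

        // iterate(): a row's frame completes only after END_OFFSET further rows
        // arrive; with 3 input rows that never happens, so nothing is emitted here.
        for (int v : partition) {
          if (numRows >= END_OFFSET) {    // hasResultReady()
            results.add(v);               // stand-in for the real aggregate value
          }
          numRows++;
        }

        // terminate(): drain the pending slots, emitting only once numRows
        // crosses END_OFFSET. Without the guard this loop would blindly add
        // END_OFFSET results for a 3-row partition.
        for (int i = Math.max(0, START_OFFSET); i < END_OFFSET; i++) {
          if (numRows >= END_OFFSET) {    // the patched guard
            results.add(partition[partition.length - 1]); // last_value-style
          }
          numRows++;
        }

        System.out.println(results.size() + " results for "
            + partition.length + " input rows"); // prints "3 results for 3 input rows"
      }
    }

Run against the 3-row partition, the output count matches the input count, which is exactly the invariant the guarded results.add calls restore.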
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index 98abd5c..43b23fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -251,7 +251,7 @@ public void iterate(AggregationBuffer agg, Object[] parameters)
       s.maxChain.addLast(new Object[] { o, s.numRows });
     }

-    if (s.numRows >= wFrameDef.getEnd().getRelativeOffset()) {
+    if (s.hasResultReady()) {
       s.results.add(s.maxChain.getFirst()[0]);
     }
     s.numRows++;
@@ -287,20 +287,24 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     // For the case: X following and Y following, process first Y-X results and then insert X nulls.
     // For the case X preceding and Y following, process Y results.
     for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-      s.results.add(r == null ? null : r[0]);
+      if (s.hasResultReady()) {
+        s.results.add(r == null ? null : r[0]);
+      }
       s.numRows++;
       if (r != null) {
         int fIdx = (Integer) r[1];
         if (!wFrameDef.isStartUnbounded()
-            && s.numRows + i >= fIdx + wFrameDef.getWindowSize()
+            && s.numRows >= fIdx + wFrameDef.getWindowSize()
             && !s.maxChain.isEmpty()) {
           s.maxChain.removeFirst();
-          r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : r;
+          r = !s.maxChain.isEmpty() ? s.maxChain.getFirst() : null;
         }
       }
     }

     for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-      s.results.add(null);
+      if (s.hasResultReady()) {
+        s.results.add(null);
+      }
       s.numRows++;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
index 3c76404..d2e1b26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFStreamingEvaluator.java
@@ -56,6 +56,16 @@ protected void reset() {
       results.clear();
       numRows = 0;
     }
+
+    /**
+     * For the case "X preceding and Y preceding", or once the number of processed
+     * rows exceeds the size of the FOLLOWING window, a PTF result can be generated
+     * for a previous row.
+     * @return true when a result for a previous row is ready
+     */
+    public boolean hasResultReady() {
+      return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
+    }
   }

   @Override
@@ -141,16 +151,6 @@ protected void reset() {
     }

     /**
-     * For the cases "X preceding and Y preceding" or the number of processed rows
-     * is more than the size of FOLLOWING window, we are able to generate a PTF result
-     * for a previous row.
-     * @return
-     */
-    public boolean hasResultReady() {
-      return this.numRows >= wFrameDef.getEnd().getRelativeOffset();
-    }
-
-    /**
      * Retrieve the next stored intermediate result, i.e.,
      * Get S[x-1] in the computation of S[x..y] = S[y] - S[x-1].
      */
@@ -206,11 +206,15 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
     // For the case: X following and Y following, process first Y-X results and then insert X nulls.
     // For the case X preceding and Y following, process Y results.
     for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
-      ss.results.add(getNextResult(ss));
+      if (ss.hasResultReady()) {
+        ss.results.add(getNextResult(ss));
+      }
       ss.numRows++;
     }

     for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
-      ss.results.add(null);
+      if (ss.hasResultReady()) {
+        ss.results.add(null);
+      }
       ss.numRows++;
     }
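For context on the GenericUDAFMax changes: maxChain is a monotonically decreasing deque, so the current window maximum is always at its head, and the head must be evicted once its absolute row index falls a full window size behind the row count (the arithmetic the patch corrects from s.numRows + i to s.numRows). A self-contained sketch of that structure (plain Java, not Hive code; it evicts at ingest time instead of splitting the work across iterate() and terminate() the way the evaluator does):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SlidingMaxSketch {
      public static void main(String[] args) {
        int[] values = {1, 2, 3};   // partition 'a' from the new test, ordered by i
        int windowSize = 9;         // 1 preceding + current row + 7 following
        Deque<int[]> maxChain = new ArrayDeque<>(); // entries are {value, rowIdx}
        int numRows = 0;

        for (int v : values) {
          // Tail entries no larger than v can never be the max again: drop them.
          while (!maxChain.isEmpty() && maxChain.peekLast()[0] <= v) {
            maxChain.removeLast();
          }
          maxChain.addLast(new int[] { v, numRows });
          numRows++;
          // Evict the head once its row is a full window behind the row count.
          while (!maxChain.isEmpty()
              && numRows >= maxChain.peekFirst()[1] + windowSize) {
            maxChain.removeFirst();
          }
          System.out.println("after row " + (numRows - 1)
              + ", head of maxChain: " + maxChain.peekFirst()[0]);
        }
      }
    }

With windowSize 9 and only 3 rows the head is never evicted, which is why the terminate() flush, not iterate(), produces every result for this test.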
diff --git a/ql/src/test/queries/clientpositive/windowing_windowspec4.q b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
new file mode 100644
index 0000000..3c497b7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/windowing_windowspec4.q
@@ -0,0 +1,17 @@
+--Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing;
+
+create table smalltable_windowing(
+  i int,
+  type string);
+insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a');
+
+select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing;
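A quick check of the expected output that follows: ordering partition 'a' by i gives (1, 2, 3). With the frame "rows between 1 preceding and 7 following", the first two rows each see {1, 2, 3} (max 3, min 1, first_value 1, last_value 3, avg 2.0, sum 6), while the last row sees only {2, 3} (max 3, min 2, first_value 2, last_value 3, avg 2.5, sum 5). That is three result rows for three input rows, the invariant the guarded flush loops above restore.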
diff --git a/ql/src/test/results/clientpositive/windowing_windowspec4.q.out b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
new file mode 100644
index 0000000..deaf572
--- /dev/null
+++ b/ql/src/test/results/clientpositive/windowing_windowspec4.q.out
@@ -0,0 +1,55 @@
+PREHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: --Test small dataset with larger windowing
+
+drop table if exists smalltable_windowing
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table smalltable_windowing(
+  i int,
+  type string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: create table smalltable_windowing(
+  i int,
+  type string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@smalltable_windowing
+PREHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@smalltable_windowing
+POSTHOOK: query: insert into smalltable_windowing values(3, 'a'), (1, 'a'), (2, 'a')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@smalltable_windowing
+POSTHOOK: Lineage: smalltable_windowing.i EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: smalltable_windowing.type SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+PREHOOK: type: QUERY
+PREHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+POSTHOOK: query: select type, i,
+max(i) over (partition by type order by i rows between 1 preceding and 7 following),
+min(i) over (partition by type order by i rows between 1 preceding and 7 following),
+first_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+last_value(i) over (partition by type order by i rows between 1 preceding and 7 following),
+avg(i) over (partition by type order by i rows between 1 preceding and 7 following),
+sum(i) over (partition by type order by i rows between 1 preceding and 7 following)
+from smalltable_windowing
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@smalltable_windowing
+#### A masked pattern was here ####
+a	1	3	1	1	3	2.0	6
+a	2	3	1	1	3	2.0	6
+a	3	3	2	2	3	2.5	5
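One behavioral note beyond the new test: with the SemanticAnalyzer change at the top of this patch, a boundary amount of zero (for example "rows between 0 preceding and 7 following", assuming the grammar accepts a literal 0 there) is now rejected with the corrected SemanticException message, where previously only negative amounts failed validation.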