diff --git pom.xml pom.xml
index 1921b06..52a481e 100644
--- pom.xml
+++ pom.xml
@@ -98,7 +98,7 @@
5.5.0
1.9.1
3.4
- 1.7.5
+ 1.7.7
0.8.0.RELEASE
1.2.0-incubating
3.2.6
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
index 00b43c6..d7f1c7f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
@@ -538,10 +538,7 @@ private WindowFrameDef translate(ShapeDetails inpShape,
"Window range invalid, start boundary is greater than end boundary: %s", spec));
}
- WindowFrameDef wfDef = new WindowFrameDef();
- wfDef.setStart(translate(inpShape, s));
- wfDef.setEnd(translate(inpShape, e));
- return wfDef;
+ return new WindowFrameDef(translate(inpShape, s), translate(inpShape, e));
}
private BoundaryDef translate(ShapeDetails inpShape, BoundarySpec bndSpec)
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
index f692fa2..eeb094c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/BoundaryDef.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan.ptf;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
public abstract class BoundaryDef {
@@ -33,6 +34,10 @@ public void setDirection(Direction direction) {
public abstract int getAmt();
+ public boolean isUnbounded() {
+ return this.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT;
+ }
+
@Override
public String toString() {
return direction == null ? "" :
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
index e08bdd5..d153b08 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFrameDef.java
@@ -23,20 +23,28 @@
private BoundaryDef start;
private BoundaryDef end;
+ public WindowFrameDef(BoundaryDef start, BoundaryDef end) {
+ this.start = start;
+ this.end = end;
+ }
public BoundaryDef getStart() {
return start;
}
- public void setStart(BoundaryDef start) {
- this.start = start;
- }
-
public BoundaryDef getEnd() {
return end;
}
- public void setEnd(BoundaryDef end) {
- this.end = end;
+ public boolean isStartUnbounded() {
+ return start.isUnbounded();
+ }
+
+ public boolean isEndUnbounded() {
+ return end.isUnbounded();
+ }
+
+ public int getWindowSize() {
+ return end.getAmt() + start.getAmt() + 1;
}
@Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 12a327f..9f78449 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -157,13 +157,9 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
}
@Override
- public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
- BoundaryDef start = wFrmDef.getStart();
- BoundaryDef end = wFrmDef.getEnd();
-
- return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this,
- start.getAmt(), end.getAmt()) {
+ return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(this, wFrameDef) {
@Override
protected DoubleWritable getNextResult(
@@ -172,14 +168,12 @@ protected DoubleWritable getNextResult(
AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf;
Double r = myagg.count == 0 ? null : myagg.sum;
long cnt = myagg.count;
- if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
- Object[] o = ss.intermediateVals.remove(0);
- if (o != null) {
- Double d = (Double) o[0];
- r = r == null ? null : r - d;
- cnt = cnt - ((Long) o[1]);
- }
+
+ Object[] o = ss.retrieveNextIntermediateValue();
+ if (o != null) {
+ Double d = (Double) o[0];
+ r = r == null ? null : r - d;
+ cnt = cnt - ((Long) o[1]);
}
return r == null ? null : new DoubleWritable(r / cnt);
@@ -287,13 +281,10 @@ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
}
@Override
- public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
-
- BoundaryDef start = wFrmDef.getStart();
- BoundaryDef end = wFrmDef.getEnd();
+ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrameDef) {
return new GenericUDAFStreamingEvaluator.SumAvgEnhancer(
- this, start.getAmt(), end.getAmt()) {
+ this, wFrameDef) {
@Override
protected HiveDecimalWritable getNextResult(
@@ -302,14 +293,12 @@ protected HiveDecimalWritable getNextResult(
AverageAggregationBuffer myagg = (AverageAggregationBuffer) ss.wrappedBuf;
HiveDecimal r = myagg.count == 0 ? null : myagg.sum;
long cnt = myagg.count;
- if (ss.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && (ss.numRows - ss.numFollowing) >= (ss.numPreceding + 1)) {
- Object[] o = ss.intermediateVals.remove(0);
- if (o != null) {
- HiveDecimal d = (HiveDecimal) o[0];
- r = r == null ? null : r.subtract(d);
- cnt = cnt - ((Long) o[1]);
- }
+
+ Object[] o = ss.retrieveNextIntermediateValue();
+ if (o != null) {
+ HiveDecimal d = (HiveDecimal) o[0];
+ r = r == null ? null : r.subtract(d);
+ cnt = cnt - ((Long) o[1]);
}
return r == null ? null : new HiveDecimalWritable(
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
index f679387..dd9eaf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFFirstValue.java
@@ -178,8 +178,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
private final Deque valueChain;
- public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
- super(numPreceding, numFollowing, buf);
+ public State(AggregationBuffer buf) {
+ super(buf);
valueChain = new ArrayDeque(numPreceding + numFollowing + 1);
}
@@ -225,7 +225,7 @@ public int getRowsRemainingAfterTerminate() throws HiveException {
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
- return new State(numPreceding, numFollowing, underlying);
+ return new State(underlying);
}
protected ObjectInspector inputOI() {
@@ -252,7 +252,7 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep
* add row to chain. except in case of UNB preceding: - only 1 firstVal
* needs to be tracked.
*/
- if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || s.valueChain.isEmpty()) {
+ if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT || s.valueChain.isEmpty()) {
/*
* add value to chain if it is not null or if skipNulls is false.
*/
@@ -261,7 +261,7 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep
}
}
- if (s.numRows >= (s.numFollowing)) {
+ if (s.numRows >= numFollowing) {
/*
* if skipNulls is true and there are no rows in valueChain => all rows
* in partition are null so far; so add null in o/p
@@ -276,8 +276,8 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep
if (s.valueChain.size() > 0) {
int fIdx = (Integer) s.valueChain.getFirst().idx;
- if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && s.numRows > fIdx + s.numPreceding + s.numFollowing) {
+ if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows > fIdx + numPreceding + numFollowing) {
s.valueChain.removeFirst();
}
}
@@ -288,13 +288,13 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
State s = (State) agg;
ValIndexPair r = s.valueChain.size() == 0 ? null : s.valueChain.getFirst();
- for (int i = 0; i < s.numFollowing; i++) {
+ for (int i = 0; i < numFollowing; i++) {
s.results.add(r == null ? null : r.val);
s.numRows++;
if (r != null) {
int fIdx = (Integer) r.idx;
- if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && s.numRows > fIdx + s.numPreceding + s.numFollowing
+ if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows > fIdx + numPreceding + numFollowing
&& !s.valueChain.isEmpty()) {
s.valueChain.removeFirst();
r = !s.valueChain.isEmpty() ? s.valueChain.getFirst() : r;
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
index e099154..3ed6de7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLastValue.java
@@ -154,8 +154,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
private Object lastValue;
private int lastIdx;
- public State(int numPreceding, int numFollowing, AggregationBuffer buf) {
- super(numPreceding, numFollowing, buf);
+ public State(AggregationBuffer buf) {
+ super(buf);
lastValue = null;
lastIdx = -1;
}
@@ -192,7 +192,7 @@ public int getRowsRemainingAfterTerminate() throws HiveException {
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
AggregationBuffer underlying = wrappedEval.getNewAggregationBuffer();
- return new State(numPreceding, numFollowing, underlying);
+ return new State(underlying);
}
protected ObjectInspector inputOI() {
@@ -219,14 +219,14 @@ public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveExcep
s.lastValue = o;
s.lastIdx = s.numRows;
} else if (lb.skipNulls && s.lastIdx != -1) {
- if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
+ if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows > s.lastIdx + numPreceding + numFollowing) {
s.lastValue = null;
s.lastIdx = -1;
}
}
- if (s.numRows >= (s.numFollowing)) {
+ if (s.numRows >= (numFollowing)) {
s.results.add(s.lastValue);
}
s.numRows++;
@@ -238,14 +238,14 @@ public Object terminate(AggregationBuffer agg) throws HiveException {
LastValueBuffer lb = (LastValueBuffer) s.wrappedBuf;
if (lb.skipNulls && s.lastIdx != -1) {
- if (s.numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
- && s.numRows > s.lastIdx + s.numPreceding + s.numFollowing) {
+ if (numPreceding != BoundarySpec.UNBOUNDED_AMOUNT
+ && s.numRows > s.lastIdx + numPreceding + numFollowing) {
s.lastValue = null;
s.lastIdx = -1;
}
}
- for (int i = 0; i < s.numFollowing; i++) {
+ for (int i = 0; i < numFollowing; i++) {
s.results.add(s.lastValue);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
index a153818..6b7808a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMax.java
@@ -166,8 +166,8 @@ public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
class State extends GenericUDAFStreamingEvaluator