Patch summary (free-form text ahead of the first "Index:" header; it is not part of any hunk).
GenericUDFLead and GenericUDFLag move out of GenericUDFLeadLag into their own top-level classes, and
LeadLagBuffer moves out of GenericUDAFLeadLag into its own package-private interface.
Both GenericUDAFLeadLag.getEvaluator and GenericUDFLeadLag.initialize now throw UDFArgumentTypeException
when the constant lead/lag amount is below zero; windowing_ll_no_neg.q is the negative test for that check.
NOTE(review): the three new .java files start at the package declaration and end without a trailing newline.

Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 1529732) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -129,8 +129,6 @@ import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear; import org.apache.hadoop.hive.ql.udf.UDFYear; import org.apache.hadoop.hive.ql.udf.generic.*; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead; import org.apache.hadoop.hive.ql.udf.ptf.MatchPath.MatchPathResolver; import org.apache.hadoop.hive.ql.udf.ptf.Noop.NoopResolver; import org.apache.hadoop.hive.ql.udf.ptf.NoopWithMap.NoopWithMapResolver; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java (revision 1529732) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java (working copy) @@ -26,8 +26,8 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLag; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLead; /* * When constructing the Evaluator Tree from an ExprNode Tree Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java (revision 1529732) +++ 
ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLeadLag.java (working copy) @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; @@ -34,8 +33,7 @@ import org.apache.hadoop.io.IntWritable; /** - * abstract class for Lead & lag UDAFs - * GenericUDAFLeadLag. + * abstract class for Lead & lag UDAFs GenericUDAFLeadLag. * */ public abstract class GenericUDAFLeadLag extends AbstractGenericUDAFResolver { @@ -44,32 +42,31 @@ @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) - throws SemanticException { + throws SemanticException { ObjectInspector[] paramOIs = parameters.getParameterObjectInspectors(); String fNm = functionName(); - if (!(paramOIs.length >= 1 && paramOIs.length <= 3) ) { - throw new UDFArgumentTypeException(paramOIs.length - 1, - "Incorrect invocation of " + fNm + ": _FUNC_(expr, amt, default)"); + if (!(paramOIs.length >= 1 && paramOIs.length <= 3)) { + throw new UDFArgumentTypeException(paramOIs.length - 1, "Incorrect invocation of " + fNm + + ": _FUNC_(expr, amt, default)"); } int amt = 1; - if ( paramOIs.length > 1 ) { + if (paramOIs.length > 1) { ObjectInspector amtOI = paramOIs[1]; - if ( !ObjectInspectorUtils.isConstantObjectInspector(amtOI) || - (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE) || - ((PrimitiveObjectInspector)amtOI).getPrimitiveCategory() != - PrimitiveObjectInspector.PrimitiveCategory.INT ) - { - throw new UDFArgumentTypeException(0, - fNm + " amount must be a integer value " - + amtOI.getTypeName() + " was passed as 
parameter 1."); + if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI) + || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE) + || ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) { + throw new UDFArgumentTypeException(1, fNm + " amount must be a integer value " + + amtOI.getTypeName() + " was passed as parameter 1."); } - Object o = ((ConstantObjectInspector)amtOI). - getWritableConstantValue(); - amt = ((IntWritable)o).get(); + Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue(); + amt = ((IntWritable) o).get(); + if (amt < 0) { + throw new UDFArgumentTypeException(1, fNm + " amount can not be negative. Specified: " + amt); + } } if (paramOIs.length == 3) { @@ -85,13 +82,6 @@ protected abstract GenericUDAFLeadLagEvaluator createLLEvaluator(); - static interface LeadLagBuffer extends AggregationBuffer { - void initialize(int leadAmt); - void addRow(Object leadExprValue, Object defaultValue) ; - Object terminate(); - - } - public static abstract class GenericUDAFLeadLagEvaluator extends GenericUDAFEvaluator { private transient ObjectInspector[] inputOI; @@ -100,24 +90,21 @@ private transient Converter defaultValueConverter; @Override - public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException - { + public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); - if (m != Mode.COMPLETE) - { - throw new HiveException( - "Only COMPLETE mode supported for " + fnName + " function"); + if (m != Mode.COMPLETE) { + throw new HiveException("Only COMPLETE mode supported for " + fnName + " function"); } inputOI = parameters; if (parameters.length == 3) { - defaultValueConverter = ObjectInspectorConverters.getConverter(parameters[2], parameters[0]); + defaultValueConverter = ObjectInspectorConverters + .getConverter(parameters[2], parameters[0]); } - return 
ObjectInspectorFactory.getStandardListObjectInspector( - ObjectInspectorUtils - .getStandardObjectInspector(parameters[0])); + return ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils + .getStandardObjectInspector(parameters[0])); } public int getAmt() { @@ -140,24 +127,22 @@ @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { - LeadLagBuffer lb = getNewLLBuffer(); + LeadLagBuffer lb = getNewLLBuffer(); lb.initialize(amt); return lb; } @Override public void reset(AggregationBuffer agg) throws HiveException { - ((LeadLagBuffer)agg).initialize(amt); + ((LeadLagBuffer) agg).initialize(amt); } @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { - Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], - inputOI[0]); + Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputOI[0]); Object defaultVal = parameters.length > 2 ? ObjectInspectorUtils.copyToStandardObject( - defaultValueConverter.convert(parameters[2]), - inputOI[0]) : null; - ((LeadLagBuffer)agg).addRow(rowExprVal, defaultVal); + defaultValueConverter.convert(parameters[2]), inputOI[0]) : null; + ((LeadLagBuffer) agg).addRow(rowExprVal, defaultVal); } @Override @@ -172,7 +157,7 @@ @Override public Object terminate(AggregationBuffer agg) throws HiveException { - return ((LeadLagBuffer)agg).terminate(); + return ((LeadLagBuffer) agg).terminate(); } } Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLag.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLag.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLag.java (working copy) @@ -0,0 +1,23 @@ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; + +@UDFType(impliesOrder = true) 
+public class GenericUDFLag extends GenericUDFLeadLag { + @Override + protected String _getFnName() { + return "lag"; + } + + @Override + protected int getIndex(int amt) { + return pItr.getIndex() - 1 - amt; + } + + @Override + protected Object getRow(int amt) throws HiveException { + return pItr.lag(amt + 1); + } + +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLead.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLead.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLead.java (working copy) @@ -0,0 +1,24 @@ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.UDFType; + +@UDFType(impliesOrder = true) +public class GenericUDFLead extends GenericUDFLeadLag { + + @Override + protected String _getFnName() { + return "lead"; + } + + @Override + protected int getIndex(int amt) { + return pItr.getIndex() - 1 + amt; + } + + @Override + protected Object getRow(int amt) throws HiveException { + return pItr.lead(amt - 1); + } + +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java (revision 1529732) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLeadLag.java (working copy) @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; @@ -34,84 +33,73 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.io.IntWritable; -public abstract class GenericUDFLeadLag extends GenericUDF -{ - transient ExprNodeEvaluator exprEvaluator; - transient PTFPartitionIterator pItr; - transient ObjectInspector firstArgOI; - transient ObjectInspector defaultArgOI; - transient Converter defaultValueConverter; - int amt; +public abstract class GenericUDFLeadLag extends GenericUDF { + transient ExprNodeEvaluator exprEvaluator; + transient PTFPartitionIterator pItr; + transient ObjectInspector firstArgOI; + transient ObjectInspector defaultArgOI; + transient Converter defaultValueConverter; + int amt; - static{ - PTFUtils.makeTransient(GenericUDFLeadLag.class, "exprEvaluator", "pItr", - "firstArgOI", "defaultArgOI", "defaultValueConverter"); - } + static { + PTFUtils.makeTransient(GenericUDFLeadLag.class, "exprEvaluator", "pItr", "firstArgOI", + "defaultArgOI", "defaultValueConverter"); + } - @Override - public Object evaluate(DeferredObject[] arguments) throws HiveException - { + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { Object defaultVal = null; - if(arguments.length == 3){ - defaultVal = ObjectInspectorUtils.copyToStandardObject( - defaultValueConverter.convert(arguments[2].get()), - defaultArgOI); + if (arguments.length == 3) { + defaultVal = ObjectInspectorUtils.copyToStandardObject( + defaultValueConverter.convert(arguments[2].get()), defaultArgOI); } - int idx = pItr.getIndex() - 1; - int start = 0; - int end = pItr.getPartition().size(); - try - { - Object ret = null; - int newIdx = getIndex(amt); + int idx = pItr.getIndex() - 1; + int start = 0; + int end = pItr.getPartition().size(); + try { + Object ret = null; + int newIdx = getIndex(amt); - if(newIdx >= end || newIdx < start) { + if (newIdx >= end || newIdx < start) { ret = defaultVal; - } 
- else { + } else { Object row = getRow(amt); ret = exprEvaluator.evaluate(row); - ret = ObjectInspectorUtils.copyToStandardObject(ret, - firstArgOI, ObjectInspectorCopyOption.WRITABLE); - } - return ret; - } - finally - { - Object currRow = pItr.resetToIndex(idx); - // reevaluate expression on current Row, to trigger the Lazy object - // caches to be reset to the current row. - exprEvaluator.evaluate(currRow); - } + ret = ObjectInspectorUtils.copyToStandardObject(ret, firstArgOI, + ObjectInspectorCopyOption.WRITABLE); + } + return ret; + } finally { + Object currRow = pItr.resetToIndex(idx); + // reevaluate expression on current Row, to trigger the Lazy object + // caches to be reset to the current row. + exprEvaluator.evaluate(currRow); + } - } + } - @Override - public ObjectInspector initialize(ObjectInspector[] arguments) - throws UDFArgumentException - { + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { if (!(arguments.length >= 1 && arguments.length <= 3)) { - throw new UDFArgumentTypeException(arguments.length - 1, - "Incorrect invocation of " + _getFnName() + ": _FUNC_(expr, amt, default)"); + throw new UDFArgumentTypeException(arguments.length - 1, "Incorrect invocation of " + + _getFnName() + ": _FUNC_(expr, amt, default)"); } amt = 1; - if (arguments.length > 1) { ObjectInspector amtOI = arguments[1]; - if ( !ObjectInspectorUtils.isConstantObjectInspector(amtOI) || - (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE) || - ((PrimitiveObjectInspector)amtOI).getPrimitiveCategory() != - PrimitiveObjectInspector.PrimitiveCategory.INT ) - { - throw new UDFArgumentTypeException(0, - _getFnName() + " amount must be a integer value " - + amtOI.getTypeName() + " was passed as parameter 1."); + if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI) + || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE) + || ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory() != 
PrimitiveObjectInspector.PrimitiveCategory.INT) { + throw new UDFArgumentTypeException(1, _getFnName() + " amount must be a integer value " + + amtOI.getTypeName() + " was passed as parameter 1."); } - Object o = ((ConstantObjectInspector)amtOI). - getWritableConstantValue(); - amt = ((IntWritable)o).get(); + Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue(); + amt = ((IntWritable) o).get(); + if (amt < 0) { + throw new UDFArgumentTypeException(1, _getFnName() + " amount can not be negative. Specified: " + amt); + } } if (arguments.length == 3) { @@ -123,30 +111,26 @@ firstArgOI = arguments[0]; return ObjectInspectorUtils.getStandardObjectInspector(firstArgOI, - ObjectInspectorCopyOption.WRITABLE); - } + ObjectInspectorCopyOption.WRITABLE); + } - public ExprNodeEvaluator getExprEvaluator() - { - return exprEvaluator; - } + public ExprNodeEvaluator getExprEvaluator() { + return exprEvaluator; + } - public void setExprEvaluator(ExprNodeEvaluator exprEvaluator) - { - this.exprEvaluator = exprEvaluator; - } + public void setExprEvaluator(ExprNodeEvaluator exprEvaluator) { + this.exprEvaluator = exprEvaluator; + } - public PTFPartitionIterator getpItr() - { - return pItr; - } + public PTFPartitionIterator getpItr() { + return pItr; + } - public void setpItr(PTFPartitionIterator pItr) - { - this.pItr = pItr; - } + public void setpItr(PTFPartitionIterator pItr) { + this.pItr = pItr; + } - public ObjectInspector getFirstArgOI() { + public ObjectInspector getFirstArgOI() { return firstArgOI; } @@ -179,69 +163,22 @@ } @Override - public String getDisplayString(String[] children) - { - assert (children.length == 2); - StringBuilder sb = new StringBuilder(); - sb.append(_getFnName()); - sb.append("("); - sb.append(children[0]); - sb.append(", "); - sb.append(children[1]); - sb.append(")"); - return sb.toString(); - } + public String getDisplayString(String[] children) { + assert (children.length == 2); + StringBuilder sb = new StringBuilder(); + 
sb.append(_getFnName()); 
+ sb.append("("); + sb.append(children[0]); + sb.append(", "); + sb.append(children[1]); + sb.append(")"); + return sb.toString(); + } - protected abstract String _getFnName(); + protected abstract String _getFnName(); - protected abstract Object getRow(int amt) throws HiveException; + protected abstract Object getRow(int amt) throws HiveException; - protected abstract int getIndex(int amt); + protected abstract int getIndex(int amt); - @UDFType(impliesOrder = true) - public static class GenericUDFLead extends GenericUDFLeadLag - { - - @Override - protected String _getFnName() - { - return "lead"; - } - - @Override - protected int getIndex(int amt) { - return pItr.getIndex() - 1 + amt; - } - - @Override - protected Object getRow(int amt) throws HiveException - { - return pItr.lead(amt - 1); - } - - } - - @UDFType(impliesOrder = true) - public static class GenericUDFLag extends GenericUDFLeadLag - { - @Override - protected String _getFnName() - { - return "lag"; - } - - @Override - protected int getIndex(int amt) { - return pItr.getIndex() - 1 - amt; - } - - @Override - protected Object getRow(int amt) throws HiveException - { - return pItr.lag(amt + 1); - } - - } - } - Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/LeadLagBuffer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/LeadLagBuffer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/LeadLagBuffer.java (working copy) @@ -0,0 +1,12 @@ +package org.apache.hadoop.hive.ql.udf.generic; + +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; + +interface LeadLagBuffer extends AggregationBuffer { + void initialize(int leadAmt); + + void addRow(Object leadExprValue, Object defaultValue); + + Object terminate(); + +} \ No newline at end of file Index: ql/src/test/queries/clientnegative/windowing_ll_no_neg.q =================================================================== --- 
ql/src/test/queries/clientnegative/windowing_ll_no_neg.q (revision 0) +++ ql/src/test/queries/clientnegative/windowing_ll_no_neg.q (working copy) @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS part; + +-- data setup +CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +); + +LOAD DATA LOCAL INPATH '../data/files/part_tiny.txt' overwrite into table part; + + +select p_mfgr, p_name, p_size, +min(p_retailprice), +rank() over(distribute by p_mfgr sort by p_name)as r, +dense_rank() over(distribute by p_mfgr sort by p_name) as dr, +p_size, p_size - lag(p_size,-1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz +from part +group by p_mfgr, p_name, p_size +; Index: ql/src/test/results/clientnegative/windowing_ll_no_neg.q.out =================================================================== --- ql/src/test/results/clientnegative/windowing_ll_no_neg.q.out (revision 0) +++ ql/src/test/results/clientnegative/windowing_ll_no_neg.q.out (working copy) @@ -0,0 +1,39 @@ +PREHOOK: query: DROP TABLE IF EXISTS part +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS part +POSTHOOK: type: DROPTABLE +PREHOOK: query: -- data setup +CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- data setup +CREATE TABLE part( + p_partkey INT, + p_name STRING, + p_mfgr STRING, + p_brand STRING, + p_type STRING, + p_size INT, + p_container STRING, + p_retailprice DOUBLE, + p_comment STRING +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@part +PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/part_tiny.txt' overwrite into table part +PREHOOK: type: LOAD +PREHOOK: Output: default@part +POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/part_tiny.txt' 
overwrite into table part +POSTHOOK: type: LOAD +POSTHOOK: Output: default@part +FAILED: SemanticException Failed to breakup Windowing invocations into Groups. At least 1 group must only depend on input columns. Also check for circular dependencies. +Underlying error: org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: Lag amount can not be negative. Specified: -1