Uploaded image for project: 'Hive'
  1. Hive
  2. HIVE-21191

I want to extend the lag/lead functions to implement some special functions, and I ran into some problems




      I want a distinctLag function. It works like lag, but instead of returning the value of the immediately preceding row, it returns the nearest preceding value that is different from the current row's value.
      select * from active

      session sq channel
      1 1 A
      1 2 B
      1 3 B
      1 4 C
      1 5 B
      2 1 C
      2 2 B
      2 3 B
      2 4 A
      2 5 B

      select session,sq,lag(channel)over(partition by session order by sq) from active

      session sq channel
      1 1 null
      1 2 A
      1 3 B
      1 4 B
      1 5 C
      2 1 null
      2 2 C
      2 3 B
      2 4 B
      2 5 A

      The function I want is:
      select session,sq,distinctLag(channel)over(partition by session order by sq) from active

      session sq channel
      1 1 null
      1 2 A
      1 3 A
      1 4 B
      1 5 C
      2 1 null
      2 2 C
      2 3 C
      2 4 B
      2 5 A


      I tried to extend GenericUDFLeadLag and override its methods:

      import org.apache.hadoop.hive.ql.exec.Description;
      import org.apache.hadoop.hive.ql.metadata.HiveException;
      import org.apache.hadoop.hive.ql.udf.UDFType;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
      	    name = "distinctLag",
      	    value = "distinctLag  (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); "
      	        + "The distinctLag function is used to access data from a distinct previous row.",
      	    extended = "Example:\n "
      	    + "select p1.p_mfgr, p1.p_name, p1.p_size,\n"
      	    + " p1.p_size - distinctLag(p1.p_size,1,p1.p_size) over( distribute by p1.p_mfgr sort by p1.p_name) as deltaSz\n"
      	    + " from part p1 join part p2 on p1.p_partkey = p2.p_partkey")
      // First attempt: a UDF that subclasses GenericUDFLeadLag and reports its name
      // as "distinctLag" through _getFnName().
      // NOTE(review): this listing appears to be lossy -- lines that held only closing
      // braces (and apparently a few short statements) are missing, so it does not
      // compile as shown. Confirm against the reporter's original code.
      @UDFType(impliesOrder = true)
      public class GenericUDFDistinctLag extends GenericUDFLeadLag {
      	// Evaluates the function for one row. When exactly three arguments are passed,
      	// the third is converted and copied to become the default value; otherwise the
      	// default stays null.
      	public Object evaluate(DeferredObject[] arguments) throws HiveException {
      		Object defaultVal = null;
      		if (arguments.length == 3) {
      			defaultVal = ObjectInspectorUtils.copyToStandardObject(getDefaultValueConverter().convert(arguments[2].get()), getDefaultArgOI());
      		// idx is the position the partition iterator is reset to, both to read the
      		// current value below and again in the finally block.
      		int idx = getpItr().getIndex() - 1;
      		// start/end are the bounds newIdx is checked against inside the loop.
      		int start = 0;
      		int end = getpItr().getPartition().size();
      		try {
      			// Value of the expression on the row at idx, copied as a writable object.
      			Object currValue = ObjectInspectorUtils.copyToStandardObject(getExprEvaluator().evaluate(getpItr().resetToIndex(idx)), getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
      			Object ret = null;
      			int newIdx = idx;
      			do {
      				// Outside [start, end): give back the default value.
      				if (newIdx >= end || newIdx < start) {
      					ret = defaultVal;
      					return ret;
      					// Evaluate the expression on the row returned by lag(1) on the iterator.
      					ret = ObjectInspectorUtils.copyToStandardObject(getExprEvaluator().evaluate(getpItr().lag(1)), getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
      						// NOTE(review): the condition guarding this decrement is not visible
      						// here; presumably it compares ret with currValue -- confirm. Also,
      						// nothing visible restores amt after setAmt() lowers it -- verify
      						// that it is reset before the next row is evaluated.
      						setAmt(getAmt() - 1);
      			} while (getAmt() > 0);
      			return ret;
      		} finally {
      			Object currRow = getpItr().resetToIndex(idx);
      			// reevaluate expression on current Row, to trigger the Lazy object
      			// caches to be reset to the current row.
      	// Name reported for this function.
      	protected  String _getFnName(){
      		 return "distinctLag";
      	// Not supported by this subclass: always throws, as far as the visible code shows.
      	protected Object getRow(int amt) throws HiveException {
      		throw new HiveException("distinctLag error: cannot call getRow");
      	// Stub left from code generation; returns 0 regardless of amt.
      	protected int getIndex(int amt) {
      		// TODO Auto-generated method stub
      		return 0;

      I packaged it as a jar, added it to Hive, and created a temporary function.

      Then I ran:

      select session,sq,distinctLag(channel)over(partition by session order by sq) from active;

      It reported an error:

      FAILED: SemanticException Failed to breakup Windowing invocations into Groups. At least 1 group must only depend on input columns. Also check for circular dependencies.
      Underlying error: Invalid function distinctLag

      I don't know exactly what the problem is. I hope someone can give me a hint. Thank you.

      Then I noticed that there is a UDAF, GenericUDAFLag, so I tried to imitate it.

      import java.util.ArrayList;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.hive.ql.exec.Description;
      import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
      import org.apache.hadoop.hive.ql.metadata.HiveException;
      import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
      import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
      // Second attempt: a UDAF resolver for the distinct-lag function. It supplies the
      // function name and the evaluator to the abstract GenericUDAFDistinctLeadLag base.
      // NOTE(review): the annotation below registers the description name "lag" and
      // functionName() returns "Lag", although the class implements distinct lag;
      // presumably carried over from the GenericUDAFLag code it imitates -- confirm.
      // NOTE(review): this listing appears to be lossy -- lines that held only closing
      // braces, comment delimiters and some short statements are missing.
      @WindowFunctionDescription(description = @Description(name = "lag", value = "_FUNC_(expr, amt, default)"), supportsWindow = false, pivotResult = true, impliesOrder = true)
      public class GenericUDAFDistinctLag extends GenericUDAFDistinctLeadLag {
      	static final Log LOG = LogFactory.getLog(GenericUDAFDistinctLag.class.getName());
      	// Name used by the base class when building error messages.
      	protected String functionName() {
      		return "Lag";
      	// Factory hook: the evaluator used for this function.
      	protected GenericUDAFDistinctLeadLagEvaluator createLLEvaluator() {
      		return new GenericUDAFDistinctLagEvaluator();
      	// Evaluator that aggregates into a DistinctLagBuffer and hands out a streaming
      	// wrapper of itself from getWindowingEvaluator().
      	public static class GenericUDAFDistinctLagEvaluator extends GenericUDAFDistinctLeadLagEvaluator {
      		// NOTE(review): constructor bodies are not visible in this listing.
      		public GenericUDAFDistinctLagEvaluator() {
      		 * used to initialize Streaming Evaluator.
      		protected GenericUDAFDistinctLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      		// Buffer type used by this evaluator.
      		protected DistinctLeadLagBuffer getNewLLBuffer() throws HiveException {
      			return new DistinctLagBuffer();
      		// Returns a streaming evaluator built from this one; wFrmDef is not used in
      		// the visible code.
      		public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
      			return new GenericUDAFDistinctLagEvaluatorStreaming(this);
      	// Aggregation buffer: `values` holds previously seen values, `lagValues` holds
      	// one computed result per added row, and `lagAmt` is the number of differing
      	// values to step back over.
      	static class DistinctLagBuffer implements DistinctLeadLagBuffer {
      		ArrayList<Object> values;
      		int lagAmt;
      		ArrayList<Object> lagValues;
      		// Stores the lag amount and starts with two empty lists.
      		public void initialize(int lagAmt) {
      			this.lagAmt = lagAmt;
      			lagValues = new ArrayList<Object>();
      			values = new ArrayList<Object>();
      		// Scans `values` from the newest entry backwards, counting entries that are
      		// not equal to currValue, and appends the result for this row to lagValues:
      		// defaultValue when the scan ran off the front (i == -1), otherwise the entry
      		// at the position where the scan stopped.
      		// NOTE(review): currValue.equals(...) throws NullPointerException if a null
      		// value reaches this method -- confirm whether null inputs are possible.
      		// NOTE(review): the loop exit after the counter reaches lagAmt and any
      		// statement that appends currValue to `values` are not visible in this
      		// listing; presumably dropped -- confirm.
      		public void addRow(Object currValue, Object defaultValue) {
      			int i = values.size() - 1;
      			int noEquals = 0;
      			for (; i >= 0; i--) {
      				if (!currValue.equals(values.get(i))) {
      					if (++noEquals == lagAmt) {
      			lagValues.add(i == -1 ? defaultValue : values.get(i));
      		// The aggregate result is the whole list of per-row results.
      		public Object terminate() {
      			return lagValues;
      	 * StreamingEval: wrap regular eval. on getNext remove first row from values
      	 * and return it.
      	static class GenericUDAFDistinctLagEvaluatorStreaming extends GenericUDAFDistinctLagEvaluator implements ISupportStreamingModeForWindowing {
      		// NOTE(review): constructor body is not visible in this listing.
      		protected GenericUDAFDistinctLagEvaluatorStreaming(GenericUDAFDistinctLeadLagEvaluator src) {
      		// Hands out the next result: the first element of lagValues when that list is
      		// non-empty, otherwise the first element of values when that list is
      		// non-empty. A null element is reported as NULL_RESULT; when both lists are
      		// empty the method returns null.
      		public Object getNextResult(AggregationBuffer agg) throws HiveException {
      			DistinctLagBuffer lb = (DistinctLagBuffer) agg;
      			if (!lb.lagValues.isEmpty()) {
      				Object res = lb.lagValues.remove(0);
      				if (res == null) {
      					return ISupportStreamingModeForWindowing.NULL_RESULT;
      				return res;
      			} else if (!lb.values.isEmpty()) {
      				Object res = lb.values.remove(0);
      				if (res == null) {
      					return ISupportStreamingModeForWindowing.NULL_RESULT;
      				return res;
      			return null;
      		// Reports the configured lag amount as the number of rows remaining.
      		public int getRowsRemainingAfterTerminate() throws HiveException {
      			return getAmt();
      import java.lang.reflect.Field;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
      import org.apache.hadoop.hive.ql.metadata.HiveException;
      import org.apache.hadoop.hive.ql.parse.SemanticException;
      import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
      import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
      import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
      import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
      import org.apache.hadoop.io.IntWritable;
      // Abstract UDAF resolver shared by the distinct lead/lag functions: validates the
      // call's arguments and returns the evaluator supplied by the concrete subclass.
      // NOTE(review): this listing appears to be lossy -- lines that held only closing
      // braces, comment delimiters and some short statements are missing.
      public abstract class GenericUDAFDistinctLeadLag extends AbstractGenericUDAFResolver {
      	// NOTE(review): the logger is named after GenericUDAFLead, not this class.
      	static final Log LOG = LogFactory.getLog(GenericUDAFLead.class.getName());
      	// Checks the argument list (expr [, amt [, default]]) and creates the evaluator.
      	// Throws UDFArgumentTypeException when the argument count is not 1..3, when amt
      	// is not a constant primitive INT, or when amt is negative.
      	public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {
      		ObjectInspector[] paramOIs = parameters.getParameterObjectInspectors();
      		String fNm = functionName();
      		if (!(paramOIs.length >= 1 && paramOIs.length <= 3)) {
      			throw new UDFArgumentTypeException(paramOIs.length - 1, "Incorrect invocation of " + fNm + ": _FUNC_(expr, amt, default)");
      		// The lag/lead amount defaults to 1 when no second argument is given.
      		int amt = 1;
      		if (paramOIs.length > 1) {
      			ObjectInspector amtOI = paramOIs[1];
      			if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI) || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE)
      					|| ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.INT) {
      				throw new UDFArgumentTypeException(1, fNm + " amount must be a integer value " + amtOI.getTypeName() + " was passed as parameter 1.");
      			Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue();
      			amt = ((IntWritable) o).get();
      			if (amt < 0) {
      				throw new UDFArgumentTypeException(1, fNm + " amount can not be nagative. Specified: " + amt);
      		if (paramOIs.length == 3) {
      			// The converter is requested and discarded; presumably this is only a
      			// check that the default's type can be converted to the expression's
      			// type -- confirm.
      			ObjectInspectorConverters.getConverter(paramOIs[2], paramOIs[0]);
      		// NOTE(review): no call passing amt or fNm to the evaluator is visible between
      		// these two lines; presumably setAmt/setFnName calls were dropped -- confirm.
      		GenericUDAFDistinctLeadLagEvaluator eval = createLLEvaluator();
      		return eval;
      	// Function name used in error messages.
      	protected abstract String functionName();
      	// Supplies the concrete evaluator.
      	protected abstract GenericUDAFDistinctLeadLagEvaluator createLLEvaluator();
      	// Base evaluator: holds the input inspectors, the lag/lead amount, the function
      	// name and the converter for the optional default value.
      	public static abstract class GenericUDAFDistinctLeadLagEvaluator extends GenericUDAFEvaluator {
      		private transient ObjectInspector[] inputOI;
      		private int amt;
      		String fnName;
      		private transient Converter defaultValueConverter;
      		// NOTE(review): constructor body is not visible in this listing.
      		public GenericUDAFDistinctLeadLagEvaluator() {
      		 * used to initialize Streaming Evaluator.
      		protected GenericUDAFDistinctLeadLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      			this.inputOI = src.inputOI;
      			this.amt = src.amt;
      			this.fnName = src.fnName;
      			this.defaultValueConverter = src.defaultValueConverter;
      			// Copies the `mode` field declared on GenericUDAFEvaluator from src via
      			// reflection.
      			// NOTE(review): no setAccessible(true) call is visible before the field is
      			// read and written, and every catch block below is empty as shown, so a
      			// failure here would go unnoticed -- confirm against the original code.
      			try {
      				Field mode = GenericUDAFEvaluator.class.getDeclaredField("mode");
      				mode.set(this, mode.get(src));
      			} catch (IllegalArgumentException e) {
      				// TODO Auto-generated catch block
      			} catch (IllegalAccessException e) {
      				// TODO Auto-generated catch block
      			} catch (NoSuchFieldException e) {
      				// TODO Auto-generated catch block
      			} catch (SecurityException e) {
      				// TODO Auto-generated catch block
      		// Accepts only Mode.COMPLETE (throws HiveException otherwise), remembers the
      		// parameter inspectors, prepares the default-value converter when three
      		// parameters are given, and declares the result as a list whose element type
      		// is the standard inspector of the first parameter.
      		public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      			super.init(m, parameters);
      			if (m != Mode.COMPLETE) {
      				throw new HiveException("Only COMPLETE mode supported for " + fnName + " function");
      			inputOI = parameters;
      			if (parameters.length == 3) {
      				defaultValueConverter = ObjectInspectorConverters.getConverter(parameters[2], parameters[0]);
      			return ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspectorUtils.getStandardObjectInspector(parameters[0]));
      		// Plain accessors for the lag/lead amount and the function name.
      		public int getAmt() {
      			return amt;
      		public void setAmt(int amt) {
      			this.amt = amt;
      		public String getFnName() {
      			return fnName;
      		public void setFnName(String fnName) {
      			this.fnName = fnName;
      		// Supplies the concrete buffer implementation.
      		protected abstract DistinctLeadLagBuffer getNewLLBuffer() throws HiveException;
      		// Returns the buffer obtained from getNewLLBuffer().
      		public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      			DistinctLeadLagBuffer lb = getNewLLBuffer();
      			return lb;
      		// Re-initializes the buffer with the current amount.
      		public void reset(AggregationBuffer agg) throws HiveException {
      			((DistinctLeadLagBuffer) agg).initialize(amt);
      		// Copies the row's expression value to a standard object, converts and copies
      		// the default value when more than two parameters are present (null
      		// otherwise), and passes both to the buffer.
      		public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      			Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputOI[0]);
      			Object defaultVal = parameters.length > 2 ? ObjectInspectorUtils.copyToStandardObject(defaultValueConverter.convert(parameters[2]), inputOI[0]) : null;
      			((DistinctLeadLagBuffer) agg).addRow(rowExprVal, defaultVal);
      		// Partial aggregation is not supported: both methods always throw.
      		public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      			throw new HiveException("terminatePartial not supported");
      		public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      			throw new HiveException("merge not supported");
      		// The final result is whatever the buffer's terminate() returns.
      		public Object terminate(AggregationBuffer agg) throws HiveException {
      			return ((DistinctLeadLagBuffer) agg).terminate();
      import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
      // Buffer contract used by the distinct lead/lag evaluators, on top of Hive's
      // AggregationBuffer.
      // NOTE(review): the closing brace of this interface is missing from the listing.
      interface DistinctLeadLagBuffer extends AggregationBuffer {
        // Prepares the buffer for a new aggregation with the given lead/lag amount.
        void initialize(int leadAmt);
        // Adds one row's expression value together with the default value for that row.
        void addRow(Object leadExprValue, Object defaultValue);
        // Returns the aggregate result held by the buffer.
        Object terminate();

      I packaged these classes as a jar, added it to Hive, and created a temporary function. In higher Hive versions it works, but in Hive 1.1.0 it reports an error. (In Hive 1.1.0, if I name the temporary function lag or lead, it also works the way I want, but it then overrides Hive's built-in lag/lead function even after the temporary function has been dropped; the built-in lag/lead only works again after I quit hive-cli and start it again.) The error is:

      hive> SELECT session,sq,distinctLag(channel)over(PARTITION BY session ORDER BY sq) FROM elephant_active;
      Query ID = root_20190131195959_8047b4ba-a85c-4f39-8a27-989388316c50
      Total jobs = 1
      Launching Job 1 out of 1
      Number of reduce tasks not specified. Estimated from input data size: 1
      In order to change the average load for a reducer (in bytes):
        set hive.exec.reducers.bytes.per.reducer=<number>
      In order to limit the maximum number of reducers:
        set hive.exec.reducers.max=<number>
      In order to set a constant number of reducers:
        set mapreduce.job.reduces=<number>
      Starting Job = job_1546504603780_0492, Tracking URL = http://dce.bi.com:8088/proxy/application_1546504603780_0492/
      Kill Command = /opt/cloudera/parcels/CDH-5.10.2-1.cdh5.10.2.p0.5/lib/hadoop/bin/hadoop job  -kill job_1546504603780_0492
      Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
      2019-01-31 20:00:03,639 Stage-1 map = 0%,  reduce = 0%
      2019-01-31 20:00:09,972 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.38 sec
      2019-01-31 20:00:30,795 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 1.38 sec
      MapReduce Total cumulative CPU time: 1 seconds 380 msec
      Ended Job = job_1546504603780_0492 with errors
      Error during job, obtaining debugging information...
      Examining task ID: task_1546504603780_0492_m_000000 (and more) from job job_1546504603780_0492
      Task with the most failures(4): 
      Task ID:
      Diagnostic Messages for this Task:
      Error: java.lang.RuntimeException: Error in configuring object
      	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
      	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
      	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
      	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:409)
      	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
      	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
      	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
      Caused by: java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
      	... 9 more
      Caused by: java.lang.RuntimeException: Reduce operator initialization failed
      	at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:166)
      	... 14 more
      Caused by: java.lang.NullPointerException
      	at org.apache.hadoop.hive.ql.exec.Registry.getFunctionInfo(Registry.java:306)
      	at org.apache.hadoop.hive.ql.exec.Registry.getWindowFunctionInfo(Registry.java:314)
      	at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getWindowFunctionInfo(FunctionRegistry.java:504)
      	at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.streamingPossible(WindowingTableFunction.java:151)
      	at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.setCanAcceptInputAsStream(WindowingTableFunction.java:222)
      	at org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.initializeStreaming(WindowingTableFunction.java:256)
      	at org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.initializeStreaming(PTFOperator.java:291)
      	at org.apache.hadoop.hive.ql.exec.PTFOperator.initializeOp(PTFOperator.java:86)
      	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
      	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
      	at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
      	at org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
      	at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
      	at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:159)
      	... 14 more
      FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
      MapReduce Jobs Launched: 
      Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 1.38 sec   HDFS Read: 2958 HDFS Write: 0 FAIL
      Total MapReduce CPU Time Spent: 1 seconds 380 msec

      I guess it is a FunctionRegistry problem.
      I am a beginner. I hope someone can tell me the correct way to implement this special function. Thank you very much. I use Hive 1.1.0 + cdh5.10.2 + 945.





            Unassigned Unassigned
            way one
            0 Vote for this issue
            2 Start watching this issue

