Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(revision 1523187)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(working copy)
@@ -382,6 +382,7 @@
     registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());
     registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox());
     registerGenericUDAF("collect_set", new GenericUDAFCollectSet());
+    registerGenericUDAF("collect_list", new GenericUDAFCollectList());
     registerGenericUDAF("ngrams", new GenericUDAFnGrams());
     registerGenericUDAF("context_ngrams", new GenericUDAFContextNGrams());
 
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectList.java	(working copy)
@@ -0,0 +1,34 @@
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator.BufferType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+@Description(name = "collect_list", value = "_FUNC_(x) - Returns a list of objects with duplicates")
+public class GenericUDAFCollectList extends AbstractGenericUDAFResolver {
+
+  static final Log LOG = LogFactory.getLog(GenericUDAFCollectList.class.getName());
+
+  public GenericUDAFCollectList() {
+  }
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
+      throws SemanticException {
+    if (parameters.length != 1) {
+      throw new UDFArgumentTypeException(parameters.length - 1,
+          "Exactly one argument is expected.");
+    }
+    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(0,
+          "Only primitive type arguments are accepted but "
+          + parameters[0].getTypeName() + " was passed as parameter 1.");
+    }
+    return new GenericUDAFMkCollectionEvaluator(BufferType.LIST);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java	(revision 1523187)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCollectSet.java	(working copy)
@@ -17,21 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator.BufferType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 /**
@@ -41,126 +33,23 @@
 public class GenericUDAFCollectSet extends AbstractGenericUDAFResolver {
 
   static final Log LOG = LogFactory.getLog(GenericUDAFCollectSet.class.getName());
-  
+
   public GenericUDAFCollectSet() {
   }
 
   @Override
   public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
       throws SemanticException {
-
     if (parameters.length != 1) {
       throw new UDFArgumentTypeException(parameters.length - 1,
           "Exactly one argument is expected.");
     }
-
     if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
       throw new UDFArgumentTypeException(0,
           "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
     }
-
-    return new GenericUDAFMkSetEvaluator();
+    return new GenericUDAFMkCollectionEvaluator(BufferType.SET);
   }
 
-  public static class GenericUDAFMkSetEvaluator extends GenericUDAFEvaluator {
-
-    // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
-    private PrimitiveObjectInspector inputOI;
-    // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list
-    // of objs)
-    private transient StandardListObjectInspector loi;
-
-    private transient StandardListObjectInspector internalMergeOI;
-
-    @Override
-    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
-        throws HiveException {
-      super.init(m, parameters);
-      // init output object inspectors
-      // The output of a partial aggregation is a list
-      if (m == Mode.PARTIAL1) {
-        inputOI = (PrimitiveObjectInspector) parameters[0];
-        return ObjectInspectorFactory
-            .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
-                .getStandardObjectInspector(inputOI));
-      } else {
-        if (!(parameters[0] instanceof StandardListObjectInspector)) {
-          //no map aggregation.
-          inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils
-              .getStandardObjectInspector(parameters[0]);
-          return (StandardListObjectInspector) ObjectInspectorFactory
-              .getStandardListObjectInspector(inputOI);
-        } else {
-          internalMergeOI = (StandardListObjectInspector) parameters[0];
-          inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
-          loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
-          return loi;
-        }
-      }
-    }
-
-    static class MkArrayAggregationBuffer extends AbstractAggregationBuffer {
-      Set container;
-    }
-
-    @Override
-    public void reset(AggregationBuffer agg) throws HiveException {
-      ((MkArrayAggregationBuffer) agg).container = new HashSet();
-    }
-
-    @Override
-    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
-      MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
-      reset(ret);
-      return ret;
-    }
-
-    //mapside
-    @Override
-    public void iterate(AggregationBuffer agg, Object[] parameters)
-        throws HiveException {
-      assert (parameters.length == 1);
-      Object p = parameters[0];
-
-      if (p != null) {
-        MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
-        putIntoSet(p, myagg);
-      }
-    }
-
-    //mapside
-    @Override
-    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
-      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
-      ArrayList ret = new ArrayList(myagg.container.size());
-      ret.addAll(myagg.container);
-      return ret;
-    }
-
-    @Override
-    public void merge(AggregationBuffer agg, Object partial)
-        throws HiveException {
-      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
-      ArrayList partialResult = (ArrayList) internalMergeOI.getList(partial);
-      for(Object i : partialResult) {
-        putIntoSet(i, myagg);
-      }
-    }
-
-    @Override
-    public Object terminate(AggregationBuffer agg) throws HiveException {
-      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
-      ArrayList ret = new ArrayList(myagg.container.size());
-      ret.addAll(myagg.container);
-      return ret;
-    }
-
-    private void putIntoSet(Object p, MkArrayAggregationBuffer myagg) {
-      Object pCopy = ObjectInspectorUtils.copyToStandardObject(p,
-          this.inputOI);
-      myagg.container.add(pCopy);
-    }
-  }
-
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFMkCollectionEvaluator.java	(working copy)
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+
+public class GenericUDAFMkCollectionEvaluator extends GenericUDAFEvaluator {
+
+  static final Log LOG = LogFactory.getLog(GenericUDAFMkCollectionEvaluator.class.getName());
+
+  enum BufferType { SET, LIST }
+
+  // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
+  private PrimitiveObjectInspector inputOI;
+  // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list
+  // of objs)
+  private transient StandardListObjectInspector loi;
+
+  private transient StandardListObjectInspector internalMergeOI;
+
+  private BufferType bufferType;
+
+  // no-arg constructor needed by Kryo deserialization
+  public GenericUDAFMkCollectionEvaluator() {
+  }
+
+  public GenericUDAFMkCollectionEvaluator(BufferType bufferType) {
+    this.bufferType = bufferType;
+  }
+
+  @Override
+  public ObjectInspector init(Mode m, ObjectInspector[] parameters)
+      throws HiveException {
+    super.init(m, parameters);
+    // init output object inspectors
+    // The output of a partial aggregation is a list
+    if (m == Mode.PARTIAL1) {
+      inputOI = (PrimitiveObjectInspector) parameters[0];
+      return ObjectInspectorFactory
+          .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
+              .getStandardObjectInspector(inputOI));
+    } else {
+      if (!(parameters[0] instanceof StandardListObjectInspector)) {
+        // no map aggregation.
+        inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils
+            .getStandardObjectInspector(parameters[0]);
+        return (StandardListObjectInspector) ObjectInspectorFactory
+            .getStandardListObjectInspector(inputOI);
+      } else {
+        internalMergeOI = (StandardListObjectInspector) parameters[0];
+        inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
+        loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
+        return loi;
+      }
+    }
+  }
+
+  class MkArrayAggregationBuffer extends AbstractAggregationBuffer {
+
+    private Collection container;
+
+    public MkArrayAggregationBuffer() {
+      if (bufferType == BufferType.LIST) {
+        container = new ArrayList();
+      } else if (bufferType == BufferType.SET) {
+        container = new HashSet();
+      } else {
+        LOG.error("buffer type was null");
+      }
+    }
+  }
+
+  @Override
+  public void reset(AggregationBuffer agg) throws HiveException {
+    ((MkArrayAggregationBuffer) agg).container.clear();
+  }
+
+  @Override
+  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+    MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
+    return ret;
+  }
+
+  // mapside
+  @Override
+  public void iterate(AggregationBuffer agg, Object[] parameters)
+      throws HiveException {
+    assert (parameters.length == 1);
+    Object p = parameters[0];
+
+    if (p != null) {
+      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
+      putIntoCollection(p, myagg);
+    }
+  }
+
+  // mapside
+  @Override
+  public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
+    List ret = new ArrayList(myagg.container.size());
+    ret.addAll(myagg.container);
+    return ret;
+  }
+
+  @Override
+  public void merge(AggregationBuffer agg, Object partial)
+      throws HiveException {
+    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
+    List partialResult = internalMergeOI.getList(partial);
+    for (Object i : partialResult) {
+      putIntoCollection(i, myagg);
+    }
+  }
+
+  @Override
+  public Object terminate(AggregationBuffer agg) throws HiveException {
+    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
+    List ret = new ArrayList(myagg.container.size());
+    ret.addAll(myagg.container);
+    return ret;
+  }
+
+  private void putIntoCollection(Object p, MkArrayAggregationBuffer myagg) {
+    Object pCopy = ObjectInspectorUtils.copyToStandardObject(p, this.inputOI);
+    myagg.container.add(pCopy);
+  }
+
+  public BufferType getBufferType() {
+    return bufferType;
+  }
+
+  public void setBufferType(BufferType bufferType) {
+    this.bufferType = bufferType;
+  }
+
+}
Index: ql/src/test/queries/clientpositive/udaf_collect_set.q
===================================================================
--- ql/src/test/queries/clientpositive/udaf_collect_set.q	(revision 1523187)
+++ ql/src/test/queries/clientpositive/udaf_collect_set.q	(working copy)
@@ -1,6 +1,9 @@
 DESCRIBE FUNCTION collect_set;
 DESCRIBE FUNCTION EXTENDED collect_set;
 
+DESCRIBE FUNCTION collect_list;
+DESCRIBE FUNCTION EXTENDED collect_list;
+
 set hive.map.aggr = false;
 set hive.groupby.skewindata = false;
 
@@ -8,6 +11,10 @@
 FROM src
 GROUP BY key ORDER BY key limit 20;
 
+SELECT key, collect_list(value)
+FROM src
+GROUP BY key ORDER BY key limit 20;
+
 set hive.map.aggr = true;
 set hive.groupby.skewindata = false;
 
@@ -15,6 +22,10 @@
 FROM src
 GROUP BY key ORDER BY key limit 20;
 
+SELECT key, collect_list(value)
+FROM src
+GROUP BY key ORDER BY key limit 20;
+
 set hive.map.aggr = false;
 set hive.groupby.skewindata = true;
Index: ql/src/test/results/clientpositive/show_functions.q.out
===================================================================
--- ql/src/test/results/clientpositive/show_functions.q.out	(revision 1523187)
+++ ql/src/test/results/clientpositive/show_functions.q.out	(working copy)
@@ -36,6 +36,7 @@
 ceil
 ceiling
 coalesce
+collect_list
 collect_set
 compute_stats
 concat
@@ -202,6 +203,7 @@
 ceil
 ceiling
 coalesce
+collect_list
 collect_set
 compute_stats
 concat
Index: ql/src/test/results/clientpositive/udaf_collect_set.q.out
===================================================================
--- ql/src/test/results/clientpositive/udaf_collect_set.q.out	(revision 1523187)
+++ ql/src/test/results/clientpositive/udaf_collect_set.q.out	(working copy)
@@ -8,6 +8,16 @@
 POSTHOOK: query: DESCRIBE FUNCTION EXTENDED collect_set
 POSTHOOK: type: DESCFUNCTION
 collect_set(x) - Returns a set of objects with duplicate elements eliminated
+PREHOOK: query: DESCRIBE FUNCTION collect_list
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION collect_list
+POSTHOOK: type: DESCFUNCTION
+collect_list(x) - Returns a list of objects with duplicates
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED collect_list
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED collect_list
+POSTHOOK: type: DESCFUNCTION
+collect_list(x) - Returns a list of objects with duplicates
 PREHOOK: query: SELECT key, collect_set(value)
 FROM src
 GROUP BY key ORDER BY key limit 20
@@ -40,6 +50,38 @@
 128	["val_128"]
 129	["val_129"]
 131	["val_131"]
+PREHOOK: query: SELECT key, collect_list(value)
+FROM src
+GROUP BY key ORDER BY key limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT key, collect_list(value)
+FROM src
+GROUP BY key ORDER BY key limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	["val_0","val_0","val_0"]
+10	["val_10"]
+100	["val_100","val_100"]
+103	["val_103","val_103"]
+104	["val_104","val_104"]
+105	["val_105"]
+11	["val_11"]
+111	["val_111"]
+113	["val_113","val_113"]
+114	["val_114"]
+116	["val_116"]
+118	["val_118","val_118"]
+119	["val_119","val_119","val_119"]
+12	["val_12","val_12"]
+120	["val_120","val_120"]
+125	["val_125","val_125"]
+126	["val_126"]
+128	["val_128","val_128","val_128"]
+129	["val_129","val_129"]
+131	["val_131"]
 PREHOOK: query: SELECT key, collect_set(value)
 FROM src
 GROUP BY key ORDER BY key limit 20
@@ -72,6 +114,38 @@
 128	["val_128"]
["val_129"] 131 ["val_131"] +PREHOOK: query: SELECT key, collect_list(value) +FROM src +GROUP BY key ORDER BY key limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, collect_list(value) +FROM src +GROUP BY key ORDER BY key limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 ["val_0","val_0","val_0"] +10 ["val_10"] +100 ["val_100","val_100"] +103 ["val_103","val_103"] +104 ["val_104","val_104"] +105 ["val_105"] +11 ["val_11"] +111 ["val_111"] +113 ["val_113","val_113"] +114 ["val_114"] +116 ["val_116"] +118 ["val_118","val_118"] +119 ["val_119","val_119","val_119"] +12 ["val_12","val_12"] +120 ["val_120","val_120"] +125 ["val_125","val_125"] +126 ["val_126"] +128 ["val_128","val_128","val_128"] +129 ["val_129","val_129"] +131 ["val_131"] PREHOOK: query: SELECT key, collect_set(value) FROM src GROUP BY key ORDER BY key limit 20