diff --git ql/pom.xml ql/pom.xml
index 0ff1cc7..049283f 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -401,6 +401,11 @@
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+    </dependency>
@@ -631,6 +636,7 @@
                   <include>org.codehaus.jackson:jackson-mapper-asl</include>
                   <include>com.google.guava:guava</include>
                   <include>net.sf.opencsv:opencsv</include>
+                  <include>com.clearspring.analytics:stream</include>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index b94f790..3828726 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -39,7 +39,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -382,6 +381,8 @@
     registerGenericUDAF("compute_stats", new GenericUDAFComputeStats());
+    registerGenericUDAF("hll", new GenericUDAFCardinalityApprox());
+
     registerUDAF("percentile", UDAFPercentile.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCardinalityApprox.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCardinalityApprox.java
new file mode 100644
index 0000000..8afadd8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCardinalityApprox.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+
+import java.io.IOException;
+
+@Description(name = "hll",
+    value =
+        "_FUNC_([float/double relativeSD, ]expr) or _FUNC_([int p, int sp, ]expr) - Returns "
+            + "approximate cardinality of expression by using HyperLogLogPlus.")
+public class GenericUDAFCardinalityApprox extends AbstractGenericUDAFResolver {
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
+    ObjectInspector[] inputs = info.getParameterObjectInspectors();
+    if (inputs.length == 0 || inputs.length > 3) {
+      throw new UDFArgumentLengthException("HLL accepts one to three arguments");
+    }
+    getParams(inputs);
+    return new HLLEvaluator();
+  }
+
+  private static int[] getParams(ObjectInspector[] inputs) throws UDFArgumentException {
+    int p = 16;
+    int sp = 0;
+    if (inputs.length > 1) {
+      if (inputs[0] instanceof WritableConstantFloatObjectInspector) {
+        float relativeSD = ((WritableConstantFloatObjectInspector) inputs[0]).getWritableConstantValue().get();
+        p = (int) Math.ceil(2.0 * Math.log(1.054 / relativeSD) / Math.log(2));
+      } else if (inputs[0] instanceof WritableConstantDoubleObjectInspector) {
+        double relativeSD = ((WritableConstantDoubleObjectInspector) inputs[0]).getWritableConstantValue().get();
+        p = (int) Math.ceil(2.0 * Math.log(1.054 / relativeSD) / Math.log(2));
+      } else if (inputs[0] instanceof WritableConstantIntObjectInspector) {
+        p = ((WritableConstantIntObjectInspector) inputs[0]).getWritableConstantValue().get();
+      } else {
+        throw new UDFArgumentTypeException(0, "first argument should be a constant float/double or int");
+      }
+      if (inputs.length > 2) {
+        if (!(inputs[1] instanceof WritableConstantIntObjectInspector)) {
+          throw new UDFArgumentTypeException(1, "second argument should be a constant int");
+        }
+        sp = ((WritableConstantIntObjectInspector) inputs[1]).getWritableConstantValue().get();
+      }
+    }
+    if (p < 4 || (p > sp && sp != 0)) {
+      throw new UDFArgumentException("p must be at least 4 and, when sp is non-zero, at most sp");
+    }
+    if (sp > 32) {
+      throw new UDFArgumentException("sp values greater than 32 not supported");
+    }
+    return new int[] {p, sp};
+  }
+
+  public static class HLLEvaluator extends GenericUDAFEvaluator {
+
+    private int[] params;
+    private ObjectInspector valueOI;
+
+    @Override
+    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
+        throws HiveException {
+      super.init(m, parameters);
+      if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
+        params = getParams(parameters);
+      }
+      valueOI =
+          parameters[parameters.length - 1];
+      if (mode == Mode.FINAL || mode == Mode.COMPLETE) {
+        return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
+      }
+      return PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector;
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      HLLBuffer buffer = new HLLBuffer();
+      if (params != null) {
+        buffer.hll = new HyperLogLogPlus(params[0], params[1]);
+      }
+      return buffer;
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      if (params != null) {
+        ((HLLBuffer) agg).hll = new HyperLogLogPlus(params[0], params[1]);
+      } else {
+        ((HLLBuffer) agg).hll = null;
+      }
+    }
+
+    @Override
+    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+      ((HLLBuffer) agg).hll.offer(
+          ObjectInspectorUtils.copyToStandardJavaObject(parameters[parameters.length - 1], valueOI));
+    }
+
+    @Override
+    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+      try {
+        return ((HLLBuffer) agg).hll.getBytes();
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    }
+
+    @Override
+    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+      try {
+        byte[] binary = (byte[]) ObjectInspectorUtils.copyToStandardJavaObject(partial, valueOI);
+        HyperLogLogPlus hll = HyperLogLogPlus.Builder.build(binary);
+        if (((HLLBuffer) agg).hll != null) {
+          ((HLLBuffer) agg).hll.addAll(hll);
+        } else {
+          ((HLLBuffer) agg).hll = hll;
+        }
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+
+    @Override
+    public Object terminate(AggregationBuffer agg) throws HiveException {
+      return ((HLLBuffer) agg).hll.cardinality();
+    }
+  }
+
+  private static class HLLBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {
+
+    private HyperLogLogPlus hll;
+
+    @Override
+    public int estimate() {
+      return hll.sizeof();
+    }
+  }
+}
diff --git ql/src/test/queries/clientpositive/udaf_cardinality_approx.q ql/src/test/queries/clientpositive/udaf_cardinality_approx.q
new file mode 100644
index 0000000..5ede65f
--- /dev/null
+++ ql/src/test/queries/clientpositive/udaf_cardinality_approx.q
@@ -0,0 +1,16 @@
+DESCRIBE FUNCTION hll;
+DESCRIBE FUNCTION EXTENDED hll;
+
+set hive.map.aggr = false;
+
+explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src;
+
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src;
+
+set hive.map.aggr = true;
+
+explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src;
+
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src;
diff --git ql/src/test/results/clientpositive/udaf_cardinality_approx.q.out ql/src/test/results/clientpositive/udaf_cardinality_approx.q.out
new file mode 100644
index 0000000..8e6719b
--- /dev/null
+++ ql/src/test/results/clientpositive/udaf_cardinality_approx.q.out
@@ -0,0 +1,131 @@
+PREHOOK: query: DESCRIBE FUNCTION hll
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION hll
+POSTHOOK: type: DESCFUNCTION
+hll([float/double relativeSD, ]expr) or hll([int p, int sp, ]expr) - Returns approximate cardinality of expression by using HyperLogLogPlus.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED hll
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED hll
+POSTHOOK: type: DESCFUNCTION
+hll([float/double relativeSD, ]expr) or hll([int p, int sp, ]expr) - Returns approximate cardinality of expression by using HyperLogLogPlus.
+PREHOOK: query: explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                sort order: 
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                value expressions: key (type: string)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: hll(VALUE._col0), hll(6, VALUE._col0), hll(10, 20, VALUE._col0), hll(0.1, VALUE._col0)
+          mode: complete
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: bigint), _col3 (type: bigint)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+309	302	320	331
+PREHOOK: query: explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: hll(key), hll(6, key), hll(10, 20, key), hll(0.1, key)
+                mode: hash
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: binary), _col1 (type: binary), _col2 (type: binary), _col3 (type: binary)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: hll(VALUE._col0), hll(VALUE._col1), hll(VALUE._col2), hll(VALUE._col3)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: bigint), _col3 (type: bigint)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select hll(key), hll(6,key), hll(10, 20, key), hll(0.1, key) from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+309	302	320	331
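
Note on the precision argument: when the first argument is a constant float/double relativeSD, getParams derives the HyperLogLogPlus precision as p = ceil(2 * log2(1.054 / relativeSD)), the usual HyperLogLog sizing rule for a target standard error. The standalone snippet below (an illustration, not part of the patch) shows what that mapping yields for a few values; for example, hll(0.1, key) in the test runs with p = 7.

// Illustration of the relativeSD -> p mapping used by getParams (not part of the patch).
public class HllPrecisionDemo {
  public static void main(String[] args) {
    for (double relativeSD : new double[] {0.1, 0.05, 0.01}) {
      // p = ceil(2 * log2(1.054 / relativeSD)); larger p means more registers and lower error.
      int p = (int) Math.ceil(2.0 * Math.log(1.054 / relativeSD) / Math.log(2));
      System.out.printf("relativeSD=%.2f -> p=%d (%d registers)%n", relativeSD, p, 1 << p);
    }
    // Prints: relativeSD=0.10 -> p=7 (128 registers)
    //         relativeSD=0.05 -> p=9 (512 registers)
    //         relativeSD=0.01 -> p=14 (16384 registers)
  }
}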
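
The evaluator's partial-aggregation protocol maps directly onto stream-lib's own serialization: iterate() offers raw values into the sketch, terminatePartial() ships getBytes(), and merge() rebuilds each partial with HyperLogLogPlus.Builder.build() before addAll(). A minimal standalone sketch of that round trip, assuming only stream 2.7.0 on the classpath (illustrative, not part of the patch):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

public class HllMergeDemo {
  public static void main(String[] args) throws Exception {
    // Two sketches stand in for two map-side partial aggregations, built with
    // p=16, sp=0 -- the defaults getParams uses when no precision argument is given.
    HyperLogLogPlus partial1 = new HyperLogLogPlus(16, 0);
    HyperLogLogPlus partial2 = new HyperLogLogPlus(16, 0);
    for (int i = 0; i < 500; i++) {
      partial1.offer("key-" + i);         // iterate() on mapper 1
      partial2.offer("key-" + (i + 250)); // iterate() on mapper 2 (overlapping keys)
    }
    // terminatePartial(): each mapper emits its sketch as a byte array.
    byte[] bytes1 = partial1.getBytes();
    byte[] bytes2 = partial2.getBytes();
    // merge(): the reducer rebuilds the sketches and unions them.
    HyperLogLogPlus merged = HyperLogLogPlus.Builder.build(bytes1);
    merged.addAll(HyperLogLogPlus.Builder.build(bytes2));
    // terminate(): roughly 750 distinct keys expected.
    System.out.println(merged.cardinality());
  }
}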