Index: contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/GenericUDFBloomFilter.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/GenericUDFBloomFilter.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/GenericUDFBloomFilter.java (revision 0) @@ -0,0 +1,102 @@ +package org.apache.hadoop.hive.contrib.genericudf; + +import java.io.RandomAccessFile; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.hive.contrib.genericudaf.BuildBloomBase; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.util.bloom.Key; + +public class GenericUDFBloomFilter extends GenericUDF { + + //private ObjectInspectorConverters.Converter[] converters; + + String tblName; + private PrimitiveObjectInspector hashTypeOI; + private PrimitiveObjectInspector numElementsOI; + private PrimitiveObjectInspector desiredFalsePositiveOI; + private PrimitiveObjectInspector tableOI; + private PrimitiveObjectInspector inputOI; + + private BuildBloomBase bloom; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentTypeException { + + hashTypeOI = (PrimitiveObjectInspector) arguments[0]; + numElementsOI = (PrimitiveObjectInspector) arguments[1]; + desiredFalsePositiveOI = (PrimitiveObjectInspector) arguments[2]; + tableOI = (PrimitiveObjectInspector) arguments[3]; + inputOI = (PrimitiveObjectInspector) arguments[4]; + + bloom = new BuildBloomBase(); + return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + + if(!bloom.isReady()) + { + try{ + String hashType = PrimitiveObjectInspectorUtils.getString(arguments[0].get(),hashTypeOI); + String numElements = PrimitiveObjectInspectorUtils.getString(arguments[1].get(),numElementsOI); + String desiredFalsePositive = PrimitiveObjectInspectorUtils.getString(arguments[2].get(),desiredFalsePositiveOI); + tblName = PrimitiveObjectInspectorUtils.getString(arguments[3].get(),tableOI); + + RandomAccessFile f = new RandomAccessFile(tblName, "r"); + byte[] b = new byte[(int)f.length()]; + f.read(b); + byte[] c = Base64.decodeBase64(b); + f.close(); + + + bloom.setProperties(hashType, numElements, desiredFalsePositive); + bloom.bloomOr(c); + }catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + } + } + + String concact = ""; + + for(int i=0;i= 1.0) { + throw new RuntimeException("Number of elements must be greater " + + "than zero and desiredFalsePositive must be between 0 " + + " and 1."); + } + vSize = (int)(-1 * (num * Math.log(fp)) / Math.pow(Math.log(2), 2)); + + numHash = (int)(0.7 * vSize / num); + hType = convertHashType(hashType); + filter = new BloomFilter(vSize, numHash, hType); + } + + public boolean isReady() { + return numHash != 0; + } + + public void reset() { + vSize=0; + numHash=0; + filter = null; + } + + private int convertHashType(String hashType) { + if (hashType.toLowerCase().contains("jenkins")) { + return Hash.JENKINS_HASH; + } else if (hashType.toLowerCase().contains("murmur")) { + return Hash.MURMUR_HASH; + } else { + throw new RuntimeException("Unknown hash type " + hashType + + ". Valid values are jenkins and murmur."); + } + } + + public BytesWritable bloomOut() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(vSize / 8); + DataOutputStream dos = new DataOutputStream(baos); + filter.write(dos); + + BytesWritable bi = new BytesWritable(baos.toByteArray()); + + return bi; + + } + + public void bloomOr(byte[] b) throws IOException { + DataInputStream dis = new DataInputStream(new + ByteArrayInputStream(b)); + BloomFilter filterNew123 = new BloomFilter(); + BloomFilter filterNew = new BloomFilter(); + filterNew.readFields(dis); + if(filter == null) { + filter = filterNew; + } else { + filter.or(filterNew); + } + } + +} Index: contrib/src/java/org/apache/hadoop/hive/contrib/genericudaf/GenericUDAFBuildBloom.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/genericudaf/GenericUDAFBuildBloom.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/genericudaf/GenericUDAFBuildBloom.java (revision 0) @@ -0,0 +1,175 @@ +package org.apache.hadoop.hive.contrib.genericudaf; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.util.bloom.Key; + +public class GenericUDAFBuildBloom extends AbstractGenericUDAFResolver{ + + + @Override + public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { + + if (parameters.length < 4) { + throw new UDFArgumentTypeException(parameters.length - 1, + "Please specify at least 4 arguments"); + } + + for(int i=0;i<3;i++) { + if (parameters[i].getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(i, + "Only primitive type arguments are accepted but " + + parameters[i].getTypeName() + " was passed as parameter "+ (i+1) + "."); + } + } + + if(((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory() != PrimitiveCategory.STRING) { + throw new UDFArgumentTypeException(0, + "Only string type arguments are accepted but" + + parameters[0].getTypeName() + "was passed as parameter 1."); + } + + if(((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory() != PrimitiveCategory.STRING) { + throw new UDFArgumentTypeException(1, + "Only string type arguments are accepted but" + + parameters[1].getTypeName() + "was passed as parameter 2."); + } + + if(((PrimitiveTypeInfo) parameters[2]).getPrimitiveCategory() != PrimitiveCategory.STRING) { + throw new UDFArgumentTypeException(2, + "Only string type arguments are accepted but" + + parameters[2].getTypeName() + "was passed as parameter 2."); + } + + return new GenericUDAFBloomBitsEvaluator(); + } + + public static class GenericUDAFBloomBitsEvaluator extends GenericUDAFEvaluator { + + // For PARTIAL1 and COMPLETE: ObjectInspectors for original data + private PrimitiveObjectInspector hashTypeOI; + private PrimitiveObjectInspector numElementsOI; + private PrimitiveObjectInspector desiredFalsePositiveOI; + private PrimitiveObjectInspector inputOI; + + // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) + private WritableBinaryObjectInspector binaryOI; + + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { + super.init(m, parameters); + + if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { + assert(parameters.length == 2); + hashTypeOI = (PrimitiveObjectInspector) parameters[0]; + numElementsOI = (PrimitiveObjectInspector) parameters[1]; + desiredFalsePositiveOI = (PrimitiveObjectInspector) parameters[2]; + inputOI = (PrimitiveObjectInspector) parameters[3]; + } else { + binaryOI = (WritableBinaryObjectInspector) parameters[0]; + } + if(m == Mode.PARTIAL1 || m == Mode.PARTIAL2) { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } else { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } + + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + StdAgg myagg = (StdAgg) agg; + + try { + return myagg.bloom.bloomOut(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + BytesWritable bw = (BytesWritable) terminatePartial(agg); + return bw; + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + StdAgg myagg = (StdAgg) agg; + + byte[] b = binaryOI.getPrimitiveJavaObject(partial).getData(); + + try { + myagg.bloom.bloomOr(b); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { + assert (parameters.length > 3); + if(parameters[0] == null || parameters[1] == null || parameters[2] == null) { + return; + } + + StdAgg myagg = (StdAgg) agg; + + if(!myagg.bloom.isReady()) + { + String hashType = PrimitiveObjectInspectorUtils.getString(parameters[0], hashTypeOI); + String numElements = PrimitiveObjectInspectorUtils.getString(parameters[1], numElementsOI); + String desiredFalsePositive = PrimitiveObjectInspectorUtils.getString(parameters[2], desiredFalsePositiveOI); + + myagg.bloom.setProperties(hashType, numElements, desiredFalsePositive); + } + + String concact = ""; + + for(int i=0;i