diff --git .gitignore .gitignore index c5decaf..5b70585 100644 --- .gitignore +++ .gitignore @@ -27,3 +27,5 @@ hcatalog/webhcat/java-client/target hcatalog/storage-handlers/hbase/target hcatalog/webhcat/svr/target conf/hive-default.xml.template +*.patch +*.orig diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 10ad3ea..3196e23 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -905,6 +905,12 @@ "Define the compression strategy to use while writing data. \n" + "This changes the compression level of higher level compression codec (like ZLIB)."), + // ORC Bloom Filter configs + HIVE_ORC_BLOOM_FILTER_COLUMNS("hive.exec.orc.bloom.filter.columns", "", + "Specify the column names for which bloom filter is to be created."), + HIVE_ORC_BLOOM_FILTER_FPP("hive.exec.orc.bloom.filter.fpp", 0.05f, + "Specify the false positive percentage for bloom filter. (default 5%)"), + HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false, "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" + "data is read remotely (from the client or HS2 machine) and sent to all the tasks."), diff --git ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java index 662e058..91873f4 100644 --- ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java +++ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java @@ -4796,6 +4796,32 @@ public Builder clearSum() { * optional .org.apache.hadoop.hive.ql.io.orc.TimestampStatistics timestampStatistics = 9; */ org.apache.hadoop.hive.ql.io.orc.OrcProto.TimestampStatisticsOrBuilder getTimestampStatisticsOrBuilder(); + + // repeated uint64 bloomFilter = 10; + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + java.util.List getBloomFilterList(); + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + int getBloomFilterCount(); + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + long getBloomFilter(int index); } /** * Protobuf type {@code org.apache.hadoop.hive.ql.io.orc.ColumnStatistics} @@ -4957,6 +4983,27 @@ private ColumnStatistics( bitField0_ |= 0x00000100; break; } + case 80: { + if (!((mutable_bitField0_ & 0x00000200) == 0x00000200)) { + bloomFilter_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000200; + } + bloomFilter_.add(input.readUInt64()); + break; + } + case 82: { + int length = input.readRawVarint32(); + int limit = input.pushLimit(length); + if (!((mutable_bitField0_ & 0x00000200) == 0x00000200) && input.getBytesUntilLimit() > 0) { + bloomFilter_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000200; + } + while (input.getBytesUntilLimit() > 0) { + bloomFilter_.add(input.readUInt64()); + } + input.popLimit(limit); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4965,6 +5012,9 @@ private ColumnStatistics( throw new com.google.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { + if (((mutable_bitField0_ & 0x00000200) == 0x00000200)) { + bloomFilter_ = java.util.Collections.unmodifiableList(bloomFilter_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -5189,6 +5239,41 @@ public boolean hasTimestampStatistics() { return timestampStatistics_; } + // repeated uint64 bloomFilter = 10; + public static final int BLOOMFILTER_FIELD_NUMBER = 10; + private java.util.List bloomFilter_; + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + public java.util.List + getBloomFilterList() { + return bloomFilter_; + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + public int getBloomFilterCount() { + return bloomFilter_.size(); + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+     * bloom filter represented as long array
+     * 
+ */ + public long getBloomFilter(int index) { + return bloomFilter_.get(index); + } + private void initFields() { numberOfValues_ = 0L; intStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.IntegerStatistics.getDefaultInstance(); @@ -5199,6 +5284,7 @@ private void initFields() { dateStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.DateStatistics.getDefaultInstance(); binaryStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.BinaryStatistics.getDefaultInstance(); timestampStatistics_ = org.apache.hadoop.hive.ql.io.orc.OrcProto.TimestampStatistics.getDefaultInstance(); + bloomFilter_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5239,6 +5325,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000100) == 0x00000100)) { output.writeMessage(9, timestampStatistics_); } + for (int i = 0; i < bloomFilter_.size(); i++) { + output.writeUInt64(10, bloomFilter_.get(i)); + } getUnknownFields().writeTo(output); } @@ -5284,6 +5373,15 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeMessageSize(9, timestampStatistics_); } + { + int dataSize = 0; + for (int i = 0; i < bloomFilter_.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream + .computeUInt64SizeNoTag(bloomFilter_.get(i)); + } + size += dataSize; + size += 1 * getBloomFilterList().size(); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5458,6 +5556,8 @@ public Builder clear() { timestampStatisticsBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000100); + bloomFilter_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -5554,6 +5654,11 @@ public Builder clone() { } else { result.timestampStatistics_ = timestampStatisticsBuilder_.build(); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + bloomFilter_ = java.util.Collections.unmodifiableList(bloomFilter_); + bitField0_ = (bitField0_ & ~0x00000200); + } + result.bloomFilter_ = bloomFilter_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5597,6 +5702,16 @@ public Builder mergeFrom(org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnStatist if (other.hasTimestampStatistics()) { mergeTimestampStatistics(other.getTimestampStatistics()); } + if (!other.bloomFilter_.isEmpty()) { + if (bloomFilter_.isEmpty()) { + bloomFilter_ = other.bloomFilter_; + bitField0_ = (bitField0_ & ~0x00000200); + } else { + ensureBloomFilterIsMutable(); + bloomFilter_.addAll(other.bloomFilter_); + } + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6593,6 +6708,100 @@ public Builder clearTimestampStatistics() { return timestampStatisticsBuilder_; } + // repeated uint64 bloomFilter = 10; + private java.util.List bloomFilter_ = java.util.Collections.emptyList(); + private void ensureBloomFilterIsMutable() { + if (!((bitField0_ & 0x00000200) == 0x00000200)) { + bloomFilter_ = new java.util.ArrayList(bloomFilter_); + bitField0_ |= 0x00000200; + } + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public java.util.List + getBloomFilterList() { + return java.util.Collections.unmodifiableList(bloomFilter_); + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public int getBloomFilterCount() { + return bloomFilter_.size(); + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public long getBloomFilter(int index) { + return bloomFilter_.get(index); + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public Builder setBloomFilter( + int index, long value) { + ensureBloomFilterIsMutable(); + bloomFilter_.set(index, value); + onChanged(); + return this; + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public Builder addBloomFilter(long value) { + ensureBloomFilterIsMutable(); + bloomFilter_.add(value); + onChanged(); + return this; + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public Builder addAllBloomFilter( + java.lang.Iterable values) { + ensureBloomFilterIsMutable(); + super.addAll(values, bloomFilter_); + onChanged(); + return this; + } + /** + * repeated uint64 bloomFilter = 10; + * + *
+       * bloom filter represented as long array
+       * 
+ */ + public Builder clearBloomFilter() { + bloomFilter_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000200); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.apache.hadoop.hive.ql.io.orc.ColumnStatistics) } @@ -17657,7 +17866,7 @@ public Builder setMagicBytes( "\"2\n\016DateStatistics\022\017\n\007minimum\030\001 \001(\021\022\017\n\007m", "aximum\030\002 \001(\021\"7\n\023TimestampStatistics\022\017\n\007m" + "inimum\030\001 \001(\022\022\017\n\007maximum\030\002 \001(\022\"\037\n\020BinaryS" + - "tatistics\022\013\n\003sum\030\001 \001(\022\"\234\005\n\020ColumnStatist" + + "tatistics\022\013\n\003sum\030\001 \001(\022\"\261\005\n\020ColumnStatist" + "ics\022\026\n\016numberOfValues\030\001 \001(\004\022J\n\rintStatis" + "tics\030\002 \001(\01323.org.apache.hadoop.hive.ql.i" + "o.orc.IntegerStatistics\022L\n\020doubleStatist" + @@ -17674,60 +17883,60 @@ public Builder setMagicBytes( "org.apache.hadoop.hive.ql.io.orc.BinaryS" + "tatistics\022R\n\023timestampStatistics\030\t \001(\01325" + ".org.apache.hadoop.hive.ql.io.orc.Timest" + - "ampStatistics\"n\n\rRowIndexEntry\022\025\n\tpositi", - "ons\030\001 \003(\004B\002\020\001\022F\n\nstatistics\030\002 \001(\01322.org." + - "apache.hadoop.hive.ql.io.orc.ColumnStati" + - "stics\"J\n\010RowIndex\022>\n\005entry\030\001 \003(\0132/.org.a" + - "pache.hadoop.hive.ql.io.orc.RowIndexEntr" + - "y\"\331\001\n\006Stream\022;\n\004kind\030\001 \002(\0162-.org.apache." + - "hadoop.hive.ql.io.orc.Stream.Kind\022\016\n\006col" + - "umn\030\002 \001(\r\022\016\n\006length\030\003 \001(\004\"r\n\004Kind\022\013\n\007PRE" + - "SENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGTH\020\002\022\023\n\017DICTIONA" + - "RY_DATA\020\003\022\024\n\020DICTIONARY_COUNT\020\004\022\r\n\tSECON" + - "DARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n\016ColumnEncoding", - "\022C\n\004kind\030\001 \002(\01625.org.apache.hadoop.hive." + - "ql.io.orc.ColumnEncoding.Kind\022\026\n\016diction" + - "arySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006DIRECT\020\000\022\016\n\nDIC" + - "TIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022\021\n\rDICTIONARY_V" + - "2\020\003\"\214\001\n\014StripeFooter\0229\n\007streams\030\001 \003(\0132(." 
+ - "org.apache.hadoop.hive.ql.io.orc.Stream\022" + - "A\n\007columns\030\002 \003(\01320.org.apache.hadoop.hiv" + - "e.ql.io.orc.ColumnEncoding\"\370\002\n\004Type\0229\n\004k" + - "ind\030\001 \002(\0162+.org.apache.hadoop.hive.ql.io" + - ".orc.Type.Kind\022\024\n\010subtypes\030\002 \003(\rB\002\020\001\022\022\n\n", - "fieldNames\030\003 \003(\t\022\025\n\rmaximumLength\030\004 \001(\r\022" + - "\021\n\tprecision\030\005 \001(\r\022\r\n\005scale\030\006 \001(\r\"\321\001\n\004Ki" + - "nd\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003" + - "INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n" + - "\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n" + - "\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022" + - "\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\022\013\n\007VARCHAR\020\020\022\010\n\004C" + - "HAR\020\021\"x\n\021StripeInformation\022\016\n\006offset\030\001 \001" + - "(\004\022\023\n\013indexLength\030\002 \001(\004\022\022\n\ndataLength\030\003 " + - "\001(\004\022\024\n\014footerLength\030\004 \001(\004\022\024\n\014numberOfRow", - "s\030\005 \001(\004\"/\n\020UserMetadataItem\022\014\n\004name\030\001 \002(" + - "\t\022\r\n\005value\030\002 \002(\014\"X\n\020StripeStatistics\022D\n\010" + - "colStats\030\001 \003(\01322.org.apache.hadoop.hive." + - "ql.io.orc.ColumnStatistics\"S\n\010Metadata\022G" + - "\n\013stripeStats\030\001 \003(\01322.org.apache.hadoop." + - "hive.ql.io.orc.StripeStatistics\"\356\002\n\006Foot" + - "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" + - "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" + - "doop.hive.ql.io.orc.StripeInformation\0225\n" + - "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql", - ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" + - "che.hadoop.hive.ql.io.orc.UserMetadataIt" + - "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007" + - " \003(\01322.org.apache.hadoop.hive.ql.io.orc." + - "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" + - "\"\334\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" + - "\013compression\030\002 \001(\01621.org.apache.hadoop.h" + - "ive.ql.io.orc.CompressionKind\022\034\n\024compres" + - "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" + - "\022\026\n\016metadataLength\030\005 \001(\004\022\025\n\rwriterVersio", - "n\030\006 \001(\r\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKi" + - "nd\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZ" + - "O\020\003" + "ampStatistics\022\023\n\013bloomFilter\030\n \003(\004\"n\n\rRo", + "wIndexEntry\022\025\n\tpositions\030\001 \003(\004B\002\020\001\022F\n\nst" + + "atistics\030\002 \001(\01322.org.apache.hadoop.hive." 
+ + "ql.io.orc.ColumnStatistics\"J\n\010RowIndex\022>" + + "\n\005entry\030\001 \003(\0132/.org.apache.hadoop.hive.q" + + "l.io.orc.RowIndexEntry\"\331\001\n\006Stream\022;\n\004kin" + + "d\030\001 \002(\0162-.org.apache.hadoop.hive.ql.io.o" + + "rc.Stream.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length" + + "\030\003 \001(\004\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n" + + "\006LENGTH\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIO" + + "NARY_COUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX", + "\020\006\"\263\001\n\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.or" + + "g.apache.hadoop.hive.ql.io.orc.ColumnEnc" + + "oding.Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Ki" + + "nd\022\n\n\006DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT" + + "_V2\020\002\022\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFoote" + + "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" + + "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." + + "org.apache.hadoop.hive.ql.io.orc.ColumnE" + + "ncoding\"\370\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" + + "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s", + "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\022\025\n" + + "\rmaximumLength\030\004 \001(\r\022\021\n\tprecision\030\005 \001(\r\022" + + "\r\n\005scale\030\006 \001(\r\"\321\001\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004" + + "BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005" + + "FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006STRING\020\007\022\n\n\006BINAR" + + "Y\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n" + + "\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DAT" + + "E\020\017\022\013\n\007VARCHAR\020\020\022\010\n\004CHAR\020\021\"x\n\021StripeInfo" + + "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" + + " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength", + "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" + + "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"X\n" + + "\020StripeStatistics\022D\n\010colStats\030\001 \003(\01322.or" + + "g.apache.hadoop.hive.ql.io.orc.ColumnSta" + + "tistics\"S\n\010Metadata\022G\n\013stripeStats\030\001 \003(\013" + + "22.org.apache.hadoop.hive.ql.io.orc.Stri" + + "peStatistics\"\356\002\n\006Footer\022\024\n\014headerLength\030" + + "\001 \001(\004\022\025\n\rcontentLength\030\002 \001(\004\022D\n\007stripes\030" + + "\003 \003(\01323.org.apache.hadoop.hive.ql.io.orc" + + ".StripeInformation\0225\n\005types\030\004 \003(\0132&.org.", + "apache.hadoop.hive.ql.io.orc.Type\022D\n\010met" + + "adata\030\005 \003(\01322.org.apache.hadoop.hive.ql." 
+ + "io.orc.UserMetadataItem\022\024\n\014numberOfRows\030" + + "\006 \001(\004\022F\n\nstatistics\030\007 \003(\01322.org.apache.h" + + "adoop.hive.ql.io.orc.ColumnStatistics\022\026\n" + + "\016rowIndexStride\030\010 \001(\r\"\334\001\n\nPostScript\022\024\n\014" + + "footerLength\030\001 \001(\004\022F\n\013compression\030\002 \001(\0162" + + "1.org.apache.hadoop.hive.ql.io.orc.Compr" + + "essionKind\022\034\n\024compressionBlockSize\030\003 \001(\004" + + "\022\023\n\007version\030\004 \003(\rB\002\020\001\022\026\n\016metadataLength\030", + "\005 \001(\004\022\025\n\rwriterVersion\030\006 \001(\r\022\016\n\005magic\030\300>" + + " \001(\t*:\n\017CompressionKind\022\010\n\004NONE\020\000\022\010\n\004ZLI" + + "B\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -17787,7 +17996,7 @@ public Builder setMagicBytes( internal_static_org_apache_hadoop_hive_ql_io_orc_ColumnStatistics_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_apache_hadoop_hive_ql_io_orc_ColumnStatistics_descriptor, - new java.lang.String[] { "NumberOfValues", "IntStatistics", "DoubleStatistics", "StringStatistics", "BucketStatistics", "DecimalStatistics", "DateStatistics", "BinaryStatistics", "TimestampStatistics", }); + new java.lang.String[] { "NumberOfValues", "IntStatistics", "DoubleStatistics", "StringStatistics", "BucketStatistics", "DecimalStatistics", "DateStatistics", "BinaryStatistics", "TimestampStatistics", "BloomFilter", }); internal_static_org_apache_hadoop_hive_ql_io_orc_RowIndexEntry_descriptor = getDescriptor().getMessageTypes().get(9); internal_static_org_apache_hadoop_hive_ql_io_orc_RowIndexEntry_fieldAccessorTable = new diff --git ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilter.java ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilter.java new file mode 100644 index 0000000..d129f3d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilter.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.filters; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +import java.util.List; + +/** + * BloomFilter is a probabilistic data structure for set membership check. BloomFilters are + * highly space efficient when compared to using a HashSet. Because of the probabilistic nature of + * bloom filter false positive (element not present in bloom filter but test() says true) are + * possible but false negatives are not possible (if element is present then test() will never + * say false). 
The false positive probability is configurable (default: 5%) depending on which + * storage requirement may increase or decrease. Lower the false positive probability greater + * is the space requirement. + * Bloom filters are sensitive to number of elements that will be inserted in the bloom filter. + * During the creation of bloom filter expected number of entries must be specified. If the number + * of insertions exceed the specified initial number of entries then false positive probability will + * increase accordingly. + * + * Internally, this implementation of bloom filter uses Murmur3 fast non-cryptographic hash + * algorithm. Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash + * collisions for specific sequence of repeating bytes. Check the following link for more info + * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw + */ +public class BloomFilter { + private static final double DEFAULT_FPP = 0.05; + private BitSet bitSet; + private int m; + private int k; + + // will be serialized + private double fpp; + private long n; + + public BloomFilter(long maxNumEntries) { + this(maxNumEntries, DEFAULT_FPP); + } + + public BloomFilter(long maxNumEntries, double fpp) { + assert maxNumEntries > 0 : "maxNumEntries should be > 0"; + assert fpp > 0.0 && fpp < 1.0 : "False positive percentage should be > 0.0 & < 1.0"; + this.m = optimalNumOfBits(maxNumEntries, fpp); + this.k = optimalNumOfHashFunctions(maxNumEntries, m); + this.fpp = fpp; + this.n = maxNumEntries; + this.bitSet = new BitSet(m); + } + + // deserialize bloomfilter. see serialize() for the format. + public BloomFilter(List serializedBloom) { + this(serializedBloom.get(0), Double.longBitsToDouble(serializedBloom.get(1))); + long[] data = Longs.toArray(serializedBloom.subList(2, serializedBloom.size())); + this.bitSet = new BitSet(data); + } + + static int optimalNumOfHashFunctions(long n, long m) { + return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); + } + + static int optimalNumOfBits(long n, double p) { + if (p == 0) { + p = Double.MIN_VALUE; + } + return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); + } + + public long sizeInBytes() { + return bitSet.bitSize() / 8; + } + + public void add(byte[] val) { + addBytes(val); + } + + public void addBytes(byte[] val) { + // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter" + // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively + // implement a Bloom filter without any loss in the asymptotic false positive probability' + + // Lets split up 64-bit hashcode into two 32-bit hashcodes and employ the technique mentioned + // in the above paper + long hash64 = val == null ? 
Murmur3.NULL_HASHCODE : Murmur3.hash64(val); + int hash1 = (int) hash64; + int hash2 = (int) (hash64 >>> 32); + + for (int i = 1; i <= k; i++) { + int combinedHash = hash1 + (i * hash2); + // hashcode should be positive, flip all the bits if it's negative + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + int pos = combinedHash % m; + bitSet.set(pos); + } + } + + public void addString(String val) { + if (val == null) { + addBytes(null); + } else { + addBytes(val.getBytes()); + } + } + + public void addLong(long val) { + // puts long in little endian order + addBytes(longToByteArrayLE(val)); + } + + public void addDouble(double val) { + addLong(Double.doubleToLongBits(val)); + } + + public boolean test(byte[] val) { + return testBytes(val); + } + + public boolean testBytes(byte[] val) { + long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val); + int hash1 = (int) hash64; + int hash2 = (int) (hash64 >>> 32); + + for (int i = 1; i <= k; i++) { + int combinedHash = hash1 + (i * hash2); + // hashcode should be positive, flip all the bits if it's negative + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + int pos = combinedHash % m; + if (!bitSet.get(pos)) { + return false; + } + } + return true; + } + + public boolean testString(String val) { + if (val == null) { + return testBytes(null); + } else { + return testBytes(val.getBytes()); + } + } + + public boolean testLong(long val) { + return testBytes(longToByteArrayLE(val)); + } + + public boolean testDouble(double val) { + return testLong(Double.doubleToLongBits(val)); + } + + private static byte[] longToByteArrayLE(long val) { + return new byte[]{(byte) (val >> 0), + (byte) (val >> 8), + (byte) (val >> 16), + (byte) (val >> 24), + (byte) (val >> 32), + (byte) (val >> 40), + (byte) (val >> 48), + (byte) (val >> 56),}; + } + + public int getBitSize() { + return m; + } + + public int getNumHashFunctions() { + return k; + } + + public double getFalsePositivePercent() { + return fpp; + } + + public long getExpectedNumEntries() { + return n; + } + + /** + * First 2 entries are expected entries (n) and false positive percentage (fpp). fpp which is a + * double is serialized as long. The entries following first 2 entries are the actual bit set. + * + * @return bloom filter as list of long + */ + public List serialize() { + List serialized = Lists.newArrayList(); + serialized.add(n); + serialized.add(Double.doubleToLongBits(fpp)); + for (long l : bitSet.getData()) { + serialized.add(l); + } + return serialized; + } + + /** + * Check if the specified bloom filter is compatible with the current bloom filter. + * + * @param that - bloom filter to check compatibility + * @return true if compatible false otherwise + */ + public boolean isCompatible(BloomFilter that) { + return this != that && + this.getBitSize() == that.getBitSize() && + this.getNumHashFunctions() == that.getNumHashFunctions(); + } + + /** + * Merge the specified bloom filter with current bloom filter. + * NOTE: Merge does not check for incompatibility. Use isCompatible() before calling merge(). + * + * @param that - bloom filter to merge + */ + public void merge(BloomFilter that) { + this.bitSet.putAll(that.bitSet); + } + + + /** + * Bare metal bitset implementation. For performance reasons, this implementation does not check + * for index bounds nor expand the bitset size if the specified index is greater than the size. 
+ */ + private class BitSet { + final long[] data; + + BitSet(long bits) { + this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]); + } + + /** + * Deserialize long array as bitset. + * + * @param data + */ + BitSet(long[] data) { + assert data.length > 0 : "data length is zero!"; + this.data = data; + } + + /** + * Sets the bit at specified index. + * + * @param index + */ + void set(long index) { + data[(int) (index >>> 6)] |= (1L << index); + } + + /** + * Returns true if the bit is set in the specified index. + * + * @param index + * @return + */ + boolean get(long index) { + return (data[(int) (index >>> 6)] & (1L << index)) != 0; + } + + /** + * Number of bits + */ + long bitSize() { + return (long) data.length * Long.SIZE; + } + + long[] getData() { + return data; + } + + /** + * Combines the two BitArrays using bitwise OR. + */ + void putAll(BitSet array) { + assert data.length == array.data.length : + "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")"; + for (int i = 0; i < data.length; i++) { + data[i] |= array.data[i]; + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/filters/Murmur3.java ql/src/java/org/apache/hadoop/hive/ql/io/filters/Murmur3.java new file mode 100644 index 0000000..f9f3d2c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/filters/Murmur3.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.filters; + +/** + * Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms. + * + * Murmur3 32 and 128 bit variants. + * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94 + * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255 + */ +public class Murmur3 { + // from 64-bit linear congruential generator + public static final long NULL_HASHCODE = 2862933555777941757L; + + // Constants for 32 bit variant + private static final int C1_32 = 0xcc9e2d51; + private static final int C2_32 = 0x1b873593; + private static final int R1_32 = 15; + private static final int R2_32 = 13; + private static final int M_32 = 5; + private static final int N_32 = 0xe6546b64; + + // Constants for 128 bit variant + private static final long C1 = 0x87c37b91114253d5L; + private static final long C2 = 0x4cf5ad432745937fL; + private static final int R1 = 31; + private static final int R2 = 27; + private static final int R3 = 33; + private static final int M = 5; + private static final int N1 = 0x52dce729; + private static final int N2 = 0x38495ab5; + + private static final int DEFAULT_SEED = 104729; + + /** + * Murmur3 32-bit variant. 
+ * + * @param data - input byte array + * @return - hashcode + */ + public static int hash32(byte[] data) { + return hash32(data, data.length, DEFAULT_SEED); + } + + /** + * Murmur3 32-bit variant. + * + * @param data - input byte array + * @param length - length of array + * @param seed - seed. (default 0) + * @return - hashcode + */ + public static int hash32(byte[] data, int length, int seed) { + int hash = seed; + final int nblocks = length >> 2; + + // body + for (int i = 0; i < nblocks; i++) { + int i_4 = i << 2; + int k = (data[i_4] & 0xff) + | ((data[i_4 + 1] & 0xff) << 8) + | ((data[i_4 + 2] & 0xff) << 16) + | ((data[i_4 + 3] & 0xff) << 24); + + // mix functions + k *= C1_32; + k = Integer.rotateLeft(k, R1_32); + k *= C2_32; + hash ^= k; + hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32; + } + + // tail + int idx = nblocks << 2; + int k1 = 0; + switch (length - idx) { + case 3: + k1 ^= data[idx + 2] << 16; + case 2: + k1 ^= data[idx + 1] << 8; + case 1: + k1 ^= data[idx]; + + // mix functions + k1 *= C1_32; + k1 = Integer.rotateLeft(k1, R1_32); + k1 *= C2_32; + hash ^= k1; + } + + // finalization + hash ^= length; + hash ^= (hash >>> 16); + hash *= 0x85ebca6b; + hash ^= (hash >>> 13); + hash *= 0xc2b2ae35; + hash ^= (hash >>> 16); + + return hash; + } + + /** + * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant. + * + * @param data - input byte array + * @return - hashcode + */ + public static long hash64(byte[] data) { + return hash64(data, data.length, DEFAULT_SEED); + } + + /** + * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant. + * + * @param data - input byte array + * @param length - length of array + * @param seed - seed. (default is 0) + * @return - hashcode + */ + public static long hash64(byte[] data, int length, int seed) { + long hash = seed; + final int nblocks = length >> 3; + + // body + for (int i = 0; i < nblocks; i++) { + final int i8 = i << 3; + long k = ((long) data[i8] & 0xff) + | (((long) data[i8 + 1] & 0xff) << 8) + | (((long) data[i8 + 2] & 0xff) << 16) + | (((long) data[i8 + 3] & 0xff) << 24) + | (((long) data[i8 + 4] & 0xff) << 32) + | (((long) data[i8 + 5] & 0xff) << 40) + | (((long) data[i8 + 6] & 0xff) << 48) + | (((long) data[i8 + 7] & 0xff) << 56); + + // mix functions + k *= C1; + k = Long.rotateLeft(k, R1); + k *= C2; + hash ^= k; + hash = Long.rotateLeft(hash, R2) * M + N1; + } + + // tail + long k1 = 0; + int tailStart = nblocks << 3; + switch (length - tailStart) { + case 7: + k1 ^= ((long) data[tailStart + 6] & 0xff) << 48; + case 6: + k1 ^= ((long) data[tailStart + 5] & 0xff) << 40; + case 5: + k1 ^= ((long) data[tailStart + 4] & 0xff) << 32; + case 4: + k1 ^= ((long) data[tailStart + 3] & 0xff) << 24; + case 3: + k1 ^= ((long) data[tailStart + 2] & 0xff) << 16; + case 2: + k1 ^= ((long) data[tailStart + 1] & 0xff) << 8; + case 1: + k1 ^= ((long) data[tailStart] & 0xff); + k1 *= C1; + k1 = Long.rotateLeft(k1, R1); + k1 *= C2; + hash ^= k1; + } + + // finalization + hash ^= length; + hash = fmix64(hash); + + return hash; + } + + /** + * Murmur3 128-bit variant. + * + * @param data - input byte array + * @return - hashcode (2 longs) + */ + public static long[] hash128(byte[] data) { + return hash128(data, data.length, DEFAULT_SEED); + } + + /** + * Murmur3 128-bit variant. + * + * @param data - input byte array + * @param length - length of array + * @param seed - seed. 
(default is 0) + * @return - hashcode (2 longs) + */ + public static long[] hash128(byte[] data, int length, int seed) { + long h1 = seed; + long h2 = seed; + final int nblocks = length >> 4; + + // body + for (int i = 0; i < nblocks; i++) { + final int i16 = i << 4; + long k1 = ((long) data[i16] & 0xff) + | (((long) data[i16 + 1] & 0xff) << 8) + | (((long) data[i16 + 2] & 0xff) << 16) + | (((long) data[i16 + 3] & 0xff) << 24) + | (((long) data[i16 + 4] & 0xff) << 32) + | (((long) data[i16 + 5] & 0xff) << 40) + | (((long) data[i16 + 6] & 0xff) << 48) + | (((long) data[i16 + 7] & 0xff) << 56); + + long k2 = ((long) data[i16 + 8] & 0xff) + | (((long) data[i16 + 9] & 0xff) << 8) + | (((long) data[i16 + 10] & 0xff) << 16) + | (((long) data[i16 + 11] & 0xff) << 24) + | (((long) data[i16 + 12] & 0xff) << 32) + | (((long) data[i16 + 13] & 0xff) << 40) + | (((long) data[i16 + 14] & 0xff) << 48) + | (((long) data[i16 + 15] & 0xff) << 56); + + // mix functions for k1 + k1 *= C1; + k1 = Long.rotateLeft(k1, R1); + k1 *= C2; + h1 ^= k1; + h1 = Long.rotateLeft(h1, R2); + h1 += h2; + h1 = h1 * M + N1; + + // mix functions for k2 + k2 *= C2; + k2 = Long.rotateLeft(k2, R3); + k2 *= C1; + h2 ^= k2; + h2 = Long.rotateLeft(h2, R1); + h2 += h1; + h2 = h2 * M + N2; + } + + // tail + long k1 = 0; + long k2 = 0; + int tailStart = nblocks << 4; + switch (length - tailStart) { + case 15: + k2 ^= (long) (data[tailStart + 14] & 0xff) << 48; + case 14: + k2 ^= (long) (data[tailStart + 13] & 0xff) << 40; + case 13: + k2 ^= (long) (data[tailStart + 12] & 0xff) << 32; + case 12: + k2 ^= (long) (data[tailStart + 11] & 0xff) << 24; + case 11: + k2 ^= (long) (data[tailStart + 10] & 0xff) << 16; + case 10: + k2 ^= (long) (data[tailStart + 9] & 0xff) << 8; + case 9: + k2 ^= (long) (data[tailStart + 8] & 0xff); + k2 *= C2; + k2 = Long.rotateLeft(k2, R3); + k2 *= C1; + h2 ^= k2; + + case 8: + k1 ^= (long) (data[tailStart + 7] & 0xff) << 56; + case 7: + k1 ^= (long) (data[tailStart + 6] & 0xff) << 48; + case 6: + k1 ^= (long) (data[tailStart + 5] & 0xff) << 40; + case 5: + k1 ^= (long) (data[tailStart + 4] & 0xff) << 32; + case 4: + k1 ^= (long) (data[tailStart + 3] & 0xff) << 24; + case 3: + k1 ^= (long) (data[tailStart + 2] & 0xff) << 16; + case 2: + k1 ^= (long) (data[tailStart + 1] & 0xff) << 8; + case 1: + k1 ^= (long) (data[tailStart] & 0xff); + k1 *= C1; + k1 = Long.rotateLeft(k1, R1); + k1 *= C2; + h1 ^= k1; + } + + // finalization + h1 ^= length; + h2 ^= length; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + return new long[]{h1, h2}; + } + + private static long fmix64(long h) { + h ^= (h >>> 33); + h *= 0xff51afd7ed558ccdL; + h ^= (h >>> 33); + h *= 0xc4ceb9fe1a85ec53L; + h ^= (h >>> 33); + return h; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatistics.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatistics.java index 31fa012..621aa3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatistics.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatistics.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.io.filters.BloomFilter; + /** * Statistics that are available for all types of columns. */ @@ -27,4 +29,10 @@ * @return the number of values */ long getNumberOfValues(); + + /** + * Get bloom filter. 
+ * @return + */ + BloomFilter getBloomFilter(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java index 3235b0e..2734dab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java @@ -17,17 +17,32 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import java.sql.Timestamp; - import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.filters.BloomFilter; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import java.sql.Timestamp; + class ColumnStatisticsImpl implements ColumnStatistics { + BloomFilter bf; + boolean useBloomFilter; + private long numBloomEntries; + private double fpp; + + ColumnStatisticsImpl(boolean createBloomFilter, int n, double fpp) { + this.useBloomFilter = createBloomFilter; + this.numBloomEntries = n; + this.fpp = fpp; + if (useBloomFilter) { + this.bf = new BloomFilter(n, fpp); + } + } + private static final class BooleanStatisticsImpl extends ColumnStatisticsImpl implements BooleanColumnStatistics { private long trueCount = 0; @@ -97,6 +112,7 @@ public String toString() { private boolean overflow = false; IntegerStatisticsImpl() { + this(false, 0, 0.0); } IntegerStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -116,6 +132,10 @@ public String toString() { } } + public IntegerStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -144,6 +164,9 @@ void updateInteger(long value) { overflow = (sum >= 0) != wasPositive; } } + if (useBloomFilter) { + bf.addLong(value); + } } @Override @@ -233,6 +256,7 @@ public String toString() { private double sum = 0; DoubleStatisticsImpl() { + this(false, 0, 0.0); } DoubleStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -250,6 +274,10 @@ public String toString() { } } + public DoubleStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -271,6 +299,9 @@ void updateDouble(double value) { maximum = value; } sum += value; + if (useBloomFilter) { + bf.addDouble(value); + } } @Override @@ -343,6 +374,7 @@ public String toString() { private long sum = 0; StringStatisticsImpl() { + this(false, 0, 0.0); } StringStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -359,6 +391,10 @@ public String toString() { } } + public StringStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -369,6 +405,11 @@ void reset() { @Override void updateString(Text value) { + if (value == null) { + super.setNull(); + return; + } + if (minimum == null) { maximum = minimum = new Text(value); } else if (minimum.compareTo(value) > 0) { @@ -377,6 +418,9 @@ void updateString(Text value) { maximum = new Text(value); } sum += value.getLength(); + if (useBloomFilter) { + bf.addString(value.toString()); + } } @Override @@ -452,6 +496,7 @@ public String toString() { private long sum = 0; BinaryStatisticsImpl() { + this(false, 0, 0.0); } 
BinaryStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -462,6 +507,10 @@ public String toString() { } } + public BinaryStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -470,7 +519,15 @@ void reset() { @Override void updateBinary(BytesWritable value) { + if (value == null) { + super.setNull(); + return; + } + sum += value.getLength(); + if (useBloomFilter) { + bf.addBytes(value.getBytes()); + } } @Override @@ -512,6 +569,7 @@ public String toString() { private HiveDecimal sum = HiveDecimal.ZERO; DecimalStatisticsImpl() { + this(false, 0, 0.0); } DecimalStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -530,6 +588,10 @@ public String toString() { } } + public DecimalStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -540,6 +602,11 @@ void reset() { @Override void updateDecimal(HiveDecimal value) { + if (value == null) { + super.setNull(); + return; + } + if (minimum == null) { minimum = value; maximum = value; @@ -551,6 +618,9 @@ void updateDecimal(HiveDecimal value) { if (sum != null) { sum = sum.add(value); } + if (useBloomFilter) { + bf.addString(value.toString()); + } } @Override @@ -630,6 +700,7 @@ public String toString() { private Integer maximum = null; DateStatisticsImpl() { + this(false, 0, 0.0); } DateStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -644,6 +715,10 @@ public String toString() { } } + public DateStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -653,6 +728,11 @@ void reset() { @Override void updateDate(DateWritable value) { + if (value == null) { + super.setNull(); + return; + } + if (minimum == null) { minimum = value.getDays(); maximum = value.getDays(); @@ -661,6 +741,9 @@ void updateDate(DateWritable value) { } else if (maximum < value.getDays()) { maximum = value.getDays(); } + if (useBloomFilter) { + bf.addLong(value.getDays()); + } } @Override @@ -727,6 +810,7 @@ public String toString() { private Long maximum = null; TimestampStatisticsImpl() { + this(false, 0, 0.0); } TimestampStatisticsImpl(OrcProto.ColumnStatistics stats) { @@ -741,6 +825,10 @@ public String toString() { } } + public TimestampStatisticsImpl(boolean createBloomFilter, int numEntries, double fpp) { + super(createBloomFilter, numEntries, fpp); + } + @Override void reset() { super.reset(); @@ -750,6 +838,11 @@ void reset() { @Override void updateTimestamp(Timestamp value) { + if (value == null) { + super.setNull(); + return; + } + if (minimum == null) { minimum = value.getTime(); maximum = value.getTime(); @@ -758,6 +851,9 @@ void updateTimestamp(Timestamp value) { } else if (maximum < value.getTime()) { maximum = value.getTime(); } + if (useBloomFilter) { + bf.addLong(value.getTime()); + } } @Override @@ -792,14 +888,12 @@ void merge(ColumnStatisticsImpl other) { @Override public Timestamp getMinimum() { - Timestamp minTimestamp = new Timestamp(minimum); - return minTimestamp; + return new Timestamp(minimum); } @Override public Timestamp getMaximum() { - Timestamp maxTimestamp = new Timestamp(maximum); - return maxTimestamp; + return new Timestamp(maximum); } @Override @@ -821,6 +915,12 @@ public String toString() { if (stats.hasNumberOfValues()) { count = stats.getNumberOfValues(); } + if (stats.getBloomFilterCount() > 0) { + 
useBloomFilter = true; + bf = new BloomFilter(stats.getBloomFilterList()); + numBloomEntries = bf.getExpectedNumEntries(); + fpp = bf.getFalsePositivePercent(); + } } ColumnStatisticsImpl() { @@ -830,6 +930,12 @@ void increment() { count += 1; } + private void setNull() { + if (useBloomFilter) { + bf.addBytes(null); + } + } + void updateBoolean(boolean value) { throw new UnsupportedOperationException("Can't update boolean"); } @@ -864,10 +970,16 @@ void updateTimestamp(Timestamp value) { void merge(ColumnStatisticsImpl stats) { count += stats.count; + if (useBloomFilter && bf.isCompatible(stats.bf)) { + bf.merge(stats.bf); + } } void reset() { count = 0; + if (useBloomFilter) { + bf = new BloomFilter(numBloomEntries, fpp); + } } @Override @@ -876,18 +988,39 @@ public long getNumberOfValues() { } @Override + public BloomFilter getBloomFilter() { + if (useBloomFilter) { + return bf; + } + return null; + } + + @Override public String toString() { - return "count: " + count; + StringBuilder sb = new StringBuilder(); + sb.append("count: ").append(count); + if (useBloomFilter) { + sb.append(" bloomfilter: true [").append(bf.sizeInBytes()).append("]"); + } + return sb.toString(); } OrcProto.ColumnStatistics.Builder serialize() { OrcProto.ColumnStatistics.Builder builder = OrcProto.ColumnStatistics.newBuilder(); builder.setNumberOfValues(count); + if (useBloomFilter) { + builder.addAllBloomFilter(bf.serialize()); + } return builder; } static ColumnStatisticsImpl create(ObjectInspector inspector) { + return create(inspector, false, 0, 0.0); + } + + static ColumnStatisticsImpl create(ObjectInspector inspector, boolean createBloomFilter, + int expectedBloomEntries, double fpp) { switch (inspector.getCategory()) { case PRIMITIVE: switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { @@ -897,22 +1030,22 @@ static ColumnStatisticsImpl create(ObjectInspector inspector) { case SHORT: case INT: case LONG: - return new IntegerStatisticsImpl(); + return new IntegerStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case FLOAT: case DOUBLE: - return new DoubleStatisticsImpl(); + return new DoubleStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case STRING: case CHAR: case VARCHAR: - return new StringStatisticsImpl(); + return new StringStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case DECIMAL: - return new DecimalStatisticsImpl(); + return new DecimalStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case DATE: - return new DateStatisticsImpl(); + return new DateStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case TIMESTAMP: - return new TimestampStatisticsImpl(); + return new TimestampStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); case BINARY: - return new BinaryStatisticsImpl(); + return new BinaryStatisticsImpl(createBloomFilter, expectedBloomEntries, fpp); default: return new ColumnStatisticsImpl(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java index 10f8b8d..79279ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java @@ -17,15 +17,6 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import java.io.IOException; -import java.text.DecimalFormat; -import java.util.Map; - import org.apache.commons.cli.CommandLine; import 
org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -46,6 +37,14 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + /** * A tool for printing out the file structure of ORC files. */ @@ -170,10 +169,7 @@ private static void printMetaData(List files, Configuration conf, buf.append("no stats at "); } else { ColumnStatistics cs = ColumnStatisticsImpl.deserialize(colStats); - Object min = RecordReaderImpl.getMin(cs), max = RecordReaderImpl.getMax(cs); - buf.append(" count: ").append(cs.getNumberOfValues()); - buf.append(" min: ").append(min); - buf.append(" max: ").append(max); + buf.append(cs.toString()); } buf.append(" positions: "); for (int posIx = 0; posIx < entry.getPositionsCount(); ++posIx) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index b46937c..0281c86 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -18,7 +18,14 @@ package org.apache.hadoop.hive.ql.io.orc; -import java.io.IOException; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_BLOOM_FILTER_COLUMNS; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -26,7 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.*; +import java.io.IOException; /** * Contains factory methods to read or write ORC files. 
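The writer options in the next hunk are fed by the two HiveConf settings added earlier (hive.exec.orc.bloom.filter.columns and hive.exec.orc.bloom.filter.fpp), or per table through the new orc.bloom.filter.columns table property picked up by OrcOutputFormat further down. A minimal wiring sketch follows; the class and column names are invented, and writerOptions(), inspector() and createWriter(Path, WriterOptions) are assumed to be the pre-existing OrcFile API, with only bloomFilterColumns() being new in this patch.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class OrcBloomFilterWriterSketch {
  // Opens an ORC writer that builds bloom filters for the listed columns.
  public static Writer open(Configuration conf, Path path, ObjectInspector inspector)
      throws IOException {
    // Global defaults, as introduced in the HiveConf.java hunk above (values invented).
    conf.set("hive.exec.orc.bloom.filter.columns", "user_id,country");
    conf.setFloat("hive.exec.orc.bloom.filter.fpp", 0.05f);

    return OrcFile.createWriter(path,
        OrcFile.writerOptions(conf)
            .inspector(inspector)
            // Explicit override using the new WriterOptions method from this patch.
            .bloomFilterColumns("user_id"));
  }
}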
@@ -148,7 +155,8 @@ private WriterVersion(int id) { ROW_INDEX_STRIDE("orc.row.index.stride"), ENABLE_INDEXES("orc.create.index"), BLOCK_PADDING("orc.block.padding"), - ENCODING_STRATEGY("orc.encoding.strategy"); + ENCODING_STRATEGY("orc.encoding.strategy"), + BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"); private final String propName; @@ -256,6 +264,8 @@ public static Reader createReader(Path path, private EncodingStrategy encodingStrategy; private CompressionStrategy compressionStrategy; private float paddingTolerance; + private String bloomFilterColumns; + private double bloomFilterFpp; WriterOptions(Configuration conf) { configuration = conf; @@ -288,9 +298,12 @@ public static Reader createReader(Path path, compressionStrategy = CompressionStrategy.valueOf(compString); } - paddingTolerance = - conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, - HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); + paddingTolerance = conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, + HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); + + bloomFilterColumns = HiveConf.getVar(conf, HIVE_ORC_BLOOM_FILTER_COLUMNS); + bloomFilterFpp = conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOOM_FILTER_FPP.varname, + HiveConf.ConfVars.HIVE_ORC_BLOOM_FILTER_FPP.defaultFloatVal); } /** @@ -367,6 +380,14 @@ public WriterOptions paddingTolerance(float value) { } /** + * Comma separated values of column names for which bloom filter is to be created. + */ + public WriterOptions bloomFilterColumns(String columns) { + bloomFilterColumns = columns; + return this; + } + + /** * Sets the generic compression that is used to compress the data. */ public WriterOptions compress(CompressionKind value) { @@ -438,8 +459,8 @@ public static Writer createWriter(Path path, opts.memoryManagerValue, opts.blockPaddingValue, opts.versionValue, opts.callback, opts.encodingStrategy, opts.compressionStrategy, - opts.paddingTolerance, - opts.blockSizeValue); + opts.paddingTolerance, opts.blockSizeValue, + opts.bloomFilterColumns, opts.bloomFilterFpp); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 5be2b4f..b7841d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -18,18 +18,9 @@ package org.apache.hadoop.hive.ql.io.orc; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.NavigableMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,9 +45,9 @@ import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; -import 
org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -74,9 +65,18 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A MapReduce/Hive input format for ORC files. *

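The predicate evaluation in the hunk below pulls the per-column BloomFilter out of the stripe statistics (stats.getBloomFilter()) and hands it to RecordReaderImpl.evaluatePredicateRange. A minimal sketch of the new ql.io.filters.BloomFilter API defined earlier in this patch, with invented values and class name:

import java.util.List;
import org.apache.hadoop.hive.ql.io.filters.BloomFilter;

public class BloomFilterApiSketch {
  public static void main(String[] args) {
    // Sized for ~10000 expected entries at the default 5% false positive probability.
    BloomFilter bf = new BloomFilter(10000, 0.05);
    bf.addLong(42L);
    bf.addString("hello");

    boolean maybePresent = bf.testLong(42L);       // always true: no false negatives
    boolean probablyAbsent = bf.testString("xyz"); // false unless a false positive occurs

    // Round trip through the long-list form stored in the repeated uint64
    // bloomFilter field of OrcProto.ColumnStatistics.
    List<Long> serialized = bf.serialize();
    BloomFilter restored = new BloomFilter(serialized);

    // Filters built with identical parameters can be merged, e.g. when row group
    // statistics are rolled up into stripe statistics.
    if (bf.isCompatible(restored)) {
      bf.merge(restored);
    }
    System.out.println(maybePresent + " " + probablyAbsent + ", " + bf.sizeInBytes() + " bytes");
  }
}

The List<Long> round trip above mirrors what ColumnStatisticsImpl does: serialize() feeds addAllBloomFilter() on the writer side, and the deserializing constructor consumes getBloomFilterList() on the reader side.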
@@ -923,9 +923,8 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, stripeStatistics.getColumnStatistics()[filterColumns[pred]]; Object minValue = RecordReaderImpl.getMin(stats); Object maxValue = RecordReaderImpl.getMax(stats); - truthValues[pred] = - RecordReaderImpl.evaluatePredicateRange(predLeaves.get(pred), - minValue, maxValue); + truthValues[pred] = RecordReaderImpl.evaluatePredicateRange(predLeaves.get(pred), + minValue, maxValue, stats.getBloomFilter()); } else { // parition column case. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 5bd3f0c..d76e25f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -170,6 +170,11 @@ private String getSettingFromPropsFallingBackToConf(String key, Properties props options.encodingStrategy(EncodingStrategy.valueOf(propVal)); } + if ((propVal = getSettingFromPropsFallingBackToConf( + OrcFile.OrcTableProperties.BLOOM_FILTER_COLUMNS.getPropName(),props,conf)) != null){ + options.bloomFilterColumns(propVal); + } + return options; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index f7fce3f..0f68b5f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -19,19 +19,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY; -import java.io.EOFException; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import com.google.common.collect.ComparisonChain; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.HashCodeBuilder; @@ -51,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import org.apache.hadoop.hive.ql.io.filters.BloomFilter; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -74,7 +63,20 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import com.google.common.collect.ComparisonChain; +import java.io.EOFException; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; class RecordReaderImpl implements RecordReader { @@ -2366,11 +2368,11 @@ static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index, ColumnStatistics cs = ColumnStatisticsImpl.deserialize(index); Object minValue = getMin(cs); Object maxValue = getMax(cs); - return evaluatePredicateRange(predicate, minValue, maxValue); + return evaluatePredicateRange(predicate, minValue, maxValue, cs.getBloomFilter()); } static 
TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, - Object max) { + Object max, BloomFilter bloomFilter) { // if we didn't have any values, everything must have been null if (min == null) { if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) { @@ -2395,89 +2397,107 @@ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, Object predObj = getBaseObjectForComparison(baseObj, minValue); switch (predicate.getOperator()) { - case NULL_SAFE_EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.BEFORE || loc == Location.AFTER) { - return TruthValue.NO; - } else { - return TruthValue.YES_NO; - } - case EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (minValue.equals(maxValue) && loc == Location.MIN) { - return TruthValue.YES_NULL; - } else if (loc == Location.BEFORE || loc == Location.AFTER) { - return TruthValue.NO_NULL; - } else { - return TruthValue.YES_NO_NULL; - } - case LESS_THAN: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.AFTER) { - return TruthValue.YES_NULL; - } else if (loc == Location.BEFORE || loc == Location.MIN) { - return TruthValue.NO_NULL; - } else { - return TruthValue.YES_NO_NULL; - } - case LESS_THAN_EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.AFTER || loc == Location.MAX) { - return TruthValue.YES_NULL; - } else if (loc == Location.BEFORE) { - return TruthValue.NO_NULL; - } else { - return TruthValue.YES_NO_NULL; - } - case IN: - if (minValue.equals(maxValue)) { - // for a single value, look through to see if that value is in the - // set - for (Object arg : predicate.getLiteralList(PredicateLeaf.FileFormat.ORC)) { - predObj = getBaseObjectForComparison(arg, minValue); + case NULL_SAFE_EQUALS: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.AFTER) { + return TruthValue.NO; + } else { + return TruthValue.YES_NO; + } + case EQUALS: + if (bloomFilter != null) { + return checkInBloomFilter(bloomFilter, predObj); + } else { loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.MIN) { + if (minValue.equals(maxValue) && loc == Location.MIN) { return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE || loc == Location.AFTER) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; } } - return TruthValue.NO_NULL; - } else { - // are all of the values outside of the range? 
- for (Object arg : predicate.getLiteralList(PredicateLeaf.FileFormat.ORC)) { - predObj = getBaseObjectForComparison(arg, minValue); - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.MIN || loc == Location.MIDDLE || - loc == Location.MAX) { - return TruthValue.YES_NO_NULL; + case LESS_THAN: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.AFTER) { + return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE || loc == Location.MIN) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + case LESS_THAN_EQUALS: + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.AFTER || loc == Location.MAX) { + return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + case IN: + if (minValue.equals(maxValue)) { + // for a single value, look through to see if that value is in the + // set + for (Object arg : predicate.getLiteralList(PredicateLeaf.FileFormat.ORC)) { + predObj = getBaseObjectForComparison(arg, minValue); + + if (bloomFilter != null) { + if (checkInBloomFilter(bloomFilter, predObj) != TruthValue.NO_NULL) { + return TruthValue.YES_NO_NULL; + } + } else { + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.MIN) { + return TruthValue.YES_NULL; + } + } } + return TruthValue.NO_NULL; + } else { + // are all of the values outside of the range? + for (Object arg : predicate.getLiteralList(PredicateLeaf.FileFormat.ORC)) { + predObj = getBaseObjectForComparison(arg, minValue); + + if (bloomFilter != null) { + if (checkInBloomFilter(bloomFilter, predObj) != TruthValue.NO_NULL) { + return TruthValue.YES_NO_NULL; + } + } else { + loc = compareToRange((Comparable) predObj, minValue, maxValue); + if (loc == Location.MIN || loc == Location.MIDDLE || + loc == Location.MAX) { + return TruthValue.YES_NO_NULL; + } + } + } + return TruthValue.NO_NULL; } - return TruthValue.NO_NULL; - } - case BETWEEN: - List args = predicate.getLiteralList(PredicateLeaf.FileFormat.ORC); - Object predObj1 = getBaseObjectForComparison(args.get(0), minValue); + case BETWEEN: + List args = predicate.getLiteralList(PredicateLeaf.FileFormat.ORC); + Object predObj1 = getBaseObjectForComparison(args.get(0), minValue); - loc = compareToRange((Comparable) predObj1, minValue, maxValue); - if (loc == Location.BEFORE || loc == Location.MIN) { - Object predObj2 = getBaseObjectForComparison(args.get(1), minValue); + loc = compareToRange((Comparable) predObj1, minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.MIN) { + Object predObj2 = getBaseObjectForComparison(args.get(1), minValue); - Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue); - if (loc2 == Location.AFTER || loc2 == Location.MAX) { - return TruthValue.YES_NULL; - } else if (loc2 == Location.BEFORE) { + Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue); + if (loc2 == Location.AFTER || loc2 == Location.MAX) { + return TruthValue.YES_NULL; + } else if (loc2 == Location.BEFORE) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + } else if (loc == Location.AFTER) { return TruthValue.NO_NULL; } else { return TruthValue.YES_NO_NULL; } - } else if (loc == Location.AFTER) { - return TruthValue.NO_NULL; - } else { + case IS_NULL: + return TruthValue.YES_NO; + default: return TruthValue.YES_NO_NULL; - } - case IS_NULL: - return 
TruthValue.YES_NO; - default: - return TruthValue.YES_NO_NULL; } // in case failed conversion, return the default YES_NO_NULL truth value @@ -2486,6 +2506,43 @@ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, } } + private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj) { + if (predObj instanceof Long) { + if (bf.testLong(((Long) predObj).longValue())) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Double) { + if (bf.testDouble(((Double) predObj).doubleValue())) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof String || predObj instanceof Text || + predObj instanceof HiveDecimal || predObj instanceof BigDecimal) { + if (bf.testString(predObj.toString())) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Date) { + if (bf.testLong(DateWritable.dateToDays((Date) predObj))) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof DateWritable) { + if (bf.testLong(((DateWritable) predObj).getDays())) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof Timestamp) { + if (bf.testLong(((Timestamp) predObj).getTime())) { + return TruthValue.YES_NO_NULL; + } + } else if (predObj instanceof TimestampWritable) { + if (bf.testLong(((TimestampWritable) predObj).getTimestamp().getTime())) { + return TruthValue.YES_NO_NULL; + } + } else { + return TruthValue.YES_NO_NULL; + } + + return TruthValue.NO_NULL; + } + private static Object getBaseObjectForComparison(Object predObj, Object statsObj) { if (predObj != null) { if (predObj instanceof ExprNodeConstantDesc) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 24da301..e76c168 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -76,7 +76,9 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -145,23 +147,27 @@ private final OrcFile.WriterContext callbackContext; private final OrcFile.EncodingStrategy encodingStrategy; private final OrcFile.CompressionStrategy compressionStrategy; + private final boolean[] bloomFilterColumns; + private final double bloomFilterFpp; WriterImpl(FileSystem fs, - Path path, - Configuration conf, - ObjectInspector inspector, - long stripeSize, - CompressionKind compress, - int bufferSize, - int rowIndexStride, - MemoryManager memoryManager, - boolean addBlockPadding, - OrcFile.Version version, - OrcFile.WriterCallback callback, - OrcFile.EncodingStrategy encodingStrategy, - CompressionStrategy compressionStrategy, - float paddingTolerance, - long blockSizeValue) throws IOException { + Path path, + Configuration conf, + ObjectInspector inspector, + long stripeSize, + CompressionKind compress, + int bufferSize, + int rowIndexStride, + MemoryManager memoryManager, + boolean addBlockPadding, + OrcFile.Version version, + OrcFile.WriterCallback callback, + EncodingStrategy encodingStrategy, + CompressionStrategy compressionStrategy, + float paddingTolerance, + long blockSizeValue, + String bloomFilterColumnNames, + double bloomFilterFpp) throws IOException { this.fs = fs; this.path = path; this.conf = conf; @@ -191,16 +197,159 @@ public Writer getWriter() { buildIndex = rowIndexStride > 0; codec = createCodec(compress); 
this.bufferSize = getEstimatedBufferSize(bufferSize); + int numFlattenedCols = getTotalFlattenedColumns(inspector); + this.bloomFilterColumns = new boolean[numFlattenedCols]; + this.bloomFilterFpp = bloomFilterFpp; + updateColumnsForBloomFilter(inspector, bloomFilterColumnNames); treeWriter = createTreeWriter(inspector, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + MIN_ROW_INDEX_STRIDE); } + assert numFlattenedCols == streamFactory.getCurrentColumnId() : "Column counts doesn't match"; + // ensure that we are able to handle callbacks before we register ourselves memoryManager.addWriter(path, stripeSize, this); } + private void updateColumnsForBloomFilter(ObjectInspector inspector, String bloomFilterColumnNames) { + // This is weird. Since ORC gets the column names from Hive ObjectInspectors we need a map + // between actual columns vs field names from ObjectInspectors. Also we need another map between + // field names and column span (internal column id range after flattening different types). + // colToFieldMap: Column name to field name + // fieldToColSpan: Field name to column span + // Example: For a row with schema struct:st> there + // are 6 columns in ORC after flattening. col_0 for the root (outer) struct, col_1 for int:a, + // col_2 for long:b, col_3 for inner struct, col_4 and col_5 for fields c,d of inner struct. + // colToFieldMap will be { a => col_1, b => col_2, st => col_3 } + // fieldToColSpan will be { col_1 => [1,1], col_2 => [2,2], col_3 => [3, 5] } + // what this really means is column name 'st' corresponds to ObjectInspector field name 'col_3' + // whose column id's span 3 to 5 (both inclusive) + // NOTE: column id 0 is for root struct + + Map colToFieldMap = new HashMap(); + Map> fieldToColSpan = new HashMap>(); + String colsStr = streamFactory.getConfiguration().get(IOConstants.COLUMNS); + if (colsStr != null) { + String[] columns = colsStr.split(","); + int startColIdx = 0; + int endColIdx = 0; + if (inspector instanceof StructObjectInspector) { + StructObjectInspector soi = (StructObjectInspector) inspector; + List fields = soi.getAllStructFieldRefs(); + for (int i = 0; i < fields.size(); i++) { + StructField sf = fields.get(i); + ObjectInspector sfOI = sf.getFieldObjectInspector(); + String colName = columns[i]; + String fieldName = sf.getFieldName(); + colToFieldMap.put(colName, fieldName); + + startColIdx = endColIdx + 1; + switch (sfOI.getCategory()) { + case PRIMITIVE: + endColIdx += 1; + break; + case STRUCT: + endColIdx += 1; + StructObjectInspector structInsp = (StructObjectInspector) sfOI; + List structFields = structInsp.getAllStructFieldRefs(); + for (int j = 0; j < structFields.size(); ++j) { + endColIdx += getTotalFlattenedColumns( + structFields.get(j).getFieldObjectInspector()); + } + break; + case MAP: + endColIdx += 1; + MapObjectInspector mapInsp = (MapObjectInspector) sfOI; + endColIdx += getTotalFlattenedColumns(mapInsp.getMapKeyObjectInspector()); + endColIdx += getTotalFlattenedColumns(mapInsp.getMapValueObjectInspector()); + break; + case LIST: + endColIdx += 1; + ListObjectInspector listInsp = (ListObjectInspector) sfOI; + endColIdx += getTotalFlattenedColumns(listInsp.getListElementObjectInspector()); + break; + case UNION: + endColIdx += 1; + UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI; + List choices = unionInsp.getObjectInspectors(); + for (int j = 0; j < choices.size(); ++j) { + endColIdx += 
getTotalFlattenedColumns(choices.get(j)); + } + break; + default: + throw new IllegalArgumentException("Bad category: " + + inspector.getCategory()); + } + + fieldToColSpan.put(fieldName, Lists.newArrayList(startColIdx, endColIdx)); + } + } + + LOG.info("ORC-BloomFilter: colToFieldMap: " + colToFieldMap); + LOG.info("ORC-BloomFilter: fieldToColSpan: " + fieldToColSpan); + + String[] bfColumns = bloomFilterColumnNames.split(","); + // We don't validate if the bloom filter columns are actually present in the column list or not. + // Validation should have happened much earlier when creating the table or altering the table + // with new tblproperties. + for (String bfCol : bfColumns) { + if (colToFieldMap.containsKey(bfCol)) { + String fieldName = colToFieldMap.get(bfCol); + List colSpan = fieldToColSpan.get(fieldName); + int start = colSpan.get(0); + int end = colSpan.get(1); + for (int i = start; i <= end; i++) { + bloomFilterColumns[i] = true; + } + } + } + + LOG.info("ORC-BloomFilter: bloomFilterColumns: " + Arrays.asList(bloomFilterColumns)); + } + } + + private int getTotalFlattenedColumns(ObjectInspector inspector) { + int numWriters = 0; + switch (inspector.getCategory()) { + case PRIMITIVE: + numWriters += 1; + break; + case STRUCT: + numWriters += 1; + StructObjectInspector structInsp = (StructObjectInspector) inspector; + List fields = structInsp.getAllStructFieldRefs(); + for (int i = 0; i < fields.size(); ++i) { + numWriters += getTotalFlattenedColumns(fields.get(i).getFieldObjectInspector()); + } + break; + case MAP: + numWriters += 1; + MapObjectInspector mapInsp = (MapObjectInspector) inspector; + numWriters += getTotalFlattenedColumns(mapInsp.getMapKeyObjectInspector()); + numWriters += getTotalFlattenedColumns(mapInsp.getMapValueObjectInspector()); + break; + case LIST: + numWriters += 1; + ListObjectInspector listInsp = (ListObjectInspector) inspector; + numWriters += getTotalFlattenedColumns(listInsp.getListElementObjectInspector()); + break; + case UNION: + numWriters += 1; + UnionObjectInspector unionInsp = (UnionObjectInspector) inspector; + List choices = unionInsp.getObjectInspectors(); + for (int i = 0; i < choices.size(); ++i) { + numWriters += getTotalFlattenedColumns(choices.get(i)); + } + break; + default: + throw new IllegalArgumentException("Bad category: " + + inspector.getCategory()); + } + return numWriters; + } + int getEstimatedBufferSize(int bs) { String colNames = conf.get(IOConstants.COLUMNS); long availableMem = getMemoryAvailableForORC(); @@ -499,6 +648,15 @@ public int getNextColumnId() { } /** + * Get the current column id. After creating all tree writers this count should tell how many + * columns (including columns within nested complex objects) are created in total. + * @return current column id + */ + public int getCurrentColumnId() { + return columnCount; + } + + /** * Get the stride rate of the row index. */ public int getRowIndexStride() { @@ -538,6 +696,22 @@ public CompressionStrategy getCompressionStrategy() { } /** + * Get the bloom filter columns + * @return bloom filter columns + */ + public boolean[] getBloomFilterColumns() { + return bloomFilterColumns; + } + + /** + * Get bloom filter false positive percentage. + * @return fpp + */ + public double getBloomFilterFPP() { + return bloomFilterFpp; + } + + /** * Get the writer's configuration. 
* @return configuration */ @@ -598,9 +772,13 @@ public Configuration getConfiguration() { isPresent = null; } this.foundNulls = false; - indexStatistics = ColumnStatisticsImpl.create(inspector); - stripeColStatistics = ColumnStatisticsImpl.create(inspector); - fileStatistics = ColumnStatisticsImpl.create(inspector); + boolean createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; + indexStatistics = ColumnStatisticsImpl.create(inspector, createBloomFilter, + streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); + stripeColStatistics = ColumnStatisticsImpl.create(inspector, createBloomFilter, + streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); + fileStatistics = ColumnStatisticsImpl.create(inspector, createBloomFilter, + streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); childrenWriters = new TreeWriter[0]; rowIndex = OrcProto.RowIndex.newBuilder(); rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); @@ -1097,6 +1275,8 @@ void write(Object obj) throws IOException { directLengthOutput.write(val.getLength()); } indexStatistics.updateString(val); + } else { + indexStatistics.updateString(null); } } @@ -1358,6 +1538,8 @@ void write(Object obj) throws IOException { stream.write(val.getBytes(), 0, val.getLength()); length.write(val.getLength()); indexStatistics.updateBinary(val); + } else { + indexStatistics.updateBinary(null); } } @@ -1420,6 +1602,8 @@ void write(Object obj) throws IOException { indexStatistics.updateTimestamp(val); seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP); nanos.write(formatNanos(val.getNanos())); + } else { + indexStatistics.updateTimestamp(null); } } @@ -1480,6 +1664,8 @@ void write(Object obj) throws IOException { DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj); indexStatistics.updateDate(val); writer.write(val.getDays()); + } else { + indexStatistics.updateDate(null); } } @@ -1548,6 +1734,8 @@ void write(Object obj) throws IOException { decimal.unscaledValue()); scaleStream.write(decimal.scale()); indexStatistics.updateDecimal(decimal); + } else { + indexStatistics.updateDecimal(null); } } diff --git ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto index cbfe57b..dbd2a0d 100644 --- ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto +++ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto @@ -74,6 +74,8 @@ message ColumnStatistics { optional DateStatistics dateStatistics = 7; optional BinaryStatistics binaryStatistics = 8; optional TimestampStatistics timestampStatistics = 9; + // bloom filter represented as long array + repeated uint64 bloomFilter = 10; } message RowIndexEntry { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestBloomFilter.java ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestBloomFilter.java new file mode 100644 index 0000000..df22064 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestBloomFilter.java @@ -0,0 +1,417 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.filters; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import java.util.Random; + +/** + * + */ +public class TestBloomFilter { + private static final int COUNT = 100; + Random rand = new Random(123); + + @Test(expected = AssertionError.class) + public void testBloomIllegalArg1() { + BloomFilter bf = new BloomFilter(0, 0); + } + + @Test(expected = AssertionError.class) + public void testBloomIllegalArg2() { + BloomFilter bf = new BloomFilter(0, 0.1); + } + + @Test(expected = AssertionError.class) + public void testBloomIllegalArg3() { + BloomFilter bf = new BloomFilter(1, 0.0); + } + + @Test(expected = AssertionError.class) + public void testBloomIllegalArg4() { + BloomFilter bf = new BloomFilter(1, 1.0); + } + + @Test(expected = AssertionError.class) + public void testBloomIllegalArg5() { + BloomFilter bf = new BloomFilter(-1, -1); + } + + + @Test + public void testBloomNumBits() { + assertEquals(0, BloomFilter.optimalNumOfBits(0, 0)); + assertEquals(1549, BloomFilter.optimalNumOfBits(1, 0)); + assertEquals(0, BloomFilter.optimalNumOfBits(0, 1)); + assertEquals(0, BloomFilter.optimalNumOfBits(1, 1)); + assertEquals(7, BloomFilter.optimalNumOfBits(1, 0.03)); + assertEquals(72, BloomFilter.optimalNumOfBits(10, 0.03)); + assertEquals(729, BloomFilter.optimalNumOfBits(100, 0.03)); + assertEquals(7298, BloomFilter.optimalNumOfBits(1000, 0.03)); + assertEquals(72984, BloomFilter.optimalNumOfBits(10000, 0.03)); + assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03)); + assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03)); + assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05)); + } + + @Test + public void testBloomNumHashFunctions() { + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(-1, -1)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(0, 0)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 0)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 10)); + assertEquals(7, BloomFilter.optimalNumOfHashFunctions(10, 100)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100, 100)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000, 100)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10000, 100)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100000, 100)); + assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000000, 100)); + } + + @Test + public void testBloomFilterBytes() { + BloomFilter bf = new BloomFilter(10000); + byte[] val = new byte[]{1, 2, 3}; + byte[] val1 = new byte[]{1, 2, 3, 4}; + byte[] val2 = new byte[]{1, 2, 3, 4, 5}; + byte[] val3 = new byte[]{1, 2, 3, 4, 5, 6}; + + assertEquals(false, bf.test(val)); + assertEquals(false, bf.test(val1)); + assertEquals(false, bf.test(val2)); + assertEquals(false, bf.test(val3)); + bf.add(val); + assertEquals(true, bf.test(val)); + assertEquals(false, bf.test(val1)); + assertEquals(false, bf.test(val2)); + assertEquals(false, bf.test(val3)); + bf.add(val1); + assertEquals(true, bf.test(val)); + assertEquals(true, bf.test(val1)); + 
assertEquals(false, bf.test(val2)); + assertEquals(false, bf.test(val3)); + bf.add(val2); + assertEquals(true, bf.test(val)); + assertEquals(true, bf.test(val1)); + assertEquals(true, bf.test(val2)); + assertEquals(false, bf.test(val3)); + bf.add(val3); + assertEquals(true, bf.test(val)); + assertEquals(true, bf.test(val1)); + assertEquals(true, bf.test(val2)); + assertEquals(true, bf.test(val3)); + + byte[] randVal = new byte[COUNT]; + for (int i = 0; i < COUNT; i++) { + rand.nextBytes(randVal); + bf.addBytes(randVal); + } + // last value should be present + assertEquals(true, bf.testBytes(randVal)); + // most likely this value should not exist + randVal[0] = 0; + randVal[1] = 0; + randVal[2] = 0; + randVal[3] = 0; + randVal[4] = 0; + assertEquals(false, bf.testBytes(randVal)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterByte() { + BloomFilter bf = new BloomFilter(10000); + byte val = Byte.MIN_VALUE; + byte val1 = 1; + byte val2 = 2; + byte val3 = Byte.MAX_VALUE; + + assertEquals(false, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val); + assertEquals(true, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val1); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val2); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val3); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(true, bf.testLong(val3)); + + byte randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = (byte) rand.nextInt(Byte.MAX_VALUE); + bf.addLong(randVal); + } + // last value should be present + assertEquals(true, bf.testLong(randVal)); + // most likely this value should not exist + assertEquals(false, bf.testLong((byte) -120)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterInt() { + BloomFilter bf = new BloomFilter(10000); + int val = Integer.MIN_VALUE; + int val1 = 1; + int val2 = 2; + int val3 = Integer.MAX_VALUE; + + assertEquals(false, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val); + assertEquals(true, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val1); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val2); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val3); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(true, bf.testLong(val3)); + + int randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = rand.nextInt(); + bf.addLong(randVal); + } + // last value should be present + assertEquals(true, 
bf.testLong(randVal)); + // most likely this value should not exist + assertEquals(false, bf.testLong(-120)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterLong() { + BloomFilter bf = new BloomFilter(10000); + long val = Long.MIN_VALUE; + long val1 = 1; + long val2 = 2; + long val3 = Long.MAX_VALUE; + + assertEquals(false, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val); + assertEquals(true, bf.testLong(val)); + assertEquals(false, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val1); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(false, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val2); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(false, bf.testLong(val3)); + bf.addLong(val3); + assertEquals(true, bf.testLong(val)); + assertEquals(true, bf.testLong(val1)); + assertEquals(true, bf.testLong(val2)); + assertEquals(true, bf.testLong(val3)); + + long randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = rand.nextLong(); + bf.addLong(randVal); + } + // last value should be present + assertEquals(true, bf.testLong(randVal)); + // most likely this value should not exist + assertEquals(false, bf.testLong(-120)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterFloat() { + BloomFilter bf = new BloomFilter(10000); + float val = Float.MIN_VALUE; + float val1 = 1.1f; + float val2 = 2.2f; + float val3 = Float.MAX_VALUE; + + assertEquals(false, bf.testDouble(val)); + assertEquals(false, bf.testDouble(val1)); + assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val); + assertEquals(true, bf.testDouble(val)); + assertEquals(false, bf.testDouble(val1)); + assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val1); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val2); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(true, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val3); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(true, bf.testDouble(val2)); + assertEquals(true, bf.testDouble(val3)); + + float randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = rand.nextFloat(); + bf.addDouble(randVal); + } + // last value should be present + assertEquals(true, bf.testDouble(randVal)); + // most likely this value should not exist + assertEquals(false, bf.testDouble(-120.2f)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterDouble() { + BloomFilter bf = new BloomFilter(10000); + double val = Double.MIN_VALUE; + double val1 = 1.1d; + double val2 = 2.2d; + double val3 = Double.MAX_VALUE; + + assertEquals(false, bf.testDouble(val)); + assertEquals(false, bf.testDouble(val1)); + assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val); + assertEquals(true, bf.testDouble(val)); + assertEquals(false, bf.testDouble(val1)); + 
assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val1); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(false, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val2); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(true, bf.testDouble(val2)); + assertEquals(false, bf.testDouble(val3)); + bf.addDouble(val3); + assertEquals(true, bf.testDouble(val)); + assertEquals(true, bf.testDouble(val1)); + assertEquals(true, bf.testDouble(val2)); + assertEquals(true, bf.testDouble(val3)); + + double randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = rand.nextDouble(); + bf.addDouble(randVal); + } + // last value should be present + assertEquals(true, bf.testDouble(randVal)); + // most likely this value should not exist + assertEquals(false, bf.testDouble(-120.2d)); + + assertEquals(7800, bf.sizeInBytes()); + } + + @Test + public void testBloomFilterString() { + BloomFilter bf = new BloomFilter(100000); + String val = "bloo"; + String val1 = "bloom fil"; + String val2 = "bloom filter"; + String val3 = "cuckoo filter"; + + assertEquals(false, bf.testString(val)); + assertEquals(false, bf.testString(val1)); + assertEquals(false, bf.testString(val2)); + assertEquals(false, bf.testString(val3)); + bf.addString(val); + assertEquals(true, bf.testString(val)); + assertEquals(false, bf.testString(val1)); + assertEquals(false, bf.testString(val2)); + assertEquals(false, bf.testString(val3)); + bf.addString(val1); + assertEquals(true, bf.testString(val)); + assertEquals(true, bf.testString(val1)); + assertEquals(false, bf.testString(val2)); + assertEquals(false, bf.testString(val3)); + bf.addString(val2); + assertEquals(true, bf.testString(val)); + assertEquals(true, bf.testString(val1)); + assertEquals(true, bf.testString(val2)); + assertEquals(false, bf.testString(val3)); + bf.addString(val3); + assertEquals(true, bf.testString(val)); + assertEquals(true, bf.testString(val1)); + assertEquals(true, bf.testString(val2)); + assertEquals(true, bf.testString(val3)); + + long randVal = 0; + for (int i = 0; i < COUNT; i++) { + randVal = rand.nextLong(); + bf.addString(Long.toString(randVal)); + } + // last value should be present + assertEquals(true, bf.testString(Long.toString(randVal))); + // most likely this value should not exist + assertEquals(false, bf.testString(Long.toString(-120))); + + assertEquals(77944, bf.sizeInBytes()); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestMurmur3.java ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestMurmur3.java new file mode 100644 index 0000000..d92a3ce --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/io/filters/TestMurmur3.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.filters; + +import static org.junit.Assert.assertEquals; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; + +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; + +/** + * Tests for Murmur3 variants. + */ +public class TestMurmur3 { + + @Test + public void testHashCodesM3_32_string() { + String key = "test"; + int seed = 123; + HashFunction hf = Hashing.murmur3_32(seed); + int hc1 = hf.hashBytes(key.getBytes()).asInt(); + int hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed); + assertEquals(hc1, hc2); + + key = "testkey"; + hc1 = hf.hashBytes(key.getBytes()).asInt(); + hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed); + assertEquals(hc1, hc2); + } + + @Test + public void testHashCodesM3_32_ints() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_32(seed); + for (int i = 0; i < 1000; i++) { + int val = rand.nextInt(); + byte[] data = ByteBuffer.allocate(4).putInt(val).array(); + int hc1 = hf.hashBytes(data).asInt(); + int hc2 = Murmur3.hash32(data, data.length, seed); + assertEquals(hc1, hc2); + } + } + + @Test + public void testHashCodesM3_32_longs() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_32(seed); + for (int i = 0; i < 1000; i++) { + long val = rand.nextLong(); + byte[] data = ByteBuffer.allocate(8).putLong(val).array(); + int hc1 = hf.hashBytes(data).asInt(); + int hc2 = Murmur3.hash32(data, data.length, seed); + assertEquals(hc1, hc2); + } + } + + @Test + public void testHashCodesM3_32_double() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_32(seed); + for (int i = 0; i < 1000; i++) { + double val = rand.nextDouble(); + byte[] data = ByteBuffer.allocate(8).putDouble(val).array(); + int hc1 = hf.hashBytes(data).asInt(); + int hc2 = Murmur3.hash32(data, data.length, seed); + assertEquals(hc1, hc2); + } + } + + @Test + public void testHashCodesM3_128_string() { + String key = "test"; + int seed = 123; + HashFunction hf = Hashing.murmur3_128(seed); + // guava stores the hashcodes in little endian order + ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + buf.put(hf.hashBytes(key.getBytes()).asBytes()); + buf.flip(); + long gl1 = buf.getLong(); + long gl2 = buf.getLong(8); + long[] hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed); + long m1 = hc[0]; + long m2 = hc[1]; + assertEquals(gl1, m1); + assertEquals(gl2, m2); + + key = "testkey128_testkey128"; + buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + buf.put(hf.hashBytes(key.getBytes()).asBytes()); + buf.flip(); + gl1 = buf.getLong(); + gl2 = buf.getLong(8); + hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed); + m1 = hc[0]; + m2 = hc[1]; + assertEquals(gl1, m1); + assertEquals(gl2, m2); + } + + @Test + public void testHashCodesM3_128_ints() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_128(seed); + for (int i = 0; i < 1000; i++) { + int val = rand.nextInt(); + byte[] data = ByteBuffer.allocate(4).putInt(val).array(); + // guava stores the hashcodes in little endian order + ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + buf.put(hf.hashBytes(data).asBytes()); + buf.flip(); + long gl1 = 
buf.getLong(); + long gl2 = buf.getLong(8); + long[] hc = Murmur3.hash128(data, data.length, seed); + long m1 = hc[0]; + long m2 = hc[1]; + assertEquals(gl1, m1); + assertEquals(gl2, m2); + } + } + + @Test + public void testHashCodesM3_128_longs() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_128(seed); + for (int i = 0; i < 1000; i++) { + long val = rand.nextLong(); + byte[] data = ByteBuffer.allocate(8).putLong(val).array(); + // guava stores the hashcodes in little endian order + ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + buf.put(hf.hashBytes(data).asBytes()); + buf.flip(); + long gl1 = buf.getLong(); + long gl2 = buf.getLong(8); + long[] hc = Murmur3.hash128(data, data.length, seed); + long m1 = hc[0]; + long m2 = hc[1]; + assertEquals(gl1, m1); + assertEquals(gl2, m2); + } + } + + @Test + public void testHashCodesM3_128_double() { + int seed = 123; + Random rand = new Random(seed); + HashFunction hf = Hashing.murmur3_128(seed); + for (int i = 0; i < 1000; i++) { + double val = rand.nextDouble(); + byte[] data = ByteBuffer.allocate(8).putDouble(val).array(); + // guava stores the hashcodes in little endian order + ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + buf.put(hf.hashBytes(data).asBytes()); + buf.flip(); + long gl1 = buf.getLong(); + long gl2 = buf.getLong(8); + long[] hc = Murmur3.hash128(data, data.length, seed); + long m1 = hc[0]; + long m2 = hc[1]; + assertEquals(gl1, m1); + assertEquals(gl2, m2); + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java index dbd38c8..d2d06ed 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java @@ -18,22 +18,69 @@ package org.apache.hadoop.hive.ql.io.orc; +import static junit.framework.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.filters.BloomFilter; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import java.io.File; +import java.math.BigDecimal; import java.sql.Timestamp; - -import static junit.framework.Assert.assertEquals; +import java.util.List; +import java.util.Random; /** * Test ColumnStatisticsImpl for ORC. 
*/ public class TestColumnStatistics { + public static class SimpleStruct { + BytesWritable bytes1; + Text string1; + + SimpleStruct(BytesWritable b1, String s1) { + this.bytes1 = b1; + if(s1 == null) { + this.string1 = null; + } else { + this.string1 = new Text(s1); + } + } + } + + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + FileSystem fs; + Path testFilePath; + Configuration conf; + + @Rule + public TestName testCaseName = new TestName(); + + @Before + public void openFileSystem () throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestOrcFile." + + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + @Test public void testLongMerge() throws Exception { ObjectInspector inspector = @@ -173,4 +220,269 @@ public void testDecimalMerge() throws Exception { assertEquals(-10, typed.getMinimum().longValue()); assertEquals(10000, typed.getMaximum().longValue()); } + + @Test + public void testBloomFilter() throws Exception { + ObjectInspector inspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector; + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + stats1.updateString(new Text("hello")); + stats1.updateString(new Text("world")); + stats1.updateString(new Text("hive")); + stats1.updateString(null); + ColumnStatistics cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testString("hello")); + assertEquals(true, cstat1.getBloomFilter().testString("world")); + assertEquals(true, cstat1.getBloomFilter().testString("hive")); + assertEquals(true, cstat1.getBloomFilter().testString(null)); + assertEquals(false, cstat1.getBloomFilter().testString(" ")); + assertEquals(false, cstat1.getBloomFilter().testString("apache")); + + inspector = PrimitiveObjectInspectorFactory.javaFloatObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + float[] floats = new float[10000]; + Random rand = new Random(123); + for (int i = 0; i < floats.length; i++) { + floats[i] = rand.nextFloat(); + stats1.updateDouble(floats[i]); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testDouble(floats[2])); + assertEquals(true, cstat1.getBloomFilter().testDouble(floats[20])); + assertEquals(true, cstat1.getBloomFilter().testDouble(floats[200])); + assertEquals(true, cstat1.getBloomFilter().testDouble(floats[2000])); + assertEquals(true, cstat1.getBloomFilter().testDouble(floats[9999])); + assertEquals(false, cstat1.getBloomFilter().testDouble(0.0)); + assertEquals(false, cstat1.getBloomFilter().testDouble(1.0)); + assertEquals(false, cstat1.getBloomFilter().testDouble(-1.0)); + + inspector = PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + double[] doubles = new double[10000]; + for (int i = 0; i < doubles.length; i++) { + doubles[i] = rand.nextDouble(); + stats1.updateDouble(doubles[i]); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testDouble(doubles[2])); + assertEquals(true, cstat1.getBloomFilter().testDouble(doubles[20])); + assertEquals(true, cstat1.getBloomFilter().testDouble(doubles[200])); + assertEquals(true, cstat1.getBloomFilter().testDouble(doubles[2000])); + 
assertEquals(true, cstat1.getBloomFilter().testDouble(doubles[9999])); + assertEquals(false, cstat1.getBloomFilter().testDouble(0.0)); + assertEquals(false, cstat1.getBloomFilter().testDouble(1.0)); + assertEquals(false, cstat1.getBloomFilter().testDouble(-1.0)); + + inspector = PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + for (int i = 0; i < doubles.length; i++) { + stats1.updateDecimal(HiveDecimal.create(new BigDecimal(doubles[i]))); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter() + .testString(HiveDecimal.create(new BigDecimal(doubles[2])).toString())); + assertEquals(true, cstat1.getBloomFilter() + .testString(HiveDecimal.create(new BigDecimal(doubles[200])).toString())); + assertEquals(true, cstat1.getBloomFilter() + .testString(HiveDecimal.create(new BigDecimal(doubles[2000])).toString())); + assertEquals(true, cstat1.getBloomFilter() + .testString(HiveDecimal.create(new BigDecimal(doubles[9999])).toString())); + assertEquals(false, + cstat1.getBloomFilter().testString(HiveDecimal.create(new BigDecimal(0.0)).toString())); + assertEquals(false, + cstat1.getBloomFilter().testString(HiveDecimal.create(new BigDecimal(1.0)).toString())); + assertEquals(false, + cstat1.getBloomFilter().testString(HiveDecimal.create(new BigDecimal(-1.0)).toString())); + assertEquals(false, cstat1.getBloomFilter().testString("")); + assertEquals(false, cstat1.getBloomFilter().testString(null)); + + inspector = PrimitiveObjectInspectorFactory.javaIntObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + int[] ints = new int[10000]; + for (int i = 0; i < ints.length; i++) { + ints[i] = rand.nextInt(10000); + stats1.updateInteger(ints[i]); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[20])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[200])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2000])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[9999])); + assertEquals(false, cstat1.getBloomFilter().testLong(-1)); + assertEquals(false, cstat1.getBloomFilter().testLong(10001)); + assertEquals(false, cstat1.getBloomFilter().testLong(Integer.MIN_VALUE)); + assertEquals(false, cstat1.getBloomFilter().testLong(Integer.MAX_VALUE)); + + inspector = PrimitiveObjectInspectorFactory.javaLongObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + long[] longs = new long[10000]; + for (int i = 0; i < longs.length; i++) { + longs[i] = rand.nextLong(); + stats1.updateInteger(longs[i]); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testLong(longs[2])); + assertEquals(true, cstat1.getBloomFilter().testLong(longs[20])); + assertEquals(true, cstat1.getBloomFilter().testLong(longs[200])); + assertEquals(true, cstat1.getBloomFilter().testLong(longs[2000])); + assertEquals(true, cstat1.getBloomFilter().testLong(longs[9999])); + assertEquals(false, cstat1.getBloomFilter().testLong(-1)); + assertEquals(false, cstat1.getBloomFilter().testLong(10001)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MIN_VALUE)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MAX_VALUE)); + + inspector = 
PrimitiveObjectInspectorFactory.javaDateObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + for (int i = 0; i < ints.length; i++) { + stats1.updateDate(new DateWritable(ints[i])); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[20])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[200])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2000])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[9999])); + assertEquals(false, cstat1.getBloomFilter().testLong(-1)); + assertEquals(false, cstat1.getBloomFilter().testLong(10001)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MIN_VALUE)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MAX_VALUE)); + + inspector = PrimitiveObjectInspectorFactory.javaTimestampObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + for (int i = 0; i < ints.length; i++) { + stats1.updateTimestamp(new Timestamp(ints[i])); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[20])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[200])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[2000])); + assertEquals(true, cstat1.getBloomFilter().testLong(ints[9999])); + assertEquals(false, cstat1.getBloomFilter().testLong(-1)); + assertEquals(false, cstat1.getBloomFilter().testLong(10001)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MIN_VALUE)); + assertEquals(false, cstat1.getBloomFilter().testLong(Long.MAX_VALUE)); + + inspector = PrimitiveObjectInspectorFactory.javaHiveVarcharObjectInspector; + stats1 = ColumnStatisticsImpl.create(inspector, true, 10000, 0.05); + for (int i = 0; i < doubles.length; i++) { + stats1.updateString(new Text(new HiveVarchar(String.valueOf(doubles[i]), 255).toString())); + } + cstat1 = ColumnStatisticsImpl.deserialize(stats1.serialize().build()); + assertEquals(true, cstat1.getBloomFilter().testString(new Text(new HiveVarchar( + String.valueOf(doubles[2]), 255).toString()).toString())); + assertEquals(true, cstat1.getBloomFilter().testString(new Text(new HiveVarchar( + String.valueOf(doubles[20]), 255).toString()).toString())); + assertEquals(true, cstat1.getBloomFilter().testString(new Text(new HiveVarchar( + String.valueOf(doubles[200]), 255).toString()).toString())); + assertEquals(true, cstat1.getBloomFilter().testString(new Text(new HiveVarchar( + String.valueOf(doubles[9999]), 255).toString()).toString())); + assertEquals(false, cstat1.getBloomFilter().testString("0.0")); + assertEquals(false, cstat1.getBloomFilter().testString("1.1")); + assertEquals(false, cstat1.getBloomFilter().testString("-1.0")); + assertEquals(false, cstat1.getBloomFilter().testString("")); + assertEquals(false, cstat1.getBloomFilter().testString(" ")); + assertEquals(false, cstat1.getBloomFilter().testString(null)); + } + + private static BytesWritable bytes(int... 
items) { + BytesWritable result = new BytesWritable(); + result.setSize(items.length); + for(int i=0; i < items.length; ++i) { + result.getBytes()[i] = (byte) items[i]; + } + return result; + } + + @Test + public void testBloomFilterIndex() throws Exception { + + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (SimpleStruct.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + conf.set(IOConstants.COLUMNS, "bytes1,string1"); + Writer writer = OrcFile.createWriter(testFilePath, + OrcFile.writerOptions(conf) + .inspector(inspector) + .stripeSize(100000) + .bufferSize(10000) + .bloomFilterColumns("string1")); + for (int i = 0; i < 5000; i++) { + writer.addRow(new SimpleStruct(bytes(0, 1, 2, 3, 4), "foo")); + } + for (int i = 0; i < 5000; i++) { + writer.addRow(new SimpleStruct(bytes(0, 1, 2, 3), String.valueOf(i))); + } + for (int i = 0; i < 5000; i++) { + writer.addRow(new SimpleStruct(bytes(0, 1, 2, 3, 4, 5), null)); + } + writer.close(); + Reader reader = OrcFile.createReader(testFilePath, + OrcFile.readerOptions(conf).filesystem(fs)); + + // check the stats + ColumnStatistics[] stats = reader.getStatistics(); + assertEquals(15000, stats[0].getNumberOfValues()); + assertEquals(15000, stats[1].getNumberOfValues()); + assertEquals(10000, stats[2].getNumberOfValues()); + + // file level statistics + BloomFilter bf = stats[2].getBloomFilter(); + assertEquals(true, bf.testString("foo")); + assertEquals(true, bf.testString("1")); + assertEquals(true, bf.testString("100")); + assertEquals(true, bf.testString("4999")); + assertEquals(true, bf.testString(null)); + assertEquals(false, bf.testString("bar")); + assertEquals(false, bf.testString("-1")); + assertEquals(false, bf.testString("100000")); + assertEquals(false, bf.testString("")); + assertEquals(false, bf.testString(" ")); + + // stripe 1 stats + List ss = reader.getMetadata().getStripeStatistics(); + ColumnStatistics csStripe1 = ss.get(0).getColumnStatistics()[2]; + bf = csStripe1.getBloomFilter(); + assertEquals(true, bf.testString("foo")); + assertEquals(false, bf.testString("1")); + assertEquals(false, bf.testString("100")); + assertEquals(false, bf.testString("1000")); + assertEquals(false, bf.testString(null)); + assertEquals(false, bf.testString("bar")); + assertEquals(false, bf.testString("-1")); + assertEquals(false, bf.testString("100000")); + assertEquals(false, bf.testString("")); + assertEquals(false, bf.testString(" ")); + + // stripe 2 stats + ColumnStatistics csStripe2 = ss.get(1).getColumnStatistics()[2]; + bf = csStripe2.getBloomFilter(); + assertEquals(false, bf.testString("foo")); + assertEquals(true, bf.testString("1")); + assertEquals(true, bf.testString("100")); + assertEquals(true, bf.testString("1000")); + assertEquals(false, bf.testString(null)); + assertEquals(false, bf.testString("bar")); + assertEquals(false, bf.testString("-1")); + assertEquals(false, bf.testString("100000")); + assertEquals(false, bf.testString("")); + assertEquals(false, bf.testString(" ")); + + // stripe 3 stats + ColumnStatistics csStripe3 = ss.get(2).getColumnStatistics()[2]; + bf = csStripe3.getBloomFilter(); + assertEquals(false, bf.testString("foo")); + assertEquals(false, bf.testString("1")); + assertEquals(false, bf.testString("100")); + assertEquals(false, bf.testString("1000")); + assertEquals(true, bf.testString(null)); + assertEquals(false, bf.testString("bar")); + assertEquals(false, bf.testString("-1")); + assertEquals(false, 
bf.testString("100000")); + assertEquals(false, bf.testString("")); + assertEquals(false, bf.testString(" ")); + } + }
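A minimal usage sketch of the BloomFilter class introduced by this patch (the class name, the (expectedEntries, fpp) constructor and the add*/test* methods are the ones exercised in TestBloomFilter above; the wrapper class and the literal values are hypothetical, for illustration only):

import org.apache.hadoop.hive.ql.io.filters.BloomFilter;

public class BloomFilterSketch {
  public static void main(String[] args) {
    // Size the filter for one row-index stride worth of entries at a 5% false positive rate,
    // matching the (expectedEntries, fpp) constructor shape used in the tests.
    BloomFilter bf = new BloomFilter(10000, 0.05);

    // Writer side: each non-null column value is added while the stripe is written.
    bf.addString("foo");
    bf.addLong(42L);

    // Reader side: a negative probe is definite, a positive probe is only "maybe",
    // which is why checkInBloomFilter() maps them to NO_NULL and YES_NO_NULL respectively.
    System.out.println(bf.testString("foo")); // true  -> predicate may match, keep the stripe/row group
    System.out.println(bf.testString("bar")); // false -> predicate cannot match, prune it
    System.out.println(bf.testLong(43L));     // false with high probability
  }
}

Because the filter only ever answers "definitely absent" or "possibly present", the EQUALS and IN branches in RecordReaderImpl.evaluatePredicateRange can prune stripes and row groups without risking false negatives; the configured false positive percentage only affects how often pruning is missed, never correctness.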