diff --git a/core/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/core/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 6269e3e..8e3bd53 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/core/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -29,6 +29,8 @@ import java.util.Map; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; @@ -50,7 +52,8 @@ public class HColumnDescriptor implements WritableComparable // Version 5 was when bloom filter descriptors were removed. // Version 6 adds metadata as a map where keys and values are byte[]. // Version 7 -- add new compression and hfile blocksize to HColumnDescriptor (HBASE-1217) - private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)7; + // Version 8 -- reintroduction of bloom filters, changed from boolean to enum + private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)8; /** * The type of compression. @@ -113,7 +116,7 @@ public class HColumnDescriptor implements WritableComparable /** * Default setting for whether or not to use bloomfilters. */ - public static final boolean DEFAULT_BLOOMFILTER = false; + public static final String DEFAULT_BLOOMFILTER = StoreFile.BloomType.NONE.toString(); /** * Default time to live of cell contents. @@ -166,7 +169,7 @@ public class HColumnDescriptor implements WritableComparable this (familyName == null || familyName.length <= 0? HConstants.EMPTY_BYTE_ARRAY: familyName, DEFAULT_VERSIONS, DEFAULT_COMPRESSION, DEFAULT_IN_MEMORY, DEFAULT_BLOCKCACHE, - DEFAULT_TTL, false); + DEFAULT_TTL, DEFAULT_BLOOMFILTER); } /** @@ -195,7 +198,7 @@ public class HColumnDescriptor implements WritableComparable * @param blockCacheEnabled If true, MapFile blocks should be cached * @param timeToLive Time-to-live of cell contents, in seconds * (use HConstants.FOREVER for unlimited TTL) - * @param bloomFilter Enable the specified bloom filter for this column + * @param bloomFilter Bloom filter type for this column * * @throws IllegalArgumentException if passed a family name that is made of * other than 'word' characters: i.e. 
[a-zA-Z_0-9] or contains @@ -205,7 +208,7 @@ public class HColumnDescriptor implements WritableComparable public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, - final int timeToLive, final boolean bloomFilter) { + final int timeToLive, final String bloomFilter) { this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE); } @@ -222,7 +225,7 @@ public class HColumnDescriptor implements WritableComparable * @param blocksize * @param timeToLive Time-to-live of cell contents, in seconds * (use HConstants.FOREVER for unlimited TTL) - * @param bloomFilter Enable the specified bloom filter for this column + * @param bloomFilter Bloom filter type for this column * @param scope The scope tag for this column * * @throws IllegalArgumentException if passed a family name that is made of @@ -233,7 +236,7 @@ public class HColumnDescriptor implements WritableComparable public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final boolean bloomFilter, final int scope) { + final int timeToLive, final String bloomFilter, final int scope) { isLegalFamilyName(familyName); this.name = familyName; @@ -248,7 +251,8 @@ public class HColumnDescriptor implements WritableComparable setTimeToLive(timeToLive); setCompressionType(Compression.Algorithm. valueOf(compression.toUpperCase())); - setBloomfilter(bloomFilter); + setBloomFilterType(StoreFile.BloomType. + valueOf(bloomFilter.toUpperCase())); setBlocksize(blocksize); setScope(scope); } @@ -464,20 +468,21 @@ public class HColumnDescriptor implements WritableComparable } /** - * @return true if a bloom filter is enabled + * @return bloom filter type used for new StoreFiles in ColumnFamily */ - public boolean isBloomfilter() { - String value = getValue(BLOOMFILTER); - if (value != null) - return Boolean.valueOf(value).booleanValue(); - return DEFAULT_BLOOMFILTER; + public StoreFile.BloomType getBloomFilterType() { + String n = getValue(BLOOMFILTER); + if (n == null) { + n = DEFAULT_BLOOMFILTER; + } + return StoreFile.BloomType.valueOf(n.toUpperCase()); } /** - * @param onOff Enable/Disable bloom filter + * @param toggle bloom filter type */ - public void setBloomfilter(final boolean onOff) { - setValue(BLOOMFILTER, Boolean.toString(onOff)); + public void setBloomFilterType(final StoreFile.BloomType bt) { + setValue(BLOOMFILTER, bt.toString()); } /** @@ -513,10 +518,6 @@ public class HColumnDescriptor implements WritableComparable values.entrySet()) { String key = Bytes.toString(e.getKey().get()); String value = Bytes.toString(e.getValue().get()); - if (key != null && key.toUpperCase().equals(BLOOMFILTER)) { - // Don't emit bloomfilter. Its not working. - continue; - } s.append(", "); s.append(key); s.append(" => '"); @@ -576,8 +577,8 @@ public class HColumnDescriptor implements WritableComparable int ordinal = in.readInt(); setCompressionType(Compression.Algorithm.values()[ordinal]); setInMemory(in.readBoolean()); - setBloomfilter(in.readBoolean()); - if (isBloomfilter() && version < 5) { + setBloomFilterType(in.readBoolean() ? 
        BloomType.ROW : BloomType.NONE);
+    if (getBloomFilterType() != BloomType.NONE && version < 5) {
       // If a bloomFilter is enabled and the column descriptor is less than
       // version 5, we need to skip over it to read the rest of the column
       // descriptor. There are no BloomFilterDescriptors written to disk for
@@ -593,7 +594,7 @@ public class HColumnDescriptor implements WritableComparable
         setTimeToLive(in.readInt());
       }
     } else {
-      // version 7+
+      // version 6+
       this.name = Bytes.readByteArray(in);
       this.values.clear();
       int numValues = in.readInt();
@@ -602,6 +603,15 @@
         ImmutableBytesWritable value = new ImmutableBytesWritable();
         key.readFields(in);
         value.readFields(in);
+
+        // in version 8, the BloomFilter setting changed from bool to enum;
+        // parse the stored boolean string (Boolean.getBoolean would read a
+        // system property, not the value) and map true to a ROW bloom
+        if (version < 8 && Bytes.toString(key.get()).equals(BLOOMFILTER)) {
+          value.set(Bytes.toBytes(
+              Boolean.parseBoolean(Bytes.toString(value.get()))
+                ? BloomType.ROW.toString()
+                : BloomType.NONE.toString()));
+        }
+
         values.put(key, value);
       }
       if (version == 6) {
diff --git a/core/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/core/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index d0c220e..0d57270 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
@@ -667,7 +668,8 @@ public class HTableDescriptor implements WritableComparable {
     new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging.
         Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
-        HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL) });
+        HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
+        HConstants.REPLICATION_SCOPE_LOCAL) });

   /** Table descriptor for .META. catalog table */
   public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
@@ -675,9 +677,11 @@ public class HTableDescriptor implements WritableComparable {
       new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging.
Compression.Algorithm.NONE.getName(), true, true, 8 * 1024, - HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL), + HConstants.FOREVER, StoreFile.BloomType.NONE.toString(), + HConstants.REPLICATION_SCOPE_LOCAL), new HColumnDescriptor(HConstants.CATALOG_HISTORIAN_FAMILY, HConstants.ALL_VERSIONS, Compression.Algorithm.NONE.getName(), false, false, 8 * 1024, - HConstants.WEEK_IN_SECONDS, false, HConstants.REPLICATION_SCOPE_LOCAL)}); + HConstants.WEEK_IN_SECONDS,StoreFile.BloomType.NONE.toString(), + HConstants.REPLICATION_SCOPE_LOCAL)}); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8aac19a..fc5494b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -945,7 +945,7 @@ public class KeyValue implements Writable, HeapSize { System.arraycopy(this.bytes, o, result, 0, l); return result; } - + //--------------------------------------------------------------------------- // // KeyValue splitter @@ -1371,7 +1371,7 @@ public class KeyValue implements Writable, HeapSize { } /** - * Compares the row and column of two keyvalues + * Compares the row and column of two keyvalues for equality * @param left * @param right * @return True if same row and column. @@ -1380,10 +1380,10 @@ public class KeyValue implements Writable, HeapSize { final KeyValue right) { short lrowlength = left.getRowLength(); short rrowlength = right.getRowLength(); - if (!matchingRows(left, lrowlength, right, rrowlength)) { - return false; - } - return compareColumns(left, lrowlength, right, rrowlength) == 0; + // TsOffset = end of column data. just comparing Row+CF length of each + return left.getTimestampOffset() == right.getTimestampOffset() && + matchingRows(left, lrowlength, right, rrowlength) && + compareColumns(left, lrowlength, right, rrowlength) == 0; } /** @@ -1396,6 +1396,7 @@ public class KeyValue implements Writable, HeapSize { } /** + * Compares the row of two keyvalues for equality * @param left * @param right * @return True if rows match. @@ -1415,11 +1416,8 @@ public class KeyValue implements Writable, HeapSize { */ public boolean matchingRows(final KeyValue left, final short lrowlength, final KeyValue right, final short rrowlength) { - int compare = compareRows(left, lrowlength, right, rrowlength); - if (compare != 0) { - return false; - } - return true; + return lrowlength == rrowlength && + compareRows(left, lrowlength, right, rrowlength) == 0; } public boolean matchingRows(final byte [] left, final int loffset, diff --git a/core/src/main/java/org/apache/hadoop/hbase/io/HalfHFileReader.java b/core/src/main/java/org/apache/hadoop/hbase/io/HalfHFileReader.java deleted file mode 100644 index c657630..0000000 --- a/core/src/main/java/org/apache/hadoop/hbase/io/HalfHFileReader.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Copyright 2008 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * A facade for a {@link org.apache.hadoop.hbase.io.hfile.HFile.Reader} that serves up - * either the top or bottom half of a HFile where 'bottom' is the first half - * of the file containing the keys that sort lowest and 'top' is the second half - * of the file with keys that sort greater than those of the bottom half. - * The top includes the split files midkey, of the key that follows if it does - * not exist in the file. - * - *

This type works in tandem with the {@link Reference} type. This class - * is used reading while Reference is used writing. - * - *

This file is not splitable. Calls to {@link #midkey()} return null. - */ -public class HalfHFileReader extends HFile.Reader { - final Log LOG = LogFactory.getLog(HalfHFileReader.class); - final boolean top; - // This is the key we split around. Its the first possible entry on a row: - // i.e. empty column and a timestamp of LATEST_TIMESTAMP. - protected final byte [] splitkey; - - /** - * @param fs - * @param p - * @param c - * @param r - * @throws IOException - */ - public HalfHFileReader(final FileSystem fs, final Path p, final BlockCache c, - final Reference r) - throws IOException { - super(fs, p, c, false); - // This is not actual midkey for this half-file; its just border - // around which we split top and bottom. Have to look in files to find - // actual last and first keys for bottom and top halves. Half-files don't - // have an actual midkey themselves. No midkey is how we indicate file is - // not splittable. - this.splitkey = r.getSplitKey(); - // Is it top or bottom half? - this.top = Reference.isTopFileRegion(r.getFileRegion()); - } - - protected boolean isTop() { - return this.top; - } - - @Override - public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread) { - final HFileScanner s = super.getScanner(cacheBlocks, pread); - return new HFileScanner() { - final HFileScanner delegate = s; - public boolean atEnd = false; - - public ByteBuffer getKey() { - if (atEnd) return null; - return delegate.getKey(); - } - - public String getKeyString() { - if (atEnd) return null; - - return delegate.getKeyString(); - } - - public ByteBuffer getValue() { - if (atEnd) return null; - - return delegate.getValue(); - } - - public String getValueString() { - if (atEnd) return null; - - return delegate.getValueString(); - } - - public KeyValue getKeyValue() { - if (atEnd) return null; - - return delegate.getKeyValue(); - } - - public boolean next() throws IOException { - if (atEnd) return false; - - boolean b = delegate.next(); - if (!b) { - return b; - } - // constrain the bottom. - if (!top) { - ByteBuffer bb = getKey(); - if (getComparator().compare(bb.array(), bb.arrayOffset(), bb.limit(), - splitkey, 0, splitkey.length) >= 0) { - atEnd = true; - return false; - } - } - return true; - } - - public boolean seekBefore(byte[] key) throws IOException { - return seekBefore(key, 0, key.length); - } - - public boolean seekBefore(byte [] key, int offset, int length) - throws IOException { - if (top) { - if (getComparator().compare(key, offset, length, splitkey, 0, - splitkey.length) < 0) { - return false; - } - } else { - if (getComparator().compare(key, offset, length, splitkey, 0, - splitkey.length) >= 0) { - return seekBefore(splitkey, 0, splitkey.length); - } - } - return this.delegate.seekBefore(key, offset, length); - } - - public boolean seekTo() throws IOException { - if (top) { - int r = this.delegate.seekTo(splitkey); - if (r < 0) { - // midkey is < first key in file - return this.delegate.seekTo(); - } - if (r > 0) { - return this.delegate.next(); - } - return true; - } - - boolean b = delegate.seekTo(); - if (!b) { - return b; - } - // Check key. - ByteBuffer k = this.delegate.getKey(); - return this.delegate.getReader().getComparator(). 
- compare(k.array(), k.arrayOffset(), k.limit(), - splitkey, 0, splitkey.length) < 0; - } - - public int seekTo(byte[] key) throws IOException { - return seekTo(key, 0, key.length); - } - - public int seekTo(byte[] key, int offset, int length) throws IOException { - if (top) { - if (getComparator().compare(key, offset, length, splitkey, 0, - splitkey.length) < 0) { - return -1; - } - } else { - if (getComparator().compare(key, offset, length, splitkey, 0, - splitkey.length) >= 0) { - // we would place the scanner in the second half. - // it might be an error to return false here ever... - boolean res = delegate.seekBefore(splitkey, 0, splitkey.length); - if (!res) { - throw new IOException("Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)"); - } - return 1; - } - } - return delegate.seekTo(key, offset, length); - } - - public Reader getReader() { - return this.delegate.getReader(); - } - - public boolean isSeeked() { - return this.delegate.isSeeked(); - } - }; - } - - @Override - public byte[] getLastKey() { - if (top) { - return super.getLastKey(); - } - // Get a scanner that caches the block and that uses pread. - HFileScanner scanner = getScanner(true, true); - try { - if (scanner.seekBefore(this.splitkey)) { - return Bytes.toBytes(scanner.getKey()); - } - } catch (IOException e) { - LOG.warn("Failed seekBefore " + Bytes.toString(this.splitkey), e); - } - return null; - } - - @Override - public byte[] midkey() throws IOException { - // Returns null to indicate file is not splitable. - return null; - } -} diff --git a/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 4488ccc..3433811 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.SortedSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; @@ -55,6 +57,7 @@ import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; @@ -209,7 +212,7 @@ public class HFile { private long valuelength = 0; // Used to ensure we write in order. - private final RawComparator comparator; + private final RawComparator rawComparator; // A stream made per block written. private DataOutputStream out; @@ -239,7 +242,7 @@ public class HFile { // Meta block system. private ArrayList metaNames = new ArrayList(); - private ArrayList metaData = new ArrayList(); + private ArrayList metaData = new ArrayList(); // Used compression. Used even if no compression -- 'none'. 
private final Compression.Algorithm compressAlgo; @@ -273,7 +276,7 @@ public class HFile { * @throws IOException */ public Writer(FileSystem fs, Path path, int blocksize, - String compress, final RawComparator comparator) + String compress, final KeyComparator comparator) throws IOException { this(fs, path, blocksize, compress == null? DEFAULT_COMPRESSION_ALGORITHM: @@ -292,7 +295,7 @@ public class HFile { */ public Writer(FileSystem fs, Path path, int blocksize, Compression.Algorithm compress, - final RawComparator comparator) + final KeyComparator comparator) throws IOException { this(fs.create(path), blocksize, compress, comparator); this.closeOutputStream = true; @@ -309,7 +312,7 @@ public class HFile { * @throws IOException */ public Writer(final FSDataOutputStream ostream, final int blocksize, - final String compress, final RawComparator c) + final String compress, final KeyComparator c) throws IOException { this(ostream, blocksize, Compression.getCompressionAlgorithmByName(compress), c); @@ -324,12 +327,12 @@ public class HFile { * @throws IOException */ public Writer(final FSDataOutputStream ostream, final int blocksize, - final Compression.Algorithm compress, final RawComparator c) + final Compression.Algorithm compress, final KeyComparator c) throws IOException { this.outputStream = ostream; this.closeOutputStream = false; this.blocksize = blocksize; - this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c; + this.rawComparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c; this.name = this.outputStream.toString(); this.compressAlgo = compress == null? DEFAULT_COMPRESSION_ALGORITHM: compress; @@ -423,11 +426,21 @@ public class HFile { * small, consider adding to file info using * {@link #appendFileInfo(byte[], byte[])} * @param metaBlockName name of the block - * @param bytes uninterpreted bytes of the block. + * @param content will call readFields to get data later (DO NOT REUSE) */ - public void appendMetaBlock(String metaBlockName, byte [] bytes) { - metaNames.add(Bytes.toBytes(metaBlockName)); - metaData.add(bytes); + public void appendMetaBlock(String metaBlockName, Writable content) { + byte[] key = Bytes.toBytes(metaBlockName); + int i; + for (i = 0; i < metaNames.size(); ++i) { + // stop when the current key is greater than our own + byte[] cur = metaNames.get(i); + if (this.rawComparator.compare(cur, 0, cur.length, key, 0, key.length) + > 0) { + break; + } + } + metaNames.add(i, key); + metaData.add(i, content); } /** @@ -508,7 +521,7 @@ public class HFile { * @param vlength * @throws IOException */ - public void append(final byte [] key, final int koffset, final int klength, + private void append(final byte [] key, final int koffset, final int klength, final byte [] value, final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); @@ -552,7 +565,7 @@ public class HFile { MAXIMUM_KEY_LENGTH); } if (this.lastKeyBuffer != null) { - int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset, + int keyComp = this.rawComparator.compare(this.lastKeyBuffer, this.lastKeyOffset, this.lastKeyLength, key, offset, length); if (keyComp > 0) { throw new IOException("Added a key not lexically larger than" + @@ -595,10 +608,16 @@ public class HFile { metaOffsets = new ArrayList(metaNames.size()); metaDataSizes = new ArrayList(metaNames.size()); for (int i = 0 ; i < metaNames.size() ; ++ i ) { - metaOffsets.add(Long.valueOf(outputStream.getPos())); - metaDataSizes. 
- add(Integer.valueOf(METABLOCKMAGIC.length + metaData.get(i).length)); - writeMetaBlock(metaData.get(i)); + // store the beginning offset + long curPos = outputStream.getPos(); + metaOffsets.add(curPos); + // write the metadata content + DataOutputStream dos = getCompressingStream(); + dos.write(METABLOCKMAGIC); + metaData.get(i).write(dos); + int size = releaseCompressingStream(dos); + // store the metadata size + metaDataSizes.add(size); } } @@ -632,17 +651,6 @@ public class HFile { } } - /* Write a metadata block. - * @param metadata - * @throws IOException - */ - private void writeMetaBlock(final byte [] b) throws IOException { - DataOutputStream dos = getCompressingStream(); - dos.write(METABLOCKMAGIC); - dos.write(b); - releaseCompressingStream(dos); - } - /* * Add last bits of metadata to fileinfo and then write it out. * Reader will be expecting to find all below. @@ -668,7 +676,7 @@ public class HFile { appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); appendFileInfo(this.fileinfo, FileInfo.COMPARATOR, - Bytes.toBytes(this.comparator.getClass().getName()), false); + Bytes.toBytes(this.rawComparator.getClass().getName()), false); long pos = o.getPos(); this.fileinfo.write(o); return pos; @@ -710,6 +718,7 @@ public class HFile { private final BlockCache cache; public int cacheHits = 0; public int blockLoads = 0; + public int metaLoads = 0; // Whether file is from in-memory store private boolean inMemory = false; @@ -717,15 +726,7 @@ public class HFile { // Name for this object used when logging or in toString. Is either // the result of a toString on the stream or else is toString of passed // file Path plus metadata key/value pairs. - private String name; - - /* - * Do not expose the default constructor. - */ - @SuppressWarnings("unused") - private Reader() throws IOException { - this(null, -1, null, false); - } + protected String name; /** * Opens a HFile. You must load the file info before you can @@ -799,7 +800,8 @@ public class HFile { * See {@link Writer#appendFileInfo(byte[], byte[])}. * @throws IOException */ - public Map loadFileInfo() throws IOException { + public Map loadFileInfo() + throws IOException { this.trailer = readTrailer(); // Read in the fileinfo and get what we need from it. @@ -889,16 +891,19 @@ public class HFile { } /** * @param metaBlockName + * @param cacheBlock Add block to cache, if found * @return Block wrapped in a ByteBuffer * @throws IOException */ - public ByteBuffer getMetaBlock(String metaBlockName) throws IOException { + public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + throws IOException { if (trailer.metaIndexCount == 0) { return null; // there are no meta blocks } if (metaIndex == null) { throw new IOException("Meta index not loaded"); } + byte [] mbname = Bytes.toBytes(metaBlockName); int block = metaIndex.blockContainingKey(mbname, 0, mbname.length); if (block == -1) @@ -910,19 +915,45 @@ public class HFile { blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block]; } - ByteBuffer buf = decompress(metaIndex.blockOffsets[block], - longToInt(blockSize), metaIndex.blockDataSizes[block], true); - byte [] magic = new byte[METABLOCKMAGIC.length]; - buf.get(magic, 0, magic.length); + long now = System.currentTimeMillis(); - if (! 
Arrays.equals(magic, METABLOCKMAGIC)) { - throw new IOException("Meta magic is bad in block " + block); + // Per meta key from any given file, synchronize reads for said block + synchronized (metaIndex.blockKeys[block]) { + metaLoads++; + // Check cache for block. If found return. + if (cache != null) { + ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block); + if (cachedBuf != null) { + // Return a distinct 'shallow copy' of the block, + // so pos doesnt get messed by the scanner + cacheHits++; + return cachedBuf.duplicate(); + } + // Cache Miss, please load. + } + + ByteBuffer buf = decompress(metaIndex.blockOffsets[block], + longToInt(blockSize), metaIndex.blockDataSizes[block], true); + byte [] magic = new byte[METABLOCKMAGIC.length]; + buf.get(magic, 0, magic.length); + + if (! Arrays.equals(magic, METABLOCKMAGIC)) { + throw new IOException("Meta magic is bad in block " + block); + } + + // Create a new ByteBuffer 'shallow copy' to hide the magic header + buf = buf.slice(); + + readTime += System.currentTimeMillis() - now; + readOps++; + + // Cache the block + if(cacheBlock && cache != null) { + cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory); + } + + return buf; } - // Toss the header. May have to remove later due to performance. - buf.compact(); - buf.limit(buf.limit() - METABLOCKMAGIC.length); - buf.rewind(); - return buf; } /** @@ -952,8 +983,8 @@ public class HFile { if (cache != null) { ByteBuffer cachedBuf = cache.getBlock(name + block); if (cachedBuf != null) { - // Return a distinct 'copy' of the block, so pos doesnt get messed by - // the scanner + // Return a distinct 'shallow copy' of the block, + // so pos doesnt get messed by the scanner cacheHits++; return cachedBuf.duplicate(); } @@ -982,11 +1013,12 @@ public class HFile { if (!Arrays.equals(magic, DATABLOCKMAGIC)) { throw new IOException("Data magic is bad in block " + block); } - // Toss the header. May have to remove later due to performance. - buf.compact(); - buf.limit(buf.limit() - DATABLOCKMAGIC.length); - buf.rewind(); + // 'shallow copy' to hide the header + // NOTE: you WILL GET BIT if you call buf.array() but don't start + // reading at buf.arrayOffset() + buf = buf.slice(); + readTime += System.currentTimeMillis() - now; readOps++; @@ -1045,6 +1077,9 @@ public class HFile { return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0]; } + /** + * @return number of KV entries in this HFile + */ public int getEntries() { if (!this.isFileInfoLoaded()) { throw new RuntimeException("File info not loaded"); @@ -1061,6 +1096,13 @@ public class HFile { } return this.blockIndex.isEmpty()? null: this.lastkey; } + + /** + * @return number of K entries in this HFile's filter. Returns KV count if no filter. + */ + public int getFilterEntries() { + return getEntries(); + } /** * @return Comparator. @@ -1099,7 +1141,7 @@ public class HFile { /* * Implementation of {@link HFileScanner} interface. */ - private static class Scanner implements HFileScanner { + protected static class Scanner implements HFileScanner { private final Reader reader; private ByteBuffer block; private int currBlock; @@ -1180,6 +1222,11 @@ public class HFile { return true; } + public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + return true; + } + public int seekTo(byte [] key) throws IOException { return seekTo(key, 0, key.length); } @@ -1333,10 +1380,10 @@ public class HFile { * parts of the file. Also includes basic metadata on this file. 
*/ private static class FixedFileTrailer { - // Offset to the data block index. - long dataIndexOffset; // Offset to the fileinfo data, a small block of vitals.. long fileinfoOffset; + // Offset to the data block index. + long dataIndexOffset; // How many index counts are there (aka: block count) int dataIndexCount; // Offset to the meta block index. diff --git a/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index 9d891c6..f5a5dc0 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.SortedSet; import org.apache.hadoop.hbase.KeyValue; @@ -65,6 +66,17 @@ public interface HFileScanner { public boolean seekBefore(byte [] key) throws IOException; public boolean seekBefore(byte []key, int offset, int length) throws IOException; /** + * Optimization for single key lookups. If the file has a filter, + * perform a lookup on the key. + * @param row the row to scan + * @param family the column family to scan + * @param columns the array of column qualifiers to scan + * @return False if the key definitely does not exist in this ScanFile + * @throws IOException + */ + public boolean shouldSeek(final byte[] row, + final SortedSet columns); + /** * Positions this scanner at the start of the file. * @return False if empty file; i.e. a call to next would return false and * the current key and value are undefined. diff --git a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 2c81723..9c8e53e 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/core/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -112,7 +112,10 @@ public class HFileOutputFormat extends FileOutputFormat set, final long logCacheFlushId) throws IOException { - HFile.Writer writer = null; + StoreFile.Writer writer = null; long flushed = 0; // Don't flush if there are no entries. if (set.size() == 0) { @@ -546,7 +546,7 @@ public class Store implements HConstants, HeapSize { // if we fail. synchronized (flushLock) { // A. Write the map out to the disk - writer = getWriter(); + writer = createWriter(this.homedir, set.size()); int entries = 0; try { for (KeyValue kv: set) { @@ -559,13 +559,13 @@ public class Store implements HConstants, HeapSize { } finally { // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. - StoreFile.appendMetadata(writer, logCacheFlushId); + writer.appendMetadata(logCacheFlushId, false); writer.close(); } } StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache, - this.conf, this.inMemory); - Reader r = sf.getReader(); + this.conf, this.family.getBloomFilterType(), this.inMemory); + Reader r = sf.createReader(); this.storeSize += r.length(); if(LOG.isDebugEnabled()) { LOG.debug("Added " + sf + ", entries=" + r.getEntries() + @@ -577,22 +577,16 @@ public class Store implements HConstants, HeapSize { return sf; } - /** - * @return Writer for this store. - * @throws IOException - */ - HFile.Writer getWriter() throws IOException { - return getWriter(this.homedir); - } - /* * @return Writer for this store. 
* @param basedir Directory to put writer in. * @throws IOException */ - private HFile.Writer getWriter(final Path basedir) throws IOException { - return StoreFile.getWriter(this.fs, basedir, this.blocksize, - this.compression, this.comparator.getRawComparator()); + private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount) + throws IOException { + return StoreFile.createWriter(this.fs, basedir, this.blocksize, + this.compression, this.comparator, this.conf, + this.family.getBloomFilterType(), maxKeyCount); } /* @@ -879,13 +873,27 @@ public class Store implements HConstants, HeapSize { private HFile.Writer compact(final List filesToCompact, final boolean majorCompaction, final long maxId) throws IOException { + // calculate maximum key count after compaction (for blooms) + int maxKeyCount = 0; + for (StoreFile file : filesToCompact) { + StoreFile.Reader r = file.getReader(); + if (r != null) { + // NOTE: getFilterEntries could cause under-sized blooms if the user + // switches bloom type (e.g. from ROW to ROWCOL) + maxKeyCount += (r.getBloomFilterType() == family.getBloomFilterType()) + ? r.getFilterEntries() : r.getEntries(); + } + } + // For each file, obtain a scanner: - List scanners = StoreFileScanner.getScannersForStoreFiles( - filesToCompact, false, false); + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(filesToCompact, false, false); + List scanners = + new ArrayList(sfScanners.size()+1); // Make the instantiation lazy in case compaction produces no product; i.e. // where all source cells are expired or deleted. - HFile.Writer writer = null; + StoreFile.Writer writer = null; try { if (majorCompaction) { InternalScanner scanner = null; @@ -900,7 +908,7 @@ public class Store implements HConstants, HeapSize { // output to writer: for (KeyValue kv : kvs) { if (writer == null) { - writer = getWriter(this.regionCompactionDir); + writer = createWriter(this.regionCompactionDir, maxKeyCount); } writer.append(kv); } @@ -915,7 +923,7 @@ public class Store implements HConstants, HeapSize { MinorCompactingStoreScanner scanner = null; try { scanner = new MinorCompactingStoreScanner(this, scanners); - writer = getWriter(this.regionCompactionDir); + writer = createWriter(this.regionCompactionDir, maxKeyCount); while (scanner.next(writer)) { // Nothing to do } @@ -926,7 +934,7 @@ public class Store implements HConstants, HeapSize { } } finally { if (writer != null) { - StoreFile.appendMetadata(writer, maxId, majorCompaction); + writer.appendMetadata(maxId, majorCompaction); writer.close(); } } @@ -970,7 +978,9 @@ public class Store implements HConstants, HeapSize { LOG.error("Failed move of compacted file " + compactedFile.getPath(), e); return null; } - result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory); + result = new StoreFile(this.fs, p, blockcache, this.conf, + this.family.getBloomFilterType(), this.inMemory); + result.createReader(); } this.lock.writeLock().lock(); try { @@ -1000,7 +1010,7 @@ public class Store implements HConstants, HeapSize { notifyChangedReadersObservers(); // Finally, delete old store files. 
for (StoreFile hsf: compactedFiles) { - hsf.delete(); + hsf.deleteReader(); } } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); @@ -1569,7 +1579,7 @@ public class Store implements HConstants, HeapSize { } public static final long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + + ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN + ClassSize.align(ClassSize.ARRAY)); diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 038c09e..0f7a5ec 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -21,26 +21,37 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.HalfHFileReader; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; +import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.LruBlockCache; +import org.apache.hadoop.hbase.util.BloomFilter; +import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.util.StringUtils; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.Arrays; import java.util.Map; +import java.util.SortedSet; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; @@ -49,11 +60,11 @@ import java.util.regex.Pattern; /** * A Store data file. Stores usually have one or more of these files. They * are produced by flushing the memstore to disk. To - * create, call {@link #getWriter(FileSystem, Path)} and append data. Be + * create, call {@link #createWriter(FileSystem, Path, int)} and append data. Be * sure to add any metadata before calling close on the Writer * (Use the appendMetadata convenience methods). On close, a StoreFile is * sitting in the Filesystem. To refer to it, create a StoreFile instance - * passing filesystem and path. To read, call {@link #getReader()}. + * passing filesystem and path. To read, call {@link #createReader()}. *

StoreFiles may also reference store files in another Store. */ public class StoreFile implements HConstants { @@ -65,7 +76,7 @@ public class StoreFile implements HConstants { // Make default block size for StoreFiles 8k while testing. TODO: FIX! // Need to make it 8k for testing. - private static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; + public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; private final FileSystem fs; // This file's path. @@ -80,16 +91,23 @@ public class StoreFile implements HConstants { private boolean inMemory; // Keys for metadata stored in backing HFile. - private static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); + /** Constant for the max sequence ID meta */ + public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); // Set when we obtain a Reader. private long sequenceid = -1; - private static final byte [] MAJOR_COMPACTION_KEY = + /** Constant for major compaction meta */ + public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; + static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; + static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; + static final byte[] BLOOM_FILTER_TYPE_KEY = + Bytes.toBytes("BLOOM_FILTER_TYPE"); + /* * Regex that will work for straight filenames and for reference names. * If reference, then the regex has more than just one group. Group 1 is @@ -98,11 +116,12 @@ public class StoreFile implements HConstants { private static final Pattern REF_NAME_PARSER = Pattern.compile("^(\\d+)(?:\\.(.+))?$"); - private volatile HFile.Reader reader; + private volatile StoreFile.Reader reader; // Used making file ids. private final static Random rand = new Random(); private final Configuration conf; + private final BloomType bloomType; /** * Constructor, loads a reader and it's indices, etc. May allocate a @@ -112,10 +131,11 @@ public class StoreFile implements HConstants { * @param p The path of the file. * @param blockcache true if the block cache is enabled. * @param conf The current configuration. + * @param bt The bloom type to use for this store file * @throws IOException When opening the reader fails. */ StoreFile(final FileSystem fs, final Path p, final boolean blockcache, - final Configuration conf, final boolean inMemory) + final Configuration conf, final BloomType bt, final boolean inMemory) throws IOException { this.conf = conf; this.fs = fs; @@ -126,7 +146,14 @@ public class StoreFile implements HConstants { this.reference = Reference.read(fs, p); this.referencePath = getReferredToFile(this.path); } - this.reader = open(); + // ignore if the column family config says "no bloom filter" + // even if there is one in the hfile. + if (conf.getBoolean("io.hfile.bloom.enabled", true)) { + this.bloomType = bt; + } else { + this.bloomType = BloomType.NONE; + LOG.info("Ignoring bloom filter check for file (disabled in config)"); + } } /** @@ -255,18 +282,18 @@ public class StoreFile implements HConstants { * Opens reader on this store file. Called by Constructor. * @return Reader for the store file. 
* @throws IOException - * @see #close() + * @see #closeReader() */ - protected HFile.Reader open() + private StoreFile.Reader open() throws IOException { if (this.reader != null) { throw new IllegalAccessError("Already open"); } if (isReference()) { - this.reader = new HalfHFileReader(this.fs, this.referencePath, + this.reader = new HalfStoreFileReader(this.fs, this.referencePath, getBlockCache(), this.reference); } else { - this.reader = new Reader(this.fs, this.path, getBlockCache(), + this.reader = new StoreFile.Reader(this.fs, this.path, getBlockCache(), this.inMemory); } // Load up indices and fileinfo. @@ -296,44 +323,59 @@ public class StoreFile implements HConstants { this.majorCompaction.set(mc); } } + + if (this.bloomType != BloomType.NONE) { + this.reader.loadBloomfilter(); + } - // TODO read in bloom filter here, ignore if the column family config says - // "no bloom filter" even if there is one in the hfile. + return this.reader; + } + + /** + * @return Reader for StoreFile. creates if necessary + * @throws IOException + */ + public StoreFile.Reader createReader() throws IOException { + if (this.reader == null) { + this.reader = open(); + } return this.reader; } /** - * @return Current reader. Must call open first else returns null. + * @return Current reader. Must call createReader first else returns null. + * @throws IOException + * @see {@link #createReader()} */ - public HFile.Reader getReader() { + public StoreFile.Reader getReader() { return this.reader; } /** * @throws IOException */ - public synchronized void close() throws IOException { + public synchronized void closeReader() throws IOException { if (this.reader != null) { this.reader.close(); this.reader = null; } } - @Override - public String toString() { - return this.path.toString() + - (isReference()? "-" + this.referencePath + "-" + reference.toString(): ""); - } - /** * Delete this file * @throws IOException */ - public void delete() throws IOException { - close(); + public void deleteReader() throws IOException { + closeReader(); this.fs.delete(getPath(), true); } + @Override + public String toString() { + return this.path.toString() + + (isReference()? "-" + this.referencePath + "-" + reference.toString(): ""); + } + /** * Utility to help with rename. * @param fs @@ -361,38 +403,47 @@ public class StoreFile implements HConstants { * @param fs * @param dir Path to family directory. Makes the directory if doesn't exist. * Creates a file with a unique name in this directory. + * @param blocksize size per filesystem block * @return HFile.Writer * @throws IOException */ - public static HFile.Writer getWriter(final FileSystem fs, final Path dir) + public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, + final int blocksize) throws IOException { - return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null); + return createWriter(fs,dir,blocksize,null,null,null,BloomType.NONE,0); } /** - * Get a store file writer. Client is responsible for closing file when done. - * If metadata, add BEFORE closing using - * {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}. + * Create a store file writer. Client is responsible for closing file when done. + * If metadata, add BEFORE closing using appendMetadata() * @param fs * @param dir Path to family directory. Makes the directory if doesn't exist. * Creates a file with a unique name in this directory. * @param blocksize * @param algorithm Pass null to get default. + * @param conf HBase system configuration. 
used with bloom filters + * @param bloomType column family setting for bloom filters * @param c Pass null to get default. + * @param maxKeySize peak theoretical entry size (maintains error rate) * @return HFile.Writer * @throws IOException */ - public static HFile.Writer getWriter(final FileSystem fs, final Path dir, - final int blocksize, final Compression.Algorithm algorithm, - final KeyValue.KeyComparator c) + public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, + final int blocksize, final Compression.Algorithm algorithm, + final KeyValue.KVComparator c, final HBaseConfiguration conf, + BloomType bloomType, int maxKeySize) throws IOException { if (!fs.exists(dir)) { fs.mkdirs(dir); } Path path = getUniqueFile(fs, dir); - return new HFile.Writer(fs, path, blocksize, - algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, - c == null? KeyValue.KEY_COMPARATOR: c); + if(conf == null || !conf.getBoolean("io.hfile.bloom.enabled", true)) { + bloomType = BloomType.NONE; + } + + return new StoreFile.Writer(fs, path, blocksize, + algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, + conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize); } /** @@ -442,35 +493,6 @@ public class StoreFile implements HConstants { return p; } - /** - * Write file metadata. - * Call before you call close on the passed w since its written - * as metadata to that file. - * - * @param w hfile writer - * @param maxSequenceId Maximum sequence id. - * @throws IOException - */ - static void appendMetadata(final HFile.Writer w, final long maxSequenceId) - throws IOException { - appendMetadata(w, maxSequenceId, false); - } - - /** - * Writes metadata. - * Call before you call close on the passed w since its written - * as metadata to that file. - * @param maxSequenceId Maximum sequence id. - * @param mc True if this file is product of a major compaction - * @throws IOException - */ - public static void appendMetadata(final HFile.Writer w, final long maxSequenceId, - final boolean mc) - throws IOException { - w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(mc)); - } - /* * Write out a split reference. * @param fs @@ -497,4 +519,298 @@ public class StoreFile implements HConstants { Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName); return r.write(fs, p); } + + public static enum BloomType { + /** + * Bloomfilters disabled + */ + NONE, + /** + * Bloom enabled with Table row as Key + */ + ROW, + /** + * Bloom enabled with Table row & column (family+qualifier) as Key + */ + ROWCOL + } + + /** + * + */ + public static class Reader extends HFile.Reader { + /** Bloom Filter class. Caches only meta, pass in data */ + protected BloomFilter bloomFilter = null; + /** Type of bloom filter (e.g. 
ROW vs ROWCOL) */ + protected BloomType bloomFilterType; + + public Reader(FileSystem fs, Path path, BlockCache cache, + boolean inMemory) + throws IOException { + super(fs, path, cache, inMemory); + } + + public Reader(final FSDataInputStream fsdis, final long size, + final BlockCache cache, final boolean inMemory) { + super(fsdis,size,cache,inMemory); + bloomFilterType = BloomType.NONE; + } + + @Override + public Map loadFileInfo() + throws IOException { + Map fi = super.loadFileInfo(); + + byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY); + if (b != null) { + bloomFilterType = BloomType.valueOf(Bytes.toString(b)); + } + + return fi; + } + + /** + * Load the bloom filter for this HFile into memory. + * Assumes the HFile has already been loaded + */ + public void loadBloomfilter() { + if (this.bloomFilter != null) { + return; // already loaded + } + + // see if bloom filter information is in the metadata + try { + ByteBuffer b = getMetaBlock(BLOOM_FILTER_META_KEY, false); + if (b != null) { + if (bloomFilterType == BloomType.NONE) { + throw new IOException("valid bloom filter type not found in FileInfo"); + } + this.bloomFilter = new ByteBloomFilter(b); + LOG.info("Loaded " + (bloomFilterType==BloomType.ROW? "row":"col") + + " bloom filter metadata for " + name); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter meta -- proceeding without", e); + this.bloomFilter = null; + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter meta -- proceeding without", e); + this.bloomFilter = null; + } + } + + BloomFilter getBloomFilter() { + return this.bloomFilter; + } + + /** + * @return bloom type information associated with this store file + */ + public BloomType getBloomFilterType() { + return this.bloomFilterType; + } + + @Override + public int getFilterEntries() { + return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount() + : super.getFilterEntries(); + } + + @Override + public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { + return new Scanner(this, cacheBlocks, pread); + } + + protected class Scanner extends HFile.Reader.Scanner { + public Scanner(Reader r, boolean cacheBlocks, final boolean pread) { + super(r, cacheBlocks, pread); + } + + @Override + public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + if (bloomFilter == null) { + return true; + } + + byte[] key; + switch(bloomFilterType) { + case ROW: + key = row; + break; + case ROWCOL: + if (columns.size() == 1) { + byte[] col = columns.first(); + key = Bytes.add(row, col); + break; + } + //$FALL-THROUGH$ + default: + return true; + } + + try { + ByteBuffer bloom = getMetaBlock(BLOOM_FILTER_DATA_KEY, true); + if (bloom != null) { + return bloomFilter.contains(key, bloom); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter data -- proceeding without", + e); + bloomFilter = null; + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter data -- proceeding without", e); + bloomFilter = null; + } + + return true; + } + + } + } + + /** + * + */ + public static class Writer extends HFile.Writer { + private final BloomFilter bloomFilter; + private final BloomType bloomType; + private KVComparator kvComparator; + private KeyValue lastKv = null; + private byte[] lastByteArray = null; + + /** + * Creates an HFile.Writer that also write helpful meta data. 
+ * @param fs file system to write to + * @param path file name to create + * @param blocksize HDFS block size + * @param compress HDFS block compression + * @param conf user configuration + * @param comparator key comparator + * @param bloomType bloom filter setting + * @param maxKeys maximum amount of keys to add (for blooms) + * @throws IOException problem writing to FS + */ + public Writer(FileSystem fs, Path path, int blocksize, + Compression.Algorithm compress, final HBaseConfiguration conf, + final KVComparator comparator, BloomType bloomType, int maxKeys) + throws IOException { + super(fs, path, blocksize, compress, comparator.getRawComparator()); + + this.kvComparator = comparator; + + if (bloomType != BloomType.NONE && conf != null) { + float err = conf.getFloat("io.hfile.bloom.error.rate", (float)0.01); + int maxFold = conf.getInt("io.hfile.bloom.max.fold", 7); + + this.bloomFilter = new ByteBloomFilter(maxKeys, err, + Hash.getHashType(conf), maxFold); + this.bloomFilter.allocBloom(); + this.bloomType = bloomType; + } else { + this.bloomFilter = null; + this.bloomType = BloomType.NONE; + } + } + + /** + * Writes meta data. + * Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @throws IOException problem writing to FS + */ + public void appendMetadata(final long maxSequenceId, + final boolean majorCompaction) + throws IOException { + appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + } + + @Override + public void append(final KeyValue kv) + throws IOException { + if (this.bloomFilter != null) { + // only add to the bloom filter on a new, unique key + boolean newKey = true; + if (this.lastKv != null) { + switch(bloomType) { + case ROW: + newKey = ! kvComparator.matchingRows(kv, lastKv); + break; + case ROWCOL: + newKey = ! kvComparator.matchingRowColumn(kv, lastKv); + break; + case NONE: + newKey = false; + } + } + if (newKey) { + /* + * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png + * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp + * + * 2 Types of Filtering: + * 1. Row = Row + * 2. 
RowCol = Row + Qualifier + */ + switch (bloomType) { + case ROW: + this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength()); + break; + case ROWCOL: + // merge(row, qualifier) + int ro = kv.getRowOffset(); + int rl = kv.getRowLength(); + int qo = kv.getQualifierOffset(); + int ql = kv.getQualifierLength(); + byte [] result = new byte[rl + ql]; + System.arraycopy(kv.getBuffer(), ro, result, 0, rl); + System.arraycopy(kv.getBuffer(), qo, result, rl, ql); + + this.bloomFilter.add(result); + break; + default: + } + this.lastKv = kv; + } + } + super.append(kv); + } + + @Override + public void append(final byte [] key, final byte [] value) + throws IOException { + if (this.bloomFilter != null) { + // only add to the bloom filter on a new row + if(this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) { + this.bloomFilter.add(key); + this.lastByteArray = key; + } + } + super.append(key, value); + } + + @Override + public void close() + throws IOException { + // make sure we wrote something to the bloom before adding it + if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) { + bloomFilter.finalize(); + if (this.bloomFilter.getMaxKeys() > 0) { + int b = this.bloomFilter.getByteSize(); + int k = this.bloomFilter.getKeyCount(); + int m = this.bloomFilter.getMaxKeys(); + StoreFile.LOG.info("Bloom added to HFile. " + b + "B, " + + k + "/" + m + " (" + NumberFormat.getPercentInstance().format( + ((double)k) / ((double)m)) + ")"); + } + appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter()); + appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter()); + appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + } + super.close(); + } + + } } diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index d9e866a..52d228b 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -53,12 +53,12 @@ class StoreFileScanner implements KeyValueScanner { * Return an array of scanners corresponding to the given * set of store files. 
*/ - public static List getScannersForStoreFiles( + public static List getScannersForStoreFiles( Collection filesToCompact, boolean cacheBlocks, boolean usePread) { - List scanners = - new ArrayList(filesToCompact.size()); + List scanners = + new ArrayList(filesToCompact.size()); for (StoreFile file : filesToCompact) { Reader r = file.getReader(); if (r == null) { @@ -72,6 +72,10 @@ class StoreFileScanner implements KeyValueScanner { return scanners; } + public HFileScanner getHFileScanner() { + return this.hfs; + } + public String toString() { return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]"; } diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 32daa77..dc5b58e 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -61,10 +61,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb store.versionsToReturn(scan.getMaxVersions())); this.isGet = scan.isGetScan(); - List scanners = getScanners(); + // pass columns = try to filter out unnecessary ScanFiles + List scanners = getScanners(scan, columns); // Seek all scanners to the initial key - // TODO if scan.isGetScan, use bloomfilters to skip seeking for(KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } @@ -124,9 +124,37 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb private List getScanners() { // First the store file scanners Map map = this.store.getStorefiles().descendingMap(); + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(map.values(), cacheBlocks, isGet); List scanners = - StoreFileScanner.getScannersForStoreFiles(map.values(), - cacheBlocks, isGet); + new ArrayList(sfScanners.size()+1); + scanners.addAll(sfScanners); + // Then the memstore scanners + scanners.addAll(this.store.memstore.getScanners()); + return scanners; + } + + /* + * @return List of scanners to seek, possibly filtered by StoreFile. + */ + private List getScanners(Scan scan, + final NavigableSet columns) { + // First the store file scanners + Map map = this.store.getStorefiles().descendingMap(); + List sfScanners = StoreFileScanner + .getScannersForStoreFiles(map.values(), cacheBlocks, isGet); + List scanners = + new ArrayList(sfScanners.size()+1); + + // exclude scan files that have failed file filters + for(StoreFileScanner sfs : sfScanners) { + if (isGet && + !sfs.getHFileScanner().shouldSeek(scan.getStartRow(), columns)) { + continue; // exclude this hfs + } + scanners.add(sfs); + } + // Then the memstore scanners scanners.addAll(this.store.memstore.getScanners()); return scanners; diff --git a/core/src/main/java/org/apache/hadoop/hbase/rest/model/ColumnSchemaModel.java b/core/src/main/java/org/apache/hadoop/hbase/rest/model/ColumnSchemaModel.java index 4547c46..caf5368 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/rest/model/ColumnSchemaModel.java +++ b/core/src/main/java/org/apache/hadoop/hbase/rest/model/ColumnSchemaModel.java @@ -146,16 +146,15 @@ public class ColumnSchemaModel implements Serializable { } /** - * @return true if the BLOOMFILTER attribute is present and true + * @return the value of the BLOOMFILTER attribute or its default if unset */ - public boolean __getBloomfilter() { + public String __getBloomfilter() { Object o = attrs.get(BLOOMFILTER); - return o != null ? 
- Boolean.valueOf(o.toString()) : HColumnDescriptor.DEFAULT_BLOOMFILTER; + return o != null ? o.toString() : HColumnDescriptor.DEFAULT_BLOOMFILTER; } /** - * @return the value of the COMPRESSION attribute or its default if it is unset + * @return the value of the COMPRESSION attribute or its default if unset */ public String __getCompression() { Object o = attrs.get(COMPRESSION); @@ -203,8 +202,8 @@ public class ColumnSchemaModel implements Serializable { attrs.put(BLOCKCACHE, Boolean.toString(value)); } - public void __setBloomfilter(boolean value) { - attrs.put(BLOOMFILTER, Boolean.toString(value)); + public void __setBloomfilter(String value) { + attrs.put(BLOOMFILTER, value); } /** diff --git a/core/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java b/core/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java index 9be58d7..f319751 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java +++ b/core/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; import org.apache.hadoop.hbase.thrift.generated.TCell; @@ -47,10 +49,8 @@ public class ThriftUtilities { throws IllegalArgument { Compression.Algorithm comp = Compression.getCompressionAlgorithmByName(in.compression.toLowerCase()); - boolean bloom = false; - if (in.bloomFilterType.compareTo("NONE") != 0) { - bloom = true; - } + StoreFile.BloomType bt = + BloomType.valueOf(in.bloomFilterType); if (in.name == null || in.name.length <= 0) { throw new IllegalArgument("column name is empty"); @@ -58,7 +58,7 @@ public class ThriftUtilities { byte [] parsedName = KeyValue.parseColumn(in.name)[0]; HColumnDescriptor col = new HColumnDescriptor(parsedName, in.maxVersions, comp.getName(), in.inMemory, in.blockCacheEnabled, - in.timeToLive, bloom); + in.timeToLive, bt.toString()); return col; } @@ -77,7 +77,7 @@ public class ThriftUtilities { col.compression = in.getCompression().toString(); col.inMemory = in.isInMemory(); col.blockCacheEnabled = in.isBlockCacheEnabled(); - col.bloomFilterType = Boolean.toString(in.isBloomfilter()); + col.bloomFilterType = in.getBloomFilterType().toString(); return col; } @@ -147,4 +147,4 @@ public class ThriftUtilities { Result [] result = { in }; return rowResultFromHBase(result); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java b/core/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java new file mode 100644 index 0000000..42e816a --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java @@ -0,0 +1,122 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * Defines the general behavior of a bloom filter. + *

+ * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by + * the networking research community in the past decade thanks to the bandwidth efficiencies that it + * offers for the transmission of set membership information between networked hosts. A sender encodes + * the information into a bit vector, the Bloom filter, that is more compact than a conventional + * representation. Computation and space costs for construction are linear in the number of elements. + * The receiver uses the filter to test whether various elements are members of the set. Though the + * filter will occasionally return a false positive, it will never return a false negative. When creating + * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. + * + *
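+ * A typical write-side lifecycle against this interface (an illustrative sketch; the variable names here are made up, but the calls mirror the ByteBloomFilter implementation added alongside it): + * + * BloomFilter bloom = new ByteBloomFilter(maxKeys, errRate, hashType, foldFactor); + * bloom.allocBloom(); // bits are not allocated by the constructor + * bloom.add(key); // once per unique key + * bloom.finalize(); // optionally fold to reclaim space + * // persist via bloom.getMetaWriter() and bloom.getDataWriter() + * + * On the read side the serialized bits are handed back in through contains(buf, bloom) rather than being held by this object. + *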

+ * Originally created by + * European Commission One-Lab Project 034819. + * + *

+ * It must be extended in order to define the real behavior. + */ +public interface BloomFilter { + /** + * Allocate memory for the bloom filter data. Note that bloom data isn't + * allocated by default because it can grow large & reads would be better + * managed by the LRU cache. + */ + void allocBloom(); + + /** + * Add the specified binary to the bloom filter. + * + * @param buf data to be added to the bloom + */ + void add(byte []buf); + + /** + * Add the specified binary to the bloom filter. + * + * @param buf data to be added to the bloom + * @param offset offset into the data to be added + * @param len length of the data to be added + */ + void add(byte []buf, int offset, int len); + + /** + * Check if the specified key is contained in the bloom filter. + * + * @param buf data to check for existence of + * @param bloom bloom filter data to search + * @return true if matched by bloom, false if not + */ + boolean contains(byte [] buf, ByteBuffer bloom); + + /** + * Check if the specified key is contained in the bloom filter. + * + * @param buf data to check for existence of + * @param offset offset into the data + * @param length length of the data + * @param bloom bloom filter data to search + * @return true if matched by bloom, false if not + */ + boolean contains(byte [] buf, int offset, int length, ByteBuffer bloom); + + /** + * @return The number of keys added to the bloom + */ + int getKeyCount(); + + /** + * @return The max number of keys that can be inserted + * to maintain the desired error rate + */ + public int getMaxKeys(); + + /** + * Size of the bloom, in bytes + */ + public int getByteSize(); + + /** + * Finalize the bloom before writing metadata & data to disk + */ + void finalize(); + + /** + * Get a writable interface into bloom filter meta data. + * @return writable class + */ + Writable getMetaWriter(); + + /** + * Get a writable interface into bloom filter data (actual bloom). + * @return writable class + */ + Writable getDataWriter(); +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java b/core/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java new file mode 100644 index 0000000..c6bb358 --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java @@ -0,0 +1,422 @@ +/** + * + * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) + * All rights reserved. + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the distribution. + * - Neither the name of the University Catholique de Louvain - UCL + * nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.DataOutput; +import java.io.DataInput; +import java.io.IOException; +import java.lang.Math; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.bloom.Filter; + +/** + * Implements a Bloom filter, as defined by Bloom in 1970. + *

+ * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by + * the networking research community in the past decade thanks to the bandwidth efficiencies that it + * offers for the transmission of set membership information between networked hosts. A sender encodes + * the information into a bit vector, the Bloom filter, that is more compact than a conventional + * representation. Computation and space costs for construction are linear in the number of elements. + * The receiver uses the filter to test whether various elements are members of the set. Though the + * filter will occasionally return a false positive, it will never return a false negative. When creating + * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. + * + *
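+ * This implementation keeps the bloom bits in a ByteBuffer and derives all + * hashCount probe positions from two base hashes (see the combinatorial + * generation note in add()), so adding or testing a key costs two hash + * computations regardless of hashCount. + *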

+ * Originally created by + * European Commission One-Lab Project 034819. + * + * @see BloomFilter The general behavior of a filter + * + * @see Space/Time Trade-Offs in Hash Coding with Allowable Errors + */ +public class ByteBloomFilter implements BloomFilter { + /** Current file format version */ + public static final int VERSION = 1; + + /** Bytes (B) in the array */ + protected int byteSize; + /** Number of hash functions */ + protected final int hashCount; + /** Hash type */ + protected final int hashType; + /** Hash Function */ + protected final Hash hash; + /** Keys currently in the bloom */ + protected int keyCount; + /** Max Keys expected for the bloom */ + protected int maxKeys; + /** Bloom bits */ + protected ByteBuffer bloom; + + /** Bit-value lookup array to prevent doing the same work over and over */ + private static final byte [] bitvals = { + (byte) 0x01, + (byte) 0x02, + (byte) 0x04, + (byte) 0x08, + (byte) 0x10, + (byte) 0x20, + (byte) 0x40, + (byte) 0x80 + }; + + /** + * Loads bloom filter meta data from file input. + * @param meta stored bloom meta data + * @throws IllegalArgumentException meta data is invalid + */ + public ByteBloomFilter(ByteBuffer meta) + throws IllegalArgumentException { + int version = meta.getInt(); + if (version != VERSION) throw new IllegalArgumentException("Bad version"); + + this.byteSize = meta.getInt(); + this.hashCount = meta.getInt(); + this.hashType = meta.getInt(); + this.keyCount = meta.getInt(); + this.maxKeys = this.keyCount; + + this.hash = Hash.getInstance(this.hashType); + sanityCheck(); + } + + /** + * Determines & initializes bloom filter meta data from user config. Call + * {@link #allocBloom()} to allocate bloom filter data. + * @param maxKeys Maximum expected number of keys that will be stored in this bloom + * @param errorRate Desired false positive error rate. Lower rate = more storage required + * @param hashType Type of hash function to use + * @param foldFactor When finished adding entries, you may be able to 'fold' + * this bloom to save space. Tradeoff potentially excess bytes in bloom for + * ability to fold if keyCount is exponentially greater than maxKeys. + * @throws IllegalArgumentException + */ + public ByteBloomFilter(int maxKeys, float errorRate, int hashType, int foldFactor) + throws IllegalArgumentException { + /* + * Bloom filters are very sensitive to the number of elements inserted + * into them. For HBase, the number of entries depends on the size of the + * data stored in the column. Currently the default region size is 256MB, + * so entry count ~= 256MB / (average value size for column). Despite + * this rule of thumb, there is no efficient way to calculate the entry + * count after compactions. Therefore, it is often easier to use a + * dynamic bloom filter that will add extra space instead of allowing the + * error rate to grow. + * + * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf ) + * + * m denotes the number of bits in the Bloom filter (bitSize) + * n denotes the number of elements inserted into the Bloom filter (maxKeys) + * k represents the number of hash functions used (nbHash) + * e represents the desired false positive rate for the bloom (err) + * + * If we fix the error rate (e) and know the number of entries, then + * the optimal bloom size m = -(n * ln(err) / (ln(2)^2) + * ~= n * ln(err) / ln(0.6185) + * + * The probability of false positives is minimized when k = m/n ln(2). 
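+ * + * Illustrative numbers (not computed anywhere in this patch): for + * n = 1,000,000 keys and err = 0.01, m works out to roughly 9.6 million bits + * (about 1.2 MB) and k rounds up to 7 hash functions. Folding in finalize() + * pays off when the actual key count ends up far below maxKeys, since the + * bitmap can then be halved repeatedly while still never missing an + * inserted key.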
+ */ + int bitSize = (int)Math.ceil(maxKeys * (Math.log(errorRate) / Math.log(0.6185))); + int functionCount = (int)Math.ceil(Math.log(2) * (bitSize / maxKeys)); + + // increase byteSize so folding is possible + int byteSize = (bitSize + 7) / 8; + int mask = (1 << foldFactor) - 1; + if ( (mask & byteSize) != 0) { + byteSize >>= foldFactor; + ++byteSize; + byteSize <<= foldFactor; + } + + this.byteSize = byteSize; + this.hashCount = functionCount; + this.hashType = hashType; + this.keyCount = 0; + this.maxKeys = maxKeys; + + this.hash = Hash.getInstance(hashType); + sanityCheck(); + } + + @Override + public void allocBloom() { + if (this.bloom != null) { + throw new IllegalArgumentException("can only create bloom once."); + } + this.bloom = ByteBuffer.allocate(this.byteSize); + assert this.bloom.hasArray(); + } + + void sanityCheck() throws IllegalArgumentException { + if(this.byteSize <= 0) { + throw new IllegalArgumentException("maxValue must be > 0"); + } + + if(this.hashCount <= 0) { + throw new IllegalArgumentException("Hash function count must be > 0"); + } + + if (this.hash == null) { + throw new IllegalArgumentException("hashType must be known"); + } + + if (this.keyCount < 0) { + throw new IllegalArgumentException("must have positive keyCount"); + } + } + + void bloomCheck(ByteBuffer bloom) throws IllegalArgumentException { + if (this.byteSize != bloom.limit()) { + throw new IllegalArgumentException( + "Configured bloom length should match actual length"); + } + } + + @Override + public void add(byte [] buf) { + add(buf, 0, buf.length); + } + + @Override + public void add(byte [] buf, int offset, int len) { + /* + * For faster hashing, use combinatorial generation + * http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf + */ + int hash1 = this.hash.hash(buf, offset, len, 0); + int hash2 = this.hash.hash(buf, offset, len, hash1); + + for (int i = 0; i < this.hashCount; i++) { + int hashLoc = Math.abs((hash1 + i * hash2) % (this.byteSize * 8)); + set(hashLoc); + } + + ++this.keyCount; + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf) { + return contains(buf, 0, buf.length, this.bloom); + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf, int offset, int length) { + return contains(buf, offset, length, this.bloom); + } + + @Override + public boolean contains(byte [] buf, ByteBuffer theBloom) { + return contains(buf, 0, buf.length, theBloom); + } + + @Override + public boolean contains(byte [] buf, int offset, int length, + ByteBuffer theBloom) { + + if(theBloom.limit() != this.byteSize) { + throw new IllegalArgumentException("Bloom does not match expected size"); + } + + int hash1 = this.hash.hash(buf, offset, length, 0); + int hash2 = this.hash.hash(buf, offset, length, hash1); + + for (int i = 0; i < this.hashCount; i++) { + int hashLoc = Math.abs((hash1 + i * hash2) % (this.byteSize * 8)); + if (!get(hashLoc, theBloom) ) { + return false; + } + } + return true; + } + + //--------------------------------------------------------------------------- + /** Private helpers */ + + /** + * Set the bit at the specified index to 1. + * + * @param pos index of bit + */ + void set(int pos) { + int bytePos = pos / 8; + int bitPos = pos % 8; + byte curByte = bloom.get(bytePos); + curByte |= bitvals[bitPos]; + bloom.put(bytePos, curByte); + } + + /** + * Check if bit at specified index is 1. 
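+ * The byte holding the bit is at pos / 8 and the bit within that byte is + * selected with bitvals[pos % 8], mirroring set(int).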
+ * + * @param pos index of bit + * @return true if bit at specified index is 1, false if 0. + */ + static boolean get(int pos, ByteBuffer theBloom) { + int bytePos = pos / 8; + int bitPos = pos % 8; + byte curByte = theBloom.get(bytePos); + curByte &= bitvals[bitPos]; + return (curByte != 0); + } + + @Override + public int getKeyCount() { + return this.keyCount; + } + + @Override + public int getMaxKeys() { + return this.maxKeys; + } + + @Override + public int getByteSize() { + return this.byteSize; + } + + @Override + public void finalize() { + // see if the actual size is exponentially smaller than expected. + if (this.keyCount > 0 && this.bloom.hasArray()) { + int pieces = 1; + int newByteSize = this.byteSize; + int newMaxKeys = this.maxKeys; + + // while exponentially smaller & folding is lossless + while ( (newByteSize & 1) == 0 && newMaxKeys > (this.keyCount<<1) ) { + pieces <<= 1; + newByteSize >>= 1; + newMaxKeys >>= 1; + } + + // if we should fold these into pieces + if (pieces > 1) { + byte[] array = this.bloom.array(); + int start = this.bloom.arrayOffset(); + int end = start + newByteSize; + int off = end; + for(int p = 1; p < pieces; ++p) { + for(int pos = start; pos < end; ++pos) { + array[pos] |= array[off++]; + } + } + // folding done, only use a subset of this array + this.bloom.rewind(); + this.bloom.limit(newByteSize); + this.bloom = this.bloom.slice(); + this.byteSize = newByteSize; + this.maxKeys = newMaxKeys; + } + } + } + + + //--------------------------------------------------------------------------- + + /** + * Writes just the bloom filter to the output array + * @param out OutputStream to place bloom + * @throws IOException Error writing bloom array + */ + public void writeBloom(final DataOutput out) throws IOException { + if (!this.bloom.hasArray()) { + throw new IOException("Only writes ByteBuffer with underlying array."); + } + out.write(bloom.array(), bloom.arrayOffset(), bloom.limit()); + } + + @Override + public Writable getMetaWriter() { + return new MetaWriter(); + } + + @Override + public Writable getDataWriter() { + return new DataWriter(); + } + + private class MetaWriter implements Writable { + protected MetaWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(VERSION); + out.writeInt(byteSize); + out.writeInt(hashCount); + out.writeInt(hashType); + out.writeInt(keyCount); + } + } + + private class DataWriter implements Writable { + protected DataWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + writeBloom(out); + } + } + +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java b/core/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java new file mode 100644 index 0000000..f818279 --- /dev/null +++ b/core/src/main/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java @@ -0,0 +1,334 @@ +/** + * + * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) + * All rights reserved. 
+ * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the distribution. + * - Neither the name of the University Catholique de Louvain - UCL + * nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * Implements a dynamic Bloom filter, as defined in the INFOCOM 2006 paper. + *

+ * A dynamic Bloom filter (DBF) makes use of an s * m bit matrix in which + * each of the s rows is a standard Bloom filter. The creation + * process of a DBF is iterative. At the start, the DBF is a 1 * m + * bit matrix, i.e., it is composed of a single standard Bloom filter. + * It assumes that nr elements are recorded in the + * initial bit vector, where nr <= n (n is + * the cardinality of the set A to record in the filter). +

+ * As the size of A grows during the execution of the application, + * several keys must be inserted in the DBF. When inserting a key into the DBF, + * one must first get an active Bloom filter in the matrix. A Bloom filter is + * active when the number of recorded keys, nr, is + * strictly less than the current cardinality of A, n. + * If an active Bloom filter is found, the key is inserted and + * nr is incremented by one. On the other hand, if there + * is no active Bloom filter, a new one is created (i.e., a new row is added to + * the matrix) according to the current size of A, the element is added to this + * new Bloom filter, and its nr value is set to one. A given key is said to + * belong to the DBF if the k positions are set to one in one of the matrix rows. +
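+ * For example (illustrative, in terms of this class's fields): with keyInterval = 1000, + * the first 1000 keys added go into matrix[0]; the next add() finds no active row + * (curKeys >= keyInterval), so addRow() appends a second ByteBloomFilter and the key + * is recorded there. A read-side contains() then probes each serialized row in turn + * and reports a match if any row matches. + *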

+ * Originally created by + * European Commission One-Lab Project 034819. + * + * @see BloomFilter A Bloom filter + * + * @see Theory and Network Applications of Dynamic Bloom Filters + */ +public class DynamicByteBloomFilter implements BloomFilter { + /** Current file format version */ + public static final int VERSION = 2; + /** Maximum number of keys in a dynamic Bloom filter row. */ + protected final int keyInterval; + /** The maximum false positive rate per bloom */ + protected final float errorRate; + /** Hash type */ + protected final int hashType; + /** The number of keys recorded in the current Bloom filter. */ + protected int curKeys; + /** expected size of bloom filter matrix (used during reads) */ + protected int readMatrixSize; + /** The matrix of Bloom filters (contains bloom data only during writes). */ + protected ByteBloomFilter[] matrix; + + /** + * Normal read constructor. Loads bloom filter meta data. + * @param meta stored bloom meta data + * @throws IllegalArgumentException meta data is invalid + */ + public DynamicByteBloomFilter(ByteBuffer meta) + throws IllegalArgumentException { + int version = meta.getInt(); + if (version != VERSION) throw new IllegalArgumentException("Bad version"); + + this.keyInterval = meta.getInt(); + this.errorRate = meta.getFloat(); + this.hashType = meta.getInt(); + this.readMatrixSize = meta.getInt(); + this.curKeys = meta.getInt(); + + readSanityCheck(); + + this.matrix = new ByteBloomFilter[1]; + this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); +} + + /** + * Normal write constructor. Note that this doesn't allocate bloom data by + * default. Instead, call allocBloom() before adding entries. + * @param bitSize The vector size of this filter. + * @param functionCount The number of hash function to consider. + * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}). + * @param keyInterval Maximum number of keys to record per Bloom filter row. + * @throws IllegalArgumentException The input parameters were invalid + */ + public DynamicByteBloomFilter(int keyInterval, float errorRate, int hashType) + throws IllegalArgumentException { + this.keyInterval = keyInterval; + this.errorRate = errorRate; + this.hashType = hashType; + this.curKeys = 0; + + if(keyInterval <= 0) { + throw new IllegalArgumentException("keyCount must be > 0"); + } + + this.matrix = new ByteBloomFilter[1]; + this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); +} + + @Override + public void allocBloom() { + this.matrix[0].allocBloom(); + } + + void readSanityCheck() throws IllegalArgumentException { + if (this.curKeys <= 0) { + throw new IllegalArgumentException("last bloom's key count invalid"); + } + + if (this.readMatrixSize <= 0) { + throw new IllegalArgumentException("matrix size must be known"); + } + } + + @Override + public void add(byte []buf, int offset, int len) { + BloomFilter bf = getCurBloom(); + + if (bf == null) { + addRow(); + bf = matrix[matrix.length - 1]; + curKeys = 0; + } + + bf.add(buf, offset, len); + curKeys++; + } + + @Override + public void add(byte []buf) { + add(buf, 0, buf.length); + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf) { + return contains(buf, 0, buf.length); + } + + /** + * Should only be used in tests when writing a bloom filter. 
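+ * During writes each matrix row still holds its own bloom bits, so this + * variant can probe the rows directly; after a read-side load the bits live + * in one serialized ByteBuffer and contains(buf, offset, length, bloom) must + * be used instead.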
+ */ + boolean contains(byte [] buf, int offset, int length) { + for (int i = 0; i < matrix.length; i++) { + if (matrix[i].contains(buf, offset, length)) { + return true; + } + } + return false; + } + + @Override + public boolean contains(byte [] buf, ByteBuffer theBloom) { + return contains(buf, 0, buf.length, theBloom); + } + + @Override + public boolean contains(byte[] buf, int offset, int length, + ByteBuffer theBloom) { + if(offset + length > buf.length) { + return false; + } + + // current version assumes uniform size + int bytesPerBloom = this.matrix[0].getByteSize(); + + if(theBloom.limit() != bytesPerBloom * readMatrixSize) { + throw new IllegalArgumentException("Bloom does not match expected size"); + } + + ByteBuffer tmp = theBloom.duplicate(); + + // note: actually searching an array of blooms that have been serialized + for (int m = 0; m < readMatrixSize; ++m) { + tmp.position(m* bytesPerBloom); + tmp.limit(tmp.position() + bytesPerBloom); + boolean match = this.matrix[0].contains(buf, offset, length, tmp.slice()); + if (match) { + return true; + } + } + + // matched no bloom filters + return false; + } + + int bloomCount() { + return Math.max(this.matrix.length, this.readMatrixSize); + } + + @Override + public int getKeyCount() { + return (bloomCount()-1) * this.keyInterval + this.curKeys; + } + + @Override + public int getMaxKeys() { + return bloomCount() * this.keyInterval; + } + + @Override + public int getByteSize() { + return bloomCount() * this.matrix[0].getByteSize(); + } + + @Override + public void finalize() { + } + + /** + * Adds a new row to this dynamic Bloom filter. + */ + private void addRow() { + ByteBloomFilter[] tmp = new ByteBloomFilter[matrix.length + 1]; + + for (int i = 0; i < matrix.length; i++) { + tmp[i] = matrix[i]; + } + + tmp[tmp.length-1] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); + tmp[tmp.length-1].allocBloom(); + matrix = tmp; + } + + /** + * Returns the currently-unfilled row in the dynamic Bloom Filter array. + * @return BloomFilter The active standard Bloom filter. + * Null otherwise. 
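+ * A row is considered active while curKeys < keyInterval; when this method + * returns null, add() appends a fresh row via addRow() and resets curKeys.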
+ */ + private BloomFilter getCurBloom() { + if (curKeys >= keyInterval) { + return null; + } + + return matrix[matrix.length - 1]; + } + + @Override + public Writable getMetaWriter() { + return new MetaWriter(); + } + + @Override + public Writable getDataWriter() { + return new DataWriter(); + } + + private class MetaWriter implements Writable { + protected MetaWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(VERSION); + out.writeInt(keyInterval); + out.writeFloat(errorRate); + out.writeInt(hashType); + out.writeInt(matrix.length); + out.writeInt(curKeys); + } + } + + private class DataWriter implements Writable { + protected DataWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + for (int i = 0; i < matrix.length; ++i) { + matrix[i].writeBloom(out); + } + } + } +} diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/Hash.java b/core/src/main/java/org/apache/hadoop/hbase/util/Hash.java index 9e1d9e2..0a533d9 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/util/Hash.java +++ b/core/src/main/java/org/apache/hadoop/hbase/util/Hash.java @@ -106,16 +106,29 @@ public abstract class Hash { * @return hash value */ public int hash(byte[] bytes, int initval) { - return hash(bytes, bytes.length, initval); + return hash(bytes, 0, bytes.length, initval); } /** * Calculate a hash using bytes from 0 to length, and * the provided seed value * @param bytes input bytes - * @param length length of the valid bytes to consider + * @param length length of the valid bytes after offset to consider * @param initval seed value * @return hash value */ - public abstract int hash(byte[] bytes, int length, int initval); + public int hash(byte[] bytes, int length, int initval) { + return hash(bytes, 0, length, initval); + } + + /** + * Calculate a hash using bytes from 0 to length, and + * the provided seed value + * @param bytes input bytes + * @param offset the offset into the array to start consideration + * @param length length of the valid bytes after offset to consider + * @param initval seed value + * @return hash value + */ + public abstract int hash(byte[] bytes, int offset, int length, int initval); } diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/JenkinsHash.java b/core/src/main/java/org/apache/hadoop/hbase/util/JenkinsHash.java index 0c6c607..1e67371 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/util/JenkinsHash.java +++ b/core/src/main/java/org/apache/hadoop/hbase/util/JenkinsHash.java @@ -80,11 +80,11 @@ public class JenkinsHash extends Hash { */ @Override @SuppressWarnings("fallthrough") - public int hash(byte[] key, int nbytes, int initval) { + public int hash(byte[] key, int off, int nbytes, int initval) { int length = nbytes; long a, b, c; // We use longs because we don't have unsigned ints a = b = c = (0x00000000deadbeefL + length + initval) & INT_MASK; - int offset = 0; + int offset = off; for (; length > 12; offset += 12, length -= 12) { //noinspection PointlessArithmeticExpression a = (a + (key[offset + 0] & BYTE_MASK)) & INT_MASK; diff --git a/core/src/main/java/org/apache/hadoop/hbase/util/MurmurHash.java b/core/src/main/java/org/apache/hadoop/hbase/util/MurmurHash.java index fcf543e..085bf1e 100644 --- 
a/core/src/main/java/org/apache/hadoop/hbase/util/MurmurHash.java +++ b/core/src/main/java/org/apache/hadoop/hbase/util/MurmurHash.java @@ -35,7 +35,7 @@ public class MurmurHash extends Hash { } @Override - public int hash(byte[] data, int length, int seed) { + public int hash(byte[] data, int offset, int length, int seed) { int m = 0x5bd1e995; int r = 24; @@ -44,7 +44,7 @@ public class MurmurHash extends Hash { int len_4 = length >> 2; for (int i = 0; i < len_4; i++) { - int i_4 = i << 2; + int i_4 = (i << 2) + offset; int k = data[i_4 + 3]; k = k << 8; k = k | (data[i_4 + 2] & 0xff); @@ -63,16 +63,17 @@ public class MurmurHash extends Hash { // avoid calculating modulo int len_m = len_4 << 2; int left = length - len_m; + int i_m = len_m + offset; if (left != 0) { if (left >= 3) { - h ^= data[len_m + 2] << 16; + h ^= data[i_m + 2] << 16; } if (left >= 2) { - h ^= data[len_m + 1] << 8; + h ^= data[i_m + 1] << 8; } if (left >= 1) { - h ^= data[len_m]; + h ^= data[i_m]; } h *= m; diff --git a/core/src/main/ruby/hbase/admin.rb b/core/src/main/ruby/hbase/admin.rb index 75490b7..04da078 100644 --- a/core/src/main/ruby/hbase/admin.rb +++ b/core/src/main/ruby/hbase/admin.rb @@ -334,7 +334,7 @@ module Hbase arg[HColumnDescriptor::BLOCKCACHE]? JBoolean.valueOf(arg[HColumnDescriptor::BLOCKCACHE]): HColumnDescriptor::DEFAULT_BLOCKCACHE, arg[HColumnDescriptor::BLOCKSIZE]? JInteger.valueOf(arg[HColumnDescriptor::BLOCKSIZE]): HColumnDescriptor::DEFAULT_BLOCKSIZE, arg[HColumnDescriptor::TTL]? JInteger.new(arg[HColumnDescriptor::TTL]): HColumnDescriptor::DEFAULT_TTL, - arg[HColumnDescriptor::BLOOMFILTER]? JBoolean.valueOf(arg[HColumnDescriptor::BLOOMFILTER]): HColumnDescriptor::DEFAULT_BLOOMFILTER, + arg[HColumnDescriptor::BLOOMFILTER]? arg[HColumnDescriptor::BLOOMFILTER]: HColumnDescriptor::DEFAULT_BLOOMFILTER) arg[HColumnDescriptor::REPLICATION_SCOPE]? 
JInteger.new(arg[REPLICATION_SCOPE]): HColumnDescriptor::DEFAULT_REPLICATION_SCOPE) end diff --git a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index 83be097..e8c1d0c 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -194,14 +194,19 @@ public abstract class HBaseTestCase extends TestCase { HTableDescriptor htd = new HTableDescriptor(name); htd.addFamily(new HColumnDescriptor(fam1, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL)); + Integer.MAX_VALUE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HConstants.REPLICATION_SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam2, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL)); + Integer.MAX_VALUE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HConstants.REPLICATION_SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam3, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, Integer.MAX_VALUE, HConstants.FOREVER, - false, HConstants.REPLICATION_SCOPE_LOCAL)); + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HConstants.REPLICATION_SCOPE_LOCAL)); return htd; } diff --git a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a6aca1d..f600104 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -333,7 +333,8 @@ public class HBaseTestingUtility { HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, - false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); } (new HBaseAdmin(getConfiguration())).createTable(desc); @@ -359,7 +360,8 @@ public class HBaseTestingUtility { HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, - false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); + HColumnDescriptor.DEFAULT_BLOOMFILTER, + HColumnDescriptor.DEFAULT_REPLICATION_SCOPE); desc.addFamily(hcd); i++; } diff --git a/core/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/core/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 6b32b25..acb4fdc 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/core/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -29,12 +31,14 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.hfile.HFile.BlockIndex; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import 
org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; /** * test hfile features. @@ -170,7 +174,18 @@ public class TestHFile extends HBaseTestCase { private void writeNumMetablocks(Writer writer, int n) { for (int i = 0; i < n; i++) { - writer.appendMetaBlock("HFileMeta" + i, ("something to test" + i).getBytes()); + writer.appendMetaBlock("HFileMeta" + i, new Writable() { + private int val; + public Writable setVal(int val) { this.val = val; return this; } + + @Override + public void write(DataOutput out) throws IOException { + out.write(("something to test" + val).getBytes()); + } + + @Override + public void readFields(DataInput in) throws IOException { } + }.setVal(i)); } } @@ -180,10 +195,10 @@ public class TestHFile extends HBaseTestCase { private void readNumMetablocks(Reader reader, int n) throws IOException { for (int i = 0; i < n; i++) { - ByteBuffer b = reader.getMetaBlock("HFileMeta" + i); - byte [] found = Bytes.toBytes(b); - assertTrue("failed to match metadata", Arrays.equals( - ("something to test" + i).getBytes(), found)); + ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); + ByteBuffer expected = + ByteBuffer.wrap(("something to test" + i).getBytes()); + assertTrue("failed to match metadata", actual.compareTo(expected) == 0); } } @@ -227,7 +242,7 @@ public class TestHFile extends HBaseTestCase { fout.close(); Reader reader = new Reader(fs, mFile, null, false); reader.loadFileInfo(); - assertNull(reader.getMetaBlock("non-existant")); + assertNull(reader.getMetaBlock("non-existant", false)); } /** @@ -244,7 +259,7 @@ public class TestHFile extends HBaseTestCase { Path mFile = new Path(ROOT_DIR, "meta.tfile"); FSDataOutputStream fout = createFSOutput(mFile); Writer writer = new Writer(fout, minBlockSize, (Compression.Algorithm) null, - new RawComparator() { + new KeyComparator() { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java index 142c145..7428324 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanner.java @@ -68,7 +68,8 @@ public class TestScanner extends HBaseTestCase { TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. 
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL)); + HConstants.FOREVER, StoreFile.BloomType.NONE.toString(), + false, HConstants.REPLICATION_SCOPE_LOCAL)); } /** HRegionInfo for root region */ public static final HRegionInfo REGION_INFO = diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 9f240f2..d14a27f 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -134,8 +134,9 @@ public class TestStore extends TestCase { long seqid = f.getMaxSequenceId(); HBaseConfiguration c = new HBaseConfiguration(); FileSystem fs = FileSystem.get(c); - Writer w = StoreFile.getWriter(fs, storedir); - StoreFile.appendMetadata(w, seqid + 1); + StoreFile.Writer w = StoreFile.createWriter(fs, storedir, + StoreFile.DEFAULT_BLOCKSIZE_SMALL); + w.appendMetadata(seqid + 1, false); w.close(); this.store.close(); // Reopen it... should pick up two files diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 0c6efde..55dfa86 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -69,11 +72,11 @@ public class TestStoreFile extends HBaseTestCase { */ public void testBasicHalfMapFile() throws Exception { // Make up a directory hierarchy that has a regiondir and familyname. - HFile.Writer writer = StoreFile.getWriter(this.fs, - new Path(new Path(this.testDir, "regionname"), "familyname"), - 2 * 1024, null, null); + HFile.Writer writer = StoreFile.createWriter(this.fs, + new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024); writeStoreFile(writer); - checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, false)); + checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false)); } /* @@ -109,11 +112,11 @@ public class TestStoreFile extends HBaseTestCase { Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname"); Path dir = new Path(storedir, "1234567890"); // Make a store file and write data to it. - HFile.Writer writer = StoreFile.getWriter(this.fs, dir, 8 * 1024, null, - null); + HFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024); writeStoreFile(writer); - StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, false); - HFile.Reader reader = hsf.getReader(); + StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false); + HFile.Reader reader = hsf.createReader(); // Split on a row, not in middle of row. Midkey returned by reader // may be in middle of row. Create new one with empty column and // timestamp. 
@@ -123,10 +126,11 @@ public class TestStoreFile extends HBaseTestCase { byte [] finalRow = kv.getRow(); // Make a reference Path refPath = StoreFile.split(fs, dir, hsf, midRow, Range.top); - StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, false); + StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, + StoreFile.BloomType.NONE, false); // Now confirm that I can read from the reference and that it only gets // keys from top half of the file. - HFileScanner s = refHsf.getReader().getScanner(false, false); + HFileScanner s = refHsf.createReader().getScanner(false, false); for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) { ByteBuffer bb = s.getKey(); kv = KeyValue.createKeyValueFromKey(bb); @@ -140,7 +144,7 @@ public class TestStoreFile extends HBaseTestCase { private void checkHalfHFile(final StoreFile f) throws IOException { - byte [] midkey = f.getReader().midkey(); + byte [] midkey = f.createReader().midkey(); KeyValue midKV = KeyValue.createKeyValueFromKey(midkey); byte [] midRow = midKV.getRow(); // Create top split. @@ -159,8 +163,10 @@ public class TestStoreFile extends HBaseTestCase { Path bottomPath = StoreFile.split(this.fs, bottomDir, f, midRow, Range.bottom); // Make readers on top and bottom. - HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); ByteBuffer previous = null; LOG.info("Midkey: " + midKV.toString()); ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midkey); @@ -212,8 +218,10 @@ public class TestStoreFile extends HBaseTestCase { topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); bottomScanner = bottom.getScanner(false, false); int count = 0; while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) || @@ -256,8 +264,10 @@ public class TestStoreFile extends HBaseTestCase { topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); first = true; bottomScanner = bottom.getScanner(false, false); while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) || @@ -296,4 +306,138 @@ public class TestStoreFile extends HBaseTestCase { fs.delete(f.getPath(), true); } } -} \ No newline at end of file + + private static String ROOT_DIR = + System.getProperty("test.build.data", "/tmp/TestStoreFile"); + private static String localFormatter = "%010d"; + + public void testBloomFilter() throws 
Exception { + FileSystem fs = FileSystem.getLocal(conf); + conf.setFloat("io.hfile.bloom.error.rate", (float)0.01); + conf.setBoolean("io.hfile.bloom.enabled", true); + + // write the file + Path f = new Path(ROOT_DIR, getName()); + StoreFile.Writer writer = new StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, + conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); + + long now = System.currentTimeMillis(); + for (int i = 0; i < 2000; i += 2) { + String row = String.format(localFormatter, Integer.valueOf(i)); + KeyValue kv = new KeyValue(row.getBytes(), "family:col".getBytes(), + now, "value".getBytes()); + writer.append(kv); + } + writer.close(); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + reader.loadFileInfo(); + reader.loadBloomfilter(); + HFileScanner scanner = reader.getScanner(false, false); + + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < 2000; i++) { + String row = String.format(localFormatter, Integer.valueOf(i)); + TreeSet columns = new TreeSet(); + columns.add("family:col".getBytes()); + + boolean exists = scanner.shouldSeek(row.getBytes(), columns); + if (i % 2 == 0) { + if (!exists) falseNeg++; + } else { + if (exists) falsePos++; + } + } + reader.close(); + fs.delete(f, true); + System.out.println("False negatives: " + falseNeg); + assertEquals(0, falseNeg); + System.out.println("False positives: " + falsePos); + assertTrue(falsePos < 2); + } + + public void testBloomTypes() throws Exception { + float err = (float) 0.01; + FileSystem fs = FileSystem.getLocal(conf); + conf.setFloat("io.hfile.bloom.error.rate", err); + conf.setBoolean("io.hfile.bloom.enabled", true); + + int rowCount = 50; + int colCount = 10; + int versions = 2; + + // run once using columns and once using rows + StoreFile.BloomType[] bt = + {StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW}; + int[] expKeys = {rowCount*colCount, rowCount}; + // below line deserves commentary. 
it is expected bloom false positives + // column = rowCount*2*colCount inserts + // row-level = only rowCount*2 inserts, but failures will be magnified by + // 2nd for loop for every column (2*colCount) + float[] expErr = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err}; + + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(ROOT_DIR, getName()); + StoreFile.Writer writer = new StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, + HFile.DEFAULT_COMPRESSION_ALGORITHM, + conf, KeyValue.COMPARATOR, bt[x], expKeys[x]); + + long now = System.currentTimeMillis(); + for (int i = 0; i < rowCount*2; i += 2) { // rows + for (int j = 0; j < colCount*2; j += 2) { // column qualifiers + String row = String.format(localFormatter, Integer.valueOf(i)); + String col = String.format(localFormatter, Integer.valueOf(j)); + for (int k= 0; k < versions; ++k) { // versions + KeyValue kv = new KeyValue(row.getBytes(), + ("family:col" + col).getBytes(), + now-k, Bytes.toBytes((long)-1)); + writer.append(kv); + } + } + } + writer.close(); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + reader.loadFileInfo(); + reader.loadBloomfilter(); + HFileScanner scanner = reader.getScanner(false, false); + assertEquals(expKeys[x], reader.getBloomFilter().getKeyCount()); + + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < rowCount*2; ++i) { // rows + for (int j = 0; j < colCount*2; ++j) { // column qualifiers + String row = String.format(localFormatter, Integer.valueOf(i)); + String col = String.format(localFormatter, Integer.valueOf(j)); + TreeSet columns = new TreeSet(); + columns.add(("col" + col).getBytes()); + + boolean exists = scanner.shouldSeek(row.getBytes(), columns); + boolean shouldRowExist = i % 2 == 0; + boolean shouldColExist = j % 2 == 0; + shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW; + if (shouldRowExist && shouldColExist) { + if (!exists) falseNeg++; + } else { + if (exists) falsePos++; + } + } + } + reader.close(); + fs.delete(f, true); + System.out.println(bt[x].toString()); + System.out.println(" False negatives: " + falseNeg); + System.out.println(" False positives: " + falsePos); + assertEquals(0, falseNeg); + assertTrue(falsePos < 2*expErr[x]); + } + + } + +} diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index 7a7ec33..0d5a17a 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -52,7 +52,8 @@ public class TestWideScanner extends HBaseTestCase { TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. 
+          shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
+          if (shouldRowExist && shouldColExist) {
+            if (!exists) falseNeg++;
+          } else {
+            if (exists) falsePos++;
+          }
+        }
+      }
+      reader.close();
+      fs.delete(f, true);
+      System.out.println(bt[x].toString());
+      System.out.println(" False negatives: " + falseNeg);
+      System.out.println(" False positives: " + falsePos);
+      assertEquals(0, falseNeg);
+      assertTrue(falsePos < 2*expErr[x]);
+    }
+
+  }
+
+}
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index 7a7ec33..0d5a17a 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -52,7 +52,8 @@ public class TestWideScanner extends HBaseTestCase {
     TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
       10, // Ten is arbitrary number.  Keep versions to help debuggging.
       Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
-      HConstants.FOREVER, false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
+      HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
+      HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
   }
   /** HRegionInfo for root region */
   public static final HRegionInfo REGION_INFO =
diff --git a/core/src/test/java/org/apache/hadoop/hbase/rest/model/TestColumnSchemaModel.java b/core/src/test/java/org/apache/hadoop/hbase/rest/model/TestColumnSchemaModel.java
index 2c2ccc2..517b142 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/rest/model/TestColumnSchemaModel.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/rest/model/TestColumnSchemaModel.java
@@ -33,7 +33,7 @@ public class TestColumnSchemaModel extends TestCase {
   protected static final String COLUMN_NAME = "testcolumn";
   protected static final boolean BLOCKCACHE = true;
   protected static final int BLOCKSIZE = 16384;
-  protected static final boolean BLOOMFILTER = false;
+  protected static final String BLOOMFILTER = "none";
   protected static final String COMPRESSION = "GZ";
   protected static final boolean IN_MEMORY = false;
   protected static final int TTL = 86400;
@@ -42,7 +42,7 @@ public class TestColumnSchemaModel extends TestCase {
   protected static final String AS_XML = ">2, b.getByteSize());
+    int falsePositives = 0;
+    for (int i = 0; i < 25; ++i) {
+      if (b.contains(Bytes.toBytes(i))) {
+        if (i >= 12) falsePositives++;
+      } else {
+        assertFalse(i < 12);
+      }
+    }
+    assertTrue(falsePositives <= 1);
+
+    // test: foldFactor > log(max/actual)
+  }
+
+  public void testBloomPerf() throws Exception {
+    // add
+    float err = (float)0.01;
+    ByteBloomFilter b = new ByteBloomFilter(10*1000*1000, (float)err, Hash.MURMUR_HASH, 3);
+    b.allocBloom();
+    long startTime = System.currentTimeMillis();
+    int origSize = b.getByteSize();
+    for (int i = 0; i < 1*1000*1000; ++i) {
+      b.add(Bytes.toBytes(i));
+    }
+    long endTime = System.currentTimeMillis();
+    System.out.println("Total Add time = " + (endTime - startTime) + "ms");
+
+    // fold
+    startTime = System.currentTimeMillis();
+    b.finalize();
+    endTime = System.currentTimeMillis();
+    System.out.println("Total Fold time = " + (endTime - startTime) + "ms");
+    // folding with foldFactor 3 should shrink the bloom by at least 2^3
+    assertTrue(origSize >= b.getByteSize()<<3);
+
+    // test
+    startTime = System.currentTimeMillis();
+    int falsePositives = 0;
+    for (int i = 0; i < 2*1000*1000; ++i) {
+      if (b.contains(Bytes.toBytes(i))) {
+        if (i >= 1*1000*1000) falsePositives++;
+      } else {
+        assertFalse(i < 1*1000*1000);
+      }
+    }
+    endTime = System.currentTimeMillis();
+    System.out.println("Total Contains time = " + (endTime - startTime) + "ms");
+    System.out.println("False Positive = " + falsePositives);
+    assertTrue(falsePositives <= (1*1000*1000)*err);
+
+    // test: foldFactor > log(max/actual)
+  }
+
+  public void testDynamicBloom() throws Exception {
+    int keyInterval = 1000;
+    float err = (float)0.01;
+    BitSet valid = new BitSet(keyInterval*4);
+
+    DynamicByteBloomFilter bf1 = new DynamicByteBloomFilter(keyInterval, err,
+        Hash.MURMUR_HASH);
+    bf1.allocBloom();
+
+    for (int i = 0; i < keyInterval*4; ++i) { // add
+      if (Math.random() > 0.5) {
+        bf1.add(Bytes.toBytes(i));
+        valid.set(i);
+      }
+    }
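+    // roughly half of the keyInterval*4 candidate keys get added (about
+    // 2*keyInterval), so the dynamic bloom should have grown to two, or at
+    // most three, component blooms of keyInterval keys each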
+    assertTrue(2 <= bf1.bloomCount() && bf1.bloomCount() <= 3);
+
+    // test serialization/deserialization
+    ByteArrayOutputStream metaOut = new ByteArrayOutputStream();
+    ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
+    bf1.getMetaWriter().write(new DataOutputStream(metaOut));
+    bf1.getDataWriter().write(new DataOutputStream(dataOut));
+
+    ByteBuffer bb = ByteBuffer.wrap(dataOut.toByteArray());
+    DynamicByteBloomFilter newBf1 = new DynamicByteBloomFilter(
+        ByteBuffer.wrap(metaOut.toByteArray()));
+
+    int falsePositives = 0;
+    for (int i = 0; i < keyInterval*4; ++i) { // check
+      if (newBf1.contains(Bytes.toBytes(i), bb)) {
+        if (!valid.get(i)) ++falsePositives;
+      } else {
+        // a key that was added must never come back as a false negative
+        assertFalse(valid.get(i));
+      }
+    }
+
+    // note that the actual error rate is err * bloomCount (roughly 2*err here),
+    // so expect about (keyInterval*2)*(err*2) false positives; allow some tolerance
+    System.out.println("False positives: " + falsePositives);
+    assertTrue(falsePositives <= (keyInterval*5)*err);
+  }
+
+}
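For reference, a minimal sketch of the ByteBloomFilter usage the tests above exercise; the constructor arguments follow the test calls (max keys, target error rate, hash type, fold factor), and the exact signatures should be treated as assumptions taken from this patch rather than a settled public API:

    // sketch only; names and argument order taken from the tests in this patch
    ByteBloomFilter bloom = new ByteBloomFilter(1000, (float) 0.01, Hash.MURMUR_HASH, 2);
    bloom.allocBloom();                    // allocate the backing buffer
    for (int i = 0; i < 1000; i += 2) {
      bloom.add(Bytes.toBytes(i));         // insert even keys only
    }
    boolean hit = bloom.contains(Bytes.toBytes(0));   // always true: no false negatives for added keys
    boolean miss = bloom.contains(Bytes.toBytes(1));  // usually false; true only on a rare false positive
                                                      // (around the configured 1% rate or better)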