Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -332,7 +332,7 @@ HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, - false); + HColumnDescriptor.DEFAULT_BLOOMFILTER); desc.addFamily(hcd); } (new HBaseAdmin(getConfiguration())).createTable(desc); @@ -358,7 +358,7 @@ HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, - false); + HColumnDescriptor.DEFAULT_BLOOMFILTER); desc.addFamily(hcd); i++; } Index: src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -197,13 +197,16 @@ HTableDescriptor htd = new HTableDescriptor(name); htd.addFamily(new HColumnDescriptor(fam1, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOOMFILTER)); htd.addFamily(new HColumnDescriptor(fam2, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOOMFILTER)); htd.addFamily(new HColumnDescriptor(fam3, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_BLOOMFILTER)); return htd; } Index: src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (working copy) @@ -32,7 +32,7 @@ TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false)); + HConstants.FOREVER, StoreFile.BloomType.NONE.toString())); } /** HRegionInfo for root region */ public static final HRegionInfo REGION_INFO = Index: src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java (working copy) @@ -74,7 +74,7 @@ TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. 
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false)); + HConstants.FOREVER, StoreFile.BloomType.NONE.toString())); } /** HRegionInfo for root region */ public static final HRegionInfo REGION_INFO = Index: src/test/org/apache/hadoop/hbase/regionserver/TestStore.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy) @@ -131,8 +131,9 @@ long seqid = f.getMaxSequenceId(); HBaseConfiguration c = new HBaseConfiguration(); FileSystem fs = FileSystem.get(c); - Writer w = StoreFile.getWriter(fs, storedir); - StoreFile.appendMetadata(w, seqid + 1); + StoreFile.Writer w = StoreFile.createWriter(fs, storedir, + StoreFile.DEFAULT_BLOCKSIZE_SMALL); + w.appendMetadata(seqid + 1, false); w.close(); this.store.close(); // Reopen it... should pick up two files Index: src/test/org/apache/hadoop/hbase/regionserver/TestStoreFile.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (working copy) @@ -21,7 +21,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HStoreKey; @@ -34,6 +36,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.TreeSet; /** * Test HStoreFile @@ -70,11 +73,11 @@ */ public void testBasicHalfMapFile() throws Exception { // Make up a directory hierarchy that has a regiondir and familyname. - HFile.Writer writer = StoreFile.getWriter(this.fs, - new Path(new Path(this.testDir, "regionname"), "familyname"), - 2 * 1024, null, null); + HFile.Writer writer = StoreFile.createWriter(this.fs, + new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024); writeStoreFile(writer); - checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, false)); + checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false)); } /* @@ -110,11 +113,11 @@ Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname"); Path dir = new Path(storedir, "1234567890"); // Make a store file and write data to it. - HFile.Writer writer = StoreFile.getWriter(this.fs, dir, 8 * 1024, null, - null); + HFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024); writeStoreFile(writer); - StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, false); - HFile.Reader reader = hsf.getReader(); + StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false); + HFile.Reader reader = hsf.createReader(); // Split on a row, not in middle of row. Midkey returned by reader // may be in middle of row. Create new one with empty column and // timestamp. 
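The TestStoreFile changes above follow the reworked StoreFile API: writers now come from StoreFile.createWriter(), the bloom type is an explicit constructor argument, and readers are opened lazily through createReader() instead of getReader(). A minimal sketch of the new calling pattern, assuming a caller in the org.apache.hadoop.hbase.regionserver package (as the tests are) and a hypothetical local path:

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StoreFileApiSketch {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path familyDir = new Path("/tmp/StoreFileApiSketch/regionname/familyname"); // hypothetical

        // Writer: replaces the old StoreFile.getWriter(fs, dir).
        HFile.Writer writer = StoreFile.createWriter(fs, familyDir, 8 * 1024);
        writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("family:col"),
            System.currentTimeMillis(), Bytes.toBytes("value")));
        writer.close();

        // Reader: the bloom type is a constructor argument now, and the reader
        // is created explicitly instead of being opened by the constructor.
        StoreFile sf = new StoreFile(fs, writer.getPath(), true, conf,
            StoreFile.BloomType.NONE, false);
        HFile.Reader reader = sf.createReader();
        System.out.println("entries=" + reader.getEntries());
        sf.closeReader();
      }
    }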
@@ -124,10 +127,11 @@ byte [] finalKey = hsk.getRow(); // Make a reference Path refPath = StoreFile.split(fs, dir, hsf, reader.midkey(), Range.top); - StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, false); + StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf, + StoreFile.BloomType.NONE, false); // Now confirm that I can read from the reference and that it only gets // keys from top half of the file. - HFileScanner s = refHsf.getReader().getScanner(false, false); + HFileScanner s = refHsf.createReader().getScanner(false, false); for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) { ByteBuffer bb = s.getKey(); hsk = HStoreKey.create(bb.array(), bb.arrayOffset(), bb.limit()); @@ -141,7 +145,7 @@ private void checkHalfHFile(final StoreFile f) throws IOException { - byte [] midkey = f.getReader().midkey(); + byte [] midkey = f.createReader().midkey(); // Create top split. Path topDir = Store.getStoreHomedir(this.testDir, 1, Bytes.toBytes(f.getPath().getParent().getName())); @@ -158,8 +162,10 @@ Path bottomPath = StoreFile.split(this.fs, bottomDir, f, midkey, Range.bottom); // Make readers on top and bottom. - HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + HFile.Reader top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + HFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); ByteBuffer previous = null; LOG.info("Midkey: " + Bytes.toString(midkey)); byte [] midkeyBytes = new HStoreKey(midkey).getBytes(); @@ -212,8 +218,10 @@ topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); bottomScanner = bottom.getScanner(false, false); int count = 0; while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) || @@ -256,8 +264,10 @@ topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top); bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, Range.bottom); - top = new StoreFile(this.fs, topPath, true, conf, false).getReader(); - bottom = new StoreFile(this.fs, bottomPath, true, conf, false).getReader(); + top = new StoreFile(this.fs, topPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); + bottom = new StoreFile(this.fs, bottomPath, true, conf, + StoreFile.BloomType.NONE, false).createReader(); first = true; bottomScanner = bottom.getScanner(false, false); while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) || @@ -296,4 +306,138 @@ fs.delete(f.getPath(), true); } } + + private static String ROOT_DIR = + System.getProperty("test.build.data", "/tmp/TestStoreFile"); + private static String localFormatter = "%010d"; + + public void testBloomFilter() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + conf.setFloat("io.hfile.bloom.error.rate", (float)0.01); + conf.setBoolean("io.hfile.bloom.enabled", true); + + // write the file + Path f = new Path(ROOT_DIR, getName()); + StoreFile.Writer writer = new 
StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, + conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); + + long now = System.currentTimeMillis(); + for (int i = 0; i < 2000; i += 2) { + String row = String.format(localFormatter, Integer.valueOf(i)); + KeyValue kv = new KeyValue(row.getBytes(), "family:col".getBytes(), + now, "value".getBytes()); + writer.append(kv); + } + writer.close(); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + reader.loadFileInfo(); + reader.loadBloomfilter(); + HFileScanner scanner = reader.getScanner(false, false); + + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < 2000; i++) { + String row = String.format(localFormatter, Integer.valueOf(i)); + TreeSet columns = new TreeSet(); + columns.add("family:col".getBytes()); + + boolean exists = scanner.shouldSeek(row.getBytes(), columns); + if (i % 2 == 0) { + if (!exists) falseNeg++; + } else { + if (exists) falsePos++; + } + } + reader.close(); + fs.delete(f, true); + System.out.println("False negatives: " + falseNeg); + assertEquals(0, falseNeg); + System.out.println("False positives: " + falsePos); + assertTrue(falsePos < 2); + } + + public void testBloomTypes() throws Exception { + float err = (float) 0.01; + FileSystem fs = FileSystem.getLocal(conf); + conf.setFloat("io.hfile.bloom.error.rate", err); + conf.setBoolean("io.hfile.bloom.enabled", true); + + int rowCount = 50; + int colCount = 10; + int versions = 2; + + // run once using columns and once using rows + StoreFile.BloomType[] bt = + {StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW}; + int[] expKeys = {rowCount*colCount, rowCount}; + // below line deserves commentary. it is expected bloom false positives + // column = rowCount*2*colCount inserts + // row-level = only rowCount*2 inserts, but failures will be magnified by + // 2nd for loop for every column (2*colCount) + float[] expErr = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err}; + + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(ROOT_DIR, getName()); + StoreFile.Writer writer = new StoreFile.Writer(fs, f, + StoreFile.DEFAULT_BLOCKSIZE_SMALL, + HFile.DEFAULT_COMPRESSION_ALGORITHM, + conf, KeyValue.COMPARATOR, bt[x], expKeys[x]); + + long now = System.currentTimeMillis(); + for (int i = 0; i < rowCount*2; i += 2) { // rows + for (int j = 0; j < colCount*2; j += 2) { // column qualifiers + String row = String.format(localFormatter, Integer.valueOf(i)); + String col = String.format(localFormatter, Integer.valueOf(j)); + for (int k= 0; k < versions; ++k) { // versions + KeyValue kv = new KeyValue(row.getBytes(), + ("family:col" + col).getBytes(), + now-k, Bytes.toBytes((long)-1)); + writer.append(kv); + } + } + } + writer.close(); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false); + reader.loadFileInfo(); + reader.loadBloomfilter(); + HFileScanner scanner = reader.getScanner(false, false); + assertEquals(expKeys[x], reader.getBloomFilter().getKeyCount()); + + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < rowCount*2; ++i) { // rows + for (int j = 0; j < colCount*2; ++j) { // column qualifiers + String row = String.format(localFormatter, Integer.valueOf(i)); + String col = String.format(localFormatter, Integer.valueOf(j)); + TreeSet columns = new TreeSet(); + columns.add(("col" + col).getBytes()); + + boolean exists = scanner.shouldSeek(row.getBytes(), columns); + boolean 
shouldRowExist = i % 2 == 0; + boolean shouldColExist = j % 2 == 0; + shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW; + if (shouldRowExist && shouldColExist) { + if (!exists) falseNeg++; + } else { + if (exists) falsePos++; + } + } + } + reader.close(); + fs.delete(f, true); + System.out.println(bt[x].toString()); + System.out.println(" False negatives: " + falseNeg); + System.out.println(" False positives: " + falsePos); + assertEquals(0, falseNeg); + assertTrue(falsePos < 2*expErr[x]); + } + + } + } \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java =================================================================== --- src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java (working copy) @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -29,12 +31,14 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.hfile.HFile.BlockIndex; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; /** * test hfile features. @@ -170,7 +174,18 @@ private void writeNumMetablocks(Writer writer, int n) { for (int i = 0; i < n; i++) { - writer.appendMetaBlock("HFileMeta" + i, ("something to test" + i).getBytes()); + writer.appendMetaBlock("HFileMeta" + i, new Writable() { + private int val; + public Writable setVal(int val) { this.val = val; return this; } + + @Override + public void write(DataOutput out) throws IOException { + out.write(("something to test" + val).getBytes()); + } + + @Override + public void readFields(DataInput in) throws IOException { } + }.setVal(i)); } } @@ -180,10 +195,10 @@ private void readNumMetablocks(Reader reader, int n) throws IOException { for (int i = 0; i < n; i++) { - ByteBuffer b = reader.getMetaBlock("HFileMeta" + i); - byte [] found = Bytes.toBytes(b); - assertTrue("failed to match metadata", Arrays.equals( - ("something to test" + i).getBytes(), found)); + ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); + ByteBuffer expected = + ByteBuffer.wrap(("something to test" + i).getBytes()); + assertTrue("failed to match metadata", actual.compareTo(expected) == 0); } } @@ -227,7 +242,7 @@ fout.close(); Reader reader = new Reader(fs, mFile, null, false); reader.loadFileInfo(); - assertNull(reader.getMetaBlock("non-existant")); + assertNull(reader.getMetaBlock("non-existant", false)); } /** @@ -244,12 +259,11 @@ Path mFile = new Path(ROOT_DIR, "meta.tfile"); FSDataOutputStream fout = createFSOutput(mFile); Writer writer = new Writer(fout, minBlockSize, (Compression.Algorithm) null, - new RawComparator() { + new KeyComparator() { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -Bytes.compareTo(b1, s1, l1, b2, s2, l2); - } @Override public int compare(byte[] o1, byte[] o2) { Index: src/test/org/apache/hadoop/hbase/client/TestClient.java =================================================================== --- 
src/test/org/apache/hadoop/hbase/client/TestClient.java (revision 942269) +++ src/test/org/apache/hadoop/hbase/client/TestClient.java (working copy) @@ -2866,7 +2866,8 @@ HColumnDescriptor.DEFAULT_COMPRESSION, HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, - Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false); + Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER); desc.addFamily(hcd); } HBaseAdmin admin = new HBaseAdmin(conf); @@ -2884,7 +2885,8 @@ HColumnDescriptor.DEFAULT_COMPRESSION, HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, - Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false); + Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER); desc.addFamily(hcd); i++; } Index: src/contrib/stargate/src/test/org/apache/hadoop/hbase/stargate/model/TestColumnSchemaModel.java =================================================================== --- src/contrib/stargate/src/test/org/apache/hadoop/hbase/stargate/model/TestColumnSchemaModel.java (revision 942269) +++ src/contrib/stargate/src/test/org/apache/hadoop/hbase/stargate/model/TestColumnSchemaModel.java (working copy) @@ -33,7 +33,7 @@ protected static final String COLUMN_NAME = "testcolumn"; protected static final boolean BLOCKCACHE = true; protected static final int BLOCKSIZE = 16384; - protected static final boolean BLOOMFILTER = false; + protected static final String BLOOMFILTER = "none"; protected static final String COMPRESSION = "GZ"; protected static final boolean IN_MEMORY = false; protected static final int TTL = 86400; @@ -42,7 +42,7 @@ protected static final String AS_XML = "[a-zA-Z_0-9] and does not @@ -204,7 +207,7 @@ public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, - final int timeToLive, final boolean bloomFilter) { + final int timeToLive, final String bloomFilter) { this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, DEFAULT_BLOCKSIZE, timeToLive, bloomFilter); } @@ -224,7 +227,7 @@ * @param maxValueLength Restrict values to <= this value (UNSUPPORTED) * @param timeToLive Time-to-live of cell contents, in seconds * (use HConstants.FOREVER for unlimited TTL) - * @param bloomFilter Enable the specified bloom filter for this column + * @param bloomFilter Bloom filter type for this column * * @throws IllegalArgumentException if passed a family name that is made of * other than 'word' characters: i.e. [a-zA-Z_0-9] and does not @@ -236,7 +239,7 @@ // final String compression, final boolean inMemory, // final boolean blockCacheEnabled, final int blocksize, // final int maxValueLength, -// final int timeToLive, final boolean bloomFilter) { +// final int timeToLive, final String bloomFilter) { // this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, // blocksize, timeToLive, bloomFilter); // } @@ -253,7 +256,7 @@ * @param blocksize * @param timeToLive Time-to-live of cell contents, in seconds * (use HConstants.FOREVER for unlimited TTL) - * @param bloomFilter Enable the specified bloom filter for this column + * @param bloomFilter Bloom filter type for this column * * @throws IllegalArgumentException if passed a family name that is made of * other than 'word' characters: i.e. 
[a-zA-Z_0-9] and does not @@ -263,7 +266,7 @@ public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final boolean bloomFilter) { + final int timeToLive, final String bloomFilter) { this.name = stripColon(familyName); isLegalFamilyName(this.name); @@ -278,7 +281,8 @@ setTimeToLive(timeToLive); setCompressionType(Compression.Algorithm. valueOf(compression.toUpperCase())); - setBloomfilter(bloomFilter); + setBloomFilterType(StoreFile.BloomType. + valueOf(bloomFilter.toUpperCase())); setBlocksize(blocksize); } @@ -510,21 +514,27 @@ setValue(BLOCKCACHE, Boolean.toString(blockCacheEnabled)); } + /** @return bloom filter type being used for the column family */ + public StoreFile.BloomType getBloomFilter() { + String n = getValue(BLOOMFILTER); + if (n == null) { + n = DEFAULT_BLOOMFILTER; + } + return StoreFile.BloomType.valueOf(n.toUpperCase()); + } + /** - * @return true if a bloom filter is enabled + * @return bloom filter type used for new StoreFiles in ColumnFamily */ - public boolean isBloomfilter() { - String value = getValue(BLOOMFILTER); - if (value != null) - return Boolean.valueOf(value).booleanValue(); - return DEFAULT_BLOOMFILTER; + public StoreFile.BloomType getBloomFilterType() { + return getBloomFilter(); } /** - * @param onOff Enable/Disable bloom filter + * @param toggle bloom filter type */ - public void setBloomfilter(final boolean onOff) { - setValue(BLOOMFILTER, Boolean.toString(onOff)); + public void setBloomFilterType(final StoreFile.BloomType bt) { + setValue(BLOOMFILTER, bt.toString()); } /** @@ -550,10 +560,6 @@ values.entrySet()) { String key = Bytes.toString(e.getKey().get()); String value = Bytes.toString(e.getValue().get()); - if (key != null && key.toUpperCase().equals(BLOOMFILTER)) { - // Don't emit bloomfilter. Its not working. - continue; - } s.append(", "); s.append(key); s.append(" => '"); @@ -613,8 +619,8 @@ int ordinal = in.readInt(); setCompressionType(Compression.Algorithm.values()[ordinal]); setInMemory(in.readBoolean()); - setBloomfilter(in.readBoolean()); - if (isBloomfilter() && version < 5) { + setBloomFilterType(in.readBoolean() ? BloomType.ROW : BloomType.NONE); + if (getBloomFilterType() != BloomType.NONE && version < 5) { // If a bloomFilter is enabled and the column descriptor is less than // version 5, we need to skip over it to read the rest of the column // descriptor. There are no BloomFilterDescriptors written to disk for @@ -630,7 +636,7 @@ setTimeToLive(in.readInt()); } } else { - // version 7+ + // version 6+ this.name = Bytes.readByteArray(in); this.values.clear(); int numValues = in.readInt(); @@ -639,6 +645,15 @@ ImmutableBytesWritable value = new ImmutableBytesWritable(); key.readFields(in); value.readFields(in); + + // in version 8, the BloomFilter setting changed from bool to enum + if (version < 8 && Bytes.toString(key.get()).equals(BLOOMFILTER)) { + value.set(Bytes.toBytes( + Boolean.getBoolean(Bytes.toString(value.get())) + ? 
BloomType.ROW.toString() + : BloomType.NONE.toString())); + } + values.put(key, value); } if (version == 6) { Index: src/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java =================================================================== --- src/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (working copy) @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; import org.apache.hadoop.hbase.thrift.generated.TCell; @@ -49,17 +51,15 @@ throws IllegalArgument { Compression.Algorithm comp = Compression.getCompressionAlgorithmByName(in.compression.toLowerCase()); - boolean bloom = false; - if (in.bloomFilterType.compareTo("NONE") != 0) { - bloom = true; - } + StoreFile.BloomType bt = + BloomType.valueOf(in.bloomFilterType); if (in.name == null || in.name.length <= 0) { throw new IllegalArgument("column name is empty"); } HColumnDescriptor col = new HColumnDescriptor(in.name, in.maxVersions, comp.getName(), in.inMemory, in.blockCacheEnabled, - in.timeToLive, bloom); + in.timeToLive, bt.toString()); return col; } @@ -78,7 +78,7 @@ col.compression = in.getCompression().toString(); col.inMemory = in.isInMemory(); col.blockCacheEnabled = in.isBlockCacheEnabled(); - col.bloomFilterType = Boolean.toString(in.isBloomfilter()); + col.bloomFilterType = in.getBloomFilterType().toString(); return col; } Index: src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -61,10 +61,10 @@ store.versionsToReturn(scan.getMaxVersions())); this.isGet = scan.isGetScan(); - List scanners = getScanners(); + // pass columns = try to filter out unnecessary ScanFiles + List scanners = getScanners(scan, columns); // Seek all scanners to the initial key - // TODO if scan.isGetScan, use bloomfilters to skip seeking for(KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } @@ -132,6 +132,27 @@ return scanners; } + /* + * @return List of scanners to seek, possibly filtered by StoreFile. 
+ */ + private List getScanners(Scan scan, + final NavigableSet columns) { + List hfScanners = getStoreFileScanners(); + List scanners = + new ArrayList(hfScanners.size()+1); + for(HFileScanner hfs : hfScanners) { + if (isGet && !hfs.shouldSeek(scan.getStartRow(), columns)) { + continue; // exclude this hfs + } + scanners.add(new StoreFileScanner(hfs)); + } + KeyValueScanner [] memstorescanners = this.store.memstore.getScanners(); + for (int i = memstorescanners.length - 1; i >= 0; i--) { + scanners.add(memstorescanners[i]); + } + return scanners; + } + public synchronized KeyValue peek() { return this.heap.peek(); } Index: src/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -99,7 +99,7 @@ private final HRegion region; private final HColumnDescriptor family; final FileSystem fs; - private final HBaseConfiguration conf; + final HBaseConfiguration conf; // ttl in milliseconds. protected long ttl; private long majorCompactionTime; @@ -142,7 +142,6 @@ // Comparing KeyValues final KeyValue.KVComparator comparator; - final KeyValue.KVComparator comparatorIgnoringType; /** * Constructor @@ -173,7 +172,6 @@ this.blocksize = family.getBlocksize(); this.compression = family.getCompression(); this.comparator = info.getComparator(); - this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType(); // getTimeToLive returns ttl in seconds. Convert to milliseconds. this.ttl = family.getTimeToLive(); if (ttl == HConstants.FOREVER) { @@ -421,7 +419,9 @@ } StoreFile curfile = null; try { - curfile = new StoreFile(fs, p, blockcache, this.conf, this.inMemory); + curfile = new StoreFile(fs, p, blockcache, this.conf, + this.family.getBloomFilterType(), this.inMemory); + curfile.createReader(); } catch (IOException ioe) { LOG.warn("Failed open of " + p + "; presumption is that file was " + "corrupted at flush and lost edits picked up by commit log replay. " + @@ -498,7 +498,7 @@ // Clear so metrics doesn't find them. this.storefiles.clear(); for (StoreFile f: result) { - f.close(); + f.closeReader(); } LOG.debug("closed " + this.storeNameStr); return result; @@ -539,7 +539,7 @@ private StoreFile internalFlushCache(final SortedSet set, final long logCacheFlushId) throws IOException { - HFile.Writer writer = null; + StoreFile.Writer writer = null; long flushed = 0; // Don't flush if there are no entries. if (set.size() == 0) { @@ -551,7 +551,7 @@ // if we fail. synchronized (flushLock) { // A. Write the map out to the disk - writer = getWriter(); + writer = createWriter(this.homedir, set.size()); int entries = 0; try { for (KeyValue kv: set) { @@ -564,13 +564,13 @@ } finally { // Write out the log sequence number that corresponds to this output // hfile. The hfile is current up to and including logCacheFlushId. - StoreFile.appendMetadata(writer, logCacheFlushId); + writer.appendMetadata(logCacheFlushId, false); writer.close(); } } StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache, - this.conf, this.inMemory); - Reader r = sf.getReader(); + this.conf, this.family.getBloomFilterType(), this.inMemory); + Reader r = sf.createReader(); this.storeSize += r.length(); if(LOG.isDebugEnabled()) { LOG.debug("Added " + sf + ", entries=" + r.getEntries() + @@ -582,22 +582,16 @@ return sf; } - /** - * @return Writer for this store. 
- * @throws IOException - */ - HFile.Writer getWriter() throws IOException { - return getWriter(this.homedir); - } - /* * @return Writer for this store. * @param basedir Directory to put writer in. * @throws IOException */ - private HFile.Writer getWriter(final Path basedir) throws IOException { - return StoreFile.getWriter(this.fs, basedir, this.blocksize, - this.compression, this.comparator.getRawComparator()); + private StoreFile.Writer createWriter(final Path basedir, int maxKeyCount) + throws IOException { + return StoreFile.createWriter(this.fs, basedir, this.blocksize, + this.compression, this.comparator, this.conf, + this.family.getBloomFilterType(), maxKeyCount); } /* @@ -905,7 +899,7 @@ // output to writer: for (KeyValue kv : kvs) { if (writer == null) { - writer = getWriter(this.regionCompactionDir); + writer = createWriter(this.regionCompactionDir, maxKeyCount); } writer.append(kv); } @@ -920,7 +914,7 @@ MinorCompactingStoreScanner scanner = null; try { scanner = new MinorCompactingStoreScanner(this, scanners); - writer = getWriter(this.regionCompactionDir); + writer = createWriter(this.regionCompactionDir, maxKeyCount); while (scanner.next(writer)) { // Nothing to do } @@ -931,7 +925,7 @@ } } finally { if (writer != null) { - StoreFile.appendMetadata(writer, maxId, majorCompaction); + writer.appendMetadata(maxId, majorCompaction); writer.close(); } } @@ -975,7 +969,9 @@ LOG.error("Failed move of compacted file " + compactedFile.getPath(), e); return null; } - result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory); + result = new StoreFile(this.fs, p, blockcache, this.conf, + this.family.getBloomFilterType(), this.inMemory); + result.createReader(); } this.lock.writeLock().lock(); try { @@ -1005,7 +1001,7 @@ notifyChangedReadersObservers(); // Finally, delete old store files. 
for (StoreFile hsf: compactedFiles) { - hsf.delete(); + hsf.deleteReader(); } } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); @@ -1564,7 +1560,7 @@ } public static final long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + + ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN + ClassSize.align(ClassSize.ARRAY)); Index: src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -23,7 +23,12 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.nio.ByteBuffer; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.Arrays; import java.util.Map; +import java.util.SortedSet; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; @@ -31,29 +36,35 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.HalfHFileReader; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.LruBlockCache; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.util.BloomFilter; +import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.util.StringUtils; /** * A Store data file. Stores usually have one or more of these files. They * are produced by flushing the memstore to disk. To - * create, call {@link #getWriter(FileSystem, Path)} and append data. Be + * create, call {@link #createWriter(FileSystem, Path, int)} and append data. Be * sure to add any metadata before calling close on the Writer * (Use the appendMetadata convenience methods). On close, a StoreFile is * sitting in the Filesystem. To refer to it, create a StoreFile instance - * passing filesystem and path. To read, call {@link #getReader()}. + * passing filesystem and path. To read, call {@link #createReader()}. *

StoreFiles may also reference store files in another Store. */ public class StoreFile implements HConstants { @@ -65,7 +76,7 @@ // Make default block size for StoreFiles 8k while testing. TODO: FIX! // Need to make it 8k for testing. - private static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; + public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024; private final FileSystem fs; // This file's path. @@ -80,16 +91,23 @@ private boolean inMemory; // Keys for metadata stored in backing HFile. - private static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); + /** Constant for the max sequence ID meta */ + public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); // Set when we obtain a Reader. private long sequenceid = -1; - private static final byte [] MAJOR_COMPACTION_KEY = + /** Constant for major compaction meta */ + public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); // If true, this file was product of a major compaction. Its then set // whenever you get a Reader. private AtomicBoolean majorCompaction = null; + static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; + static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; + static final byte[] BLOOM_FILTER_TYPE_KEY = + Bytes.toBytes("BLOOM_FILTER_TYPE"); + /* * Regex that will work for straight filenames and for reference names. * If reference, then the regex has more than just one group. Group 1 is @@ -98,11 +116,12 @@ private static final Pattern REF_NAME_PARSER = Pattern.compile("^(\\d+)(?:\\.(.+))?$"); - private volatile HFile.Reader reader; + private volatile StoreFile.Reader reader; // Used making file ids. private final static Random rand = new Random(); private final HBaseConfiguration conf; + private final BloomType bloomType; /** * Constructor, loads a reader and it's indices, etc. May allocate a @@ -112,10 +131,12 @@ * @param p The path of the file. * @param blockcache true if the block cache is enabled. * @param conf The current configuration. + * @param bt The bloom type to use for this store file * @throws IOException When opening the reader fails. */ StoreFile(final FileSystem fs, final Path p, final boolean blockcache, - final HBaseConfiguration conf, final boolean inMemory) + final HBaseConfiguration conf, final BloomType bt, + final boolean inMemory) throws IOException { this.conf = conf; this.fs = fs; @@ -126,7 +147,13 @@ this.reference = Reference.read(fs, p); this.referencePath = getReferredToFile(this.path); } - this.reader = open(); + // ignore if the column family config says "no bloom filter" + // even if there is one in the hfile. + if (conf.getBoolean("io.hfile.bloom.enabled", true)) { + this.bloomType = bt; + } else { + this.bloomType = BloomType.NONE; + } } /** @@ -255,9 +282,9 @@ * Opens reader on this store file. Called by Constructor. * @return Reader for the store file. * @throws IOException - * @see #close() + * @see #closeReader() */ - protected HFile.Reader open() + private StoreFile.Reader open() throws IOException { if (this.reader != null) { throw new IllegalAccessError("Already open"); @@ -266,7 +293,7 @@ this.reader = new HalfHFileReader(this.fs, this.referencePath, getBlockCache(), this.reference); } else { - this.reader = new Reader(this.fs, this.path, getBlockCache(), + this.reader = new StoreFile.Reader(this.fs, this.path, getBlockCache(), this.inMemory); } // Load up indices and fileinfo. 
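The next hunk has open() load the bloom filter metadata along with the indices and file info. Once loaded, a point Get can consult the bloom through HFileScanner.shouldSeek() before paying for a real seek, which is what the StoreScanner change earlier in this patch does. A hedged sketch of that check; the helper name, row, and qualifier are illustrative:

    package org.apache.hadoop.hbase.regionserver;

    import java.util.TreeSet;

    import org.apache.hadoop.hbase.io.hfile.HFileScanner;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BloomCheckSketch {
      /**
       * True if the store file may contain the given row/qualifier and a real
       * seek is worth doing; mirrors StoreScanner.getScanners(scan, columns).
       */
      static boolean maySeek(StoreFile.Reader reader, byte[] row, byte[] qualifier) {
        TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        columns.add(qualifier);
        HFileScanner scanner = reader.getScanner(false, true); // no block caching, use pread
        // ROW blooms test only the row; ROWCOL blooms test the single
        // row+qualifier key; with no bloom loaded this always returns true.
        return scanner.shouldSeek(row, columns);
      }
    }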
@@ -296,44 +323,59 @@ this.majorCompaction.set(mc); } } + + if (this.bloomType != BloomType.NONE) { + this.reader.loadBloomfilter(); + } - // TODO read in bloom filter here, ignore if the column family config says - // "no bloom filter" even if there is one in the hfile. return this.reader; } + + /** + * @return Reader for StoreFile. creates if necessary + * @throws IOException + */ + public StoreFile.Reader createReader() throws IOException { + if (this.reader == null) { + this.reader = open(); + } + return this.reader; + } /** - * @return Current reader. Must call open first else returns null. + * @return Current reader. Must call createReader first else returns null. + * @throws IOException + * @see {@link #createReader()} */ - public HFile.Reader getReader() { + public StoreFile.Reader getReader() { return this.reader; } /** * @throws IOException */ - public synchronized void close() throws IOException { + public synchronized void closeReader() throws IOException { if (this.reader != null) { this.reader.close(); this.reader = null; } } - @Override - public String toString() { - return this.path.toString() + - (isReference()? "-" + this.referencePath + "-" + reference.toString(): ""); - } - /** * Delete this file * @throws IOException */ - public void delete() throws IOException { - close(); + public void deleteReader() throws IOException { + closeReader(); this.fs.delete(getPath(), true); } + @Override + public String toString() { + return this.path.toString() + + (isReference()? "-" + this.referencePath + "-" + reference.toString(): ""); + } + /** * Utility to help with rename. * @param fs @@ -361,38 +403,47 @@ * @param fs * @param dir Path to family directory. Makes the directory if doesn't exist. * Creates a file with a unique name in this directory. + * @param blocksize size per filesystem block * @return HFile.Writer * @throws IOException */ - public static HFile.Writer getWriter(final FileSystem fs, final Path dir) + public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, + final int blocksize) throws IOException { - return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null); + return createWriter(fs,dir,blocksize,null,null,null,BloomType.NONE,0); } /** - * Get a store file writer. Client is responsible for closing file when done. - * If metadata, add BEFORE closing using - * {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}. + * Create a store file writer. Client is responsible for closing file when done. + * If metadata, add BEFORE closing using appendMetadata() * @param fs * @param dir Path to family directory. Makes the directory if doesn't exist. * Creates a file with a unique name in this directory. * @param blocksize * @param algorithm Pass null to get default. + * @param conf HBase system configuration. used with bloom filters + * @param bloomType column family setting for bloom filters * @param c Pass null to get default. 
+ * @param maxKeySize peek theoretical entry size * @return HFile.Writer * @throws IOException */ - public static HFile.Writer getWriter(final FileSystem fs, final Path dir, - final int blocksize, final Compression.Algorithm algorithm, - final KeyValue.KeyComparator c) + public static StoreFile.Writer createWriter(final FileSystem fs, final Path dir, + final int blocksize, final Compression.Algorithm algorithm, + final KeyValue.KVComparator c, final HBaseConfiguration conf, + BloomType bloomType, int maxKeySize) throws IOException { if (!fs.exists(dir)) { fs.mkdirs(dir); } Path path = getUniqueFile(fs, dir); - return new HFile.Writer(fs, path, blocksize, - algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, - c == null? KeyValue.KEY_COMPARATOR: c); + if(conf == null || !conf.getBoolean("io.hfile.bloom.enabled", true)) { + bloomType = BloomType.NONE; + } + + return new StoreFile.Writer(fs, path, blocksize, + algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm, + conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize); } /** @@ -442,35 +493,6 @@ return p; } - /** - * Write file metadata. - * Call before you call close on the passed w since its written - * as metadata to that file. - * - * @param w hfile writer - * @param maxSequenceId Maximum sequence id. - * @throws IOException - */ - static void appendMetadata(final HFile.Writer w, final long maxSequenceId) - throws IOException { - appendMetadata(w, maxSequenceId, false); - } - - /** - * Writes metadata. - * Call before you call close on the passed w since its written - * as metadata to that file. - * @param maxSequenceId Maximum sequence id. - * @param mc True if this file is product of a major compaction - * @throws IOException - */ - public static void appendMetadata(final HFile.Writer w, final long maxSequenceId, - final boolean mc) - throws IOException { - w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(mc)); - } - /* * Write out a split reference. * @param fs @@ -497,4 +519,320 @@ Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName); return r.write(fs, p); } + + public static enum BloomType { + /** + * Bloomfilters disabled + */ + NONE, + /** + * Bloom enabled with Table row as Key + */ + ROW, + /** + * Bloom enabled with Table row & column (family+qualifier) as Key + */ + ROWCOL + } + + /** + * + */ + public static class Reader extends HFile.Reader { + /** Bloom Filter class. Caches only meta, pass in data */ + protected BloomFilter bloomFilter = null; + /** Type of bloom filter (e.g. ROW vs ROWCOL) */ + protected BloomType bt; + + /** + * @param fs + * @param path + * @param cache + * @param inMemory + * @throws IOException + */ + public Reader(FileSystem fs, Path path, BlockCache cache, + boolean inMemory) + throws IOException { + super(fs, path, cache, inMemory); + } + + /** + * @param fsdis + * @param size + * @param cache + * @param inMemory + */ + public Reader(final FSDataInputStream fsdis, final long size, + final BlockCache cache, final boolean inMemory) { + super(fsdis,size,cache,inMemory); + bt = BloomType.NONE; + } + + @Override + public Map loadFileInfo() + throws IOException { + Map fi = super.loadFileInfo(); + + byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY); + if (b != null) { + bt = BloomType.valueOf(Bytes.toString(b)); + } + + return fi; + } + + /** + * Load the bloom filter for this HFile into memory. 
+ * Assumes the HFile has already been loaded + */ + public void loadBloomfilter() { + if (this.bloomFilter != null) { + return; // already loaded + } + + // see if bloom filter information is in the metadata + try { + ByteBuffer b = getMetaBlock(BLOOM_FILTER_META_KEY, false); + if (b != null) { + if (bt == BloomType.NONE) { + throw new IOException("valid bloom filter type not found in FileInfo"); + } + this.bloomFilter = new ByteBloomFilter(b); + LOG.info("Loaded " + (bt==BloomType.ROW? "row":"col") + + " bloom filter metadata for " + name); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter meta: " + e + + ". proceeding without"); + this.bloomFilter = null; + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter meta: " + e + ". proceeding without"); + this.bloomFilter = null; + } + } + + BloomFilter getBloomFilter() { + return this.bloomFilter; + } + + /** + * @return bloom type information associated with this store file + */ + public BloomType getBloomFilterType() { + return this.bt; + } + + @Override + public int getFilterEntries() { + // TODO: getKeyCount could cause under-sized blooms if the user switches + // bloom type (e.g. from ROW to ROWCOL) + if (this.bloomFilter != null) { + return this.bloomFilter.getKeyCount(); + } + return super.getFilterEntries(); + } + + @Override + public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { + return new Scanner(this, cacheBlocks, pread); + } + + protected class Scanner extends HFile.Reader.Scanner { + public Scanner(Reader r, boolean cacheBlocks, final boolean pread) { + super(r, cacheBlocks, pread); + } + + @Override + public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + // lookup key if bloom not present OR passed bloom filter + if (bloomFilter != null) { + byte[] key; + int off; + int len; + switch(bt) { + case ROW: + key = row; + off = 0; + len = row.length; + break; + case ROWCOL: + if (columns.size() == 1) { + byte[] col = columns.first(); + key = Bytes.add(row, col); + off = 0; + len = key.length; + break; + } + //$FALL-THROUGH$ + default: + return true; + } + + try { + ByteBuffer bloom = getMetaBlock(BLOOM_FILTER_DATA_KEY, true); + if (bloom != null) { + return bloomFilter.contains(key, off, len, bloom); + } + } catch (IOException e) { + LOG.error("Error reading bloom filter data: " + e + + ". proceeding without"); + bloomFilter = null; + } catch (IllegalArgumentException e) { + LOG.error("Bad bloom filter data: " + e + ". proceeding without"); + bloomFilter = null; + } + + } + return true; + } + } + } + + /** + * + */ + public static class Writer extends HFile.Writer { + private final BloomFilter bloomFilter; + private final BloomType bloomType; + private KVComparator kvComparator; + private KeyValue lastKv = null; + private byte[] lastByteArray = null; + + /** + * Creates an HFile.Writer that also write helpful meta data. 
+ * @param fs file system to write to + * @param path file name to create + * @param blocksize HDFS block size + * @param compress HDFS block compression + * @param conf user configuration + * @param comparator key comparator + * @param bloomType bloom filter setting + * @param maxKeys maximum amount of keys to add (for blooms) + * @throws IOException problem writing to FS + */ + public Writer(FileSystem fs, Path path, int blocksize, + Compression.Algorithm compress, final HBaseConfiguration conf, + final KVComparator comparator, BloomType bloomType, int maxKeys) + throws IOException { + super(fs, path, blocksize, compress, comparator.getRawComparator()); + + if (bloomType != BloomType.NONE && conf != null) { + this.kvComparator = comparator; + + float err = conf.getFloat("io.hfile.bloom.error.rate", (float)0.01); + int maxFold = conf.getInt("io.hfile.bloom.max.fold", 7); + + this.bloomFilter = new ByteBloomFilter(maxKeys, err, + Hash.getHashType(conf), maxFold); + this.bloomFilter.allocBloom(); + this.bloomType = bloomType; + } else { + this.bloomFilter = null; + this.bloomType = BloomType.NONE; + } + } + + /** + * Writes meta data. + * Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @throws IOException problem writing to FS + */ + public void appendMetadata(final long maxSequenceId, + final boolean majorCompaction) + throws IOException { + appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + } + + @Override + public void append(final KeyValue kv) + throws IOException { + if (this.bloomFilter != null) { + // only add to the bloom filter on a new, unique key + boolean newKey = true; + if (this.lastKv != null) { + switch(bloomType) { + case ROW: + newKey = ! kvComparator.matchingRows(kv, lastKv); + break; + case ROWCOL: + newKey = ! kvComparator.matchingRowColumn(kv, lastKv); + break; + case NONE: + newKey = false; + } + } + if (newKey) { + /* + * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png + * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp + * + * 2 Types of Filtering: + * 1. Row = Row + * 2. 
RowCol = Row + Qualifier + */ + switch (bloomType) { + case ROW: + this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength()); + break; + case ROWCOL: + // merge(row, qualifier) + int ro = kv.getRowOffset(); + int rl = kv.getRowLength(); + int qo = kv.getQualifierOffset(); + int ql = kv.getQualifierLength(); + byte [] result = new byte[rl + ql]; + System.arraycopy(kv.getBuffer(), ro, result, 0, rl); + System.arraycopy(kv.getBuffer(), qo, result, rl, ql); + + this.bloomFilter.add(result); + break; + default: + } + this.lastKv = kv; + } + } + super.append(kv); + } + + @Override + public void append(final byte [] key, final byte [] value) + throws IOException { + if (this.bloomFilter != null) { + // only add to the bloom filter on a new row + if(this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) { + this.bloomFilter.add(key); + this.lastByteArray = key; + } + } + super.append(key, value); + } + + @Override + public void close() + throws IOException { + // make sure we wrote something to the bloom before adding it + if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) { + bloomFilter.finalize(); + if (this.bloomFilter.getMaxKeys() > 0) { + int b = this.bloomFilter.getByteSize(); + int k = this.bloomFilter.getKeyCount(); + int m = this.bloomFilter.getMaxKeys(); + StoreFile.LOG.info("Bloom added to HFile. " + b + "B, " + + k + "/" + m + " (" + NumberFormat.getPercentInstance().format( + ((double)k) / ((double)m)) + ")"); + } + appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter()); + appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter()); + appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + } + super.close(); + } + + } } Index: src/java/org/apache/hadoop/hbase/rest/parser/XMLRestParser.java =================================================================== --- src/java/org/apache/hadoop/hbase/rest/parser/XMLRestParser.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/rest/parser/XMLRestParser.java (working copy) @@ -97,7 +97,7 @@ boolean in_memory = HColumnDescriptor.DEFAULT_IN_MEMORY; boolean block_cache = HColumnDescriptor.DEFAULT_BLOCKCACHE; int ttl = HColumnDescriptor.DEFAULT_TTL; - boolean bloomfilter = HColumnDescriptor.DEFAULT_BLOOMFILTER; + String bloomfilter = HColumnDescriptor.DEFAULT_BLOOMFILTER; if (currentTDesp != null) { HColumnDescriptor currentCDesp = currentTDesp.getFamily(Bytes @@ -108,7 +108,7 @@ in_memory = currentCDesp.isInMemory(); block_cache = currentCDesp.isBlockCacheEnabled(); ttl = currentCDesp.getTimeToLive(); - bloomfilter = currentCDesp.isBloomfilter(); + bloomfilter = currentCDesp.getBloomFilterType().toString(); } } @@ -147,8 +147,8 @@ NodeList bloomfilter_list = columnfamily .getElementsByTagName("bloomfilter"); if (bloomfilter_list.getLength() > 0) { - bloomfilter = Boolean.valueOf(bloomfilter_list.item(0).getFirstChild() - .getNodeValue()); + bloomfilter = bloomfilter_list.item(0).getFirstChild() + .getNodeValue().toUpperCase(); } HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(colname), Index: src/java/org/apache/hadoop/hbase/rest/serializer/SimpleXMLSerializer.java =================================================================== --- src/java/org/apache/hadoop/hbase/rest/serializer/SimpleXMLSerializer.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/rest/serializer/SimpleXMLSerializer.java (working copy) @@ -157,7 +157,7 @@ printer.print(""); // bloomfilter printer.print(""); - 
printer.print(column.getCompressionType().toString()); + printer.print(column.getBloomFilterType().toString()); printer.print(""); // max-versions printer.print(""); Index: src/java/org/apache/hadoop/hbase/rest/TableController.java =================================================================== --- src/java/org/apache/hadoop/hbase/rest/TableController.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/rest/TableController.java (working copy) @@ -71,12 +71,12 @@ * (non-Javadoc) * * @param input column descriptor JSON. Should be of the form:

-   * {"column_families":[ { "name":STRING, "bloomfilter":BOOLEAN,
+   * {"column_families":[ { "name":STRING, "bloomfilter":STRING,
    * "max_versions":INTEGER, "compression_type":STRING, "in_memory":BOOLEAN,
    * "block_cache_enabled":BOOLEAN, "max_value_length":INTEGER,
    * "time_to_live":INTEGER ]}
    * If any of the json object fields (except name) are not included the
    * default values will be included instead. The
-   * default values are: bloomfilter => false max_versions => 3
+   * default values are: bloomfilter => NONE max_versions => 3
    * compression_type => NONE in_memory => false block_cache_enabled => false
    * max_value_length => 2147483647 time_to_live => Integer.MAX_VALUE
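With the REST schema above, and the Thrift and HColumnDescriptor changes earlier in the patch, the bloom filter is configured by type name rather than by a boolean. A short sketch of declaring a family with a ROWCOL bloom through the native API; the table name, family name, and version count are made up:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BloomFamilySketch {
      public static void main(String[] args) {
        HTableDescriptor htd = new HTableDescriptor("mytable");
        HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("info"),
            3,                                       // max versions
            HColumnDescriptor.DEFAULT_COMPRESSION,
            HColumnDescriptor.DEFAULT_IN_MEMORY,
            HColumnDescriptor.DEFAULT_BLOCKCACHE,
            HConstants.FOREVER,                      // time to live
            StoreFile.BloomType.ROWCOL.toString());  // was a boolean before this patch
        htd.addFamily(hcd);

        // The type can also be changed after construction:
        hcd.setBloomFilterType(StoreFile.BloomType.ROW);
      }
    }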
* Index: src/java/org/apache/hadoop/hbase/HTableDescriptor.java =================================================================== --- src/java/org/apache/hadoop/hbase/HTableDescriptor.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/HTableDescriptor.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.rest.exception.HBaseRestException; import org.apache.hadoop.hbase.rest.serializer.IRestSerializer; import org.apache.hadoop.hbase.rest.serializer.ISerializable; @@ -645,7 +646,7 @@ new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), true, true, 8 * 1024, - HConstants.FOREVER, false) }); + HConstants.FOREVER, StoreFile.BloomType.NONE.toString()) }); /** Table descriptor for .META. catalog table */ public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor( @@ -653,11 +654,11 @@ new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), true, true, 8 * 1024, - HConstants.FOREVER, false), + HConstants.FOREVER, StoreFile.BloomType.NONE.toString()), new HColumnDescriptor(HConstants.CATALOG_HISTORIAN_FAMILY, HConstants.ALL_VERSIONS, Compression.Algorithm.NONE.getName(), false, false, 8 * 1024, - HConstants.WEEK_IN_SECONDS, false)}); + HConstants.WEEK_IN_SECONDS, StoreFile.BloomType.NONE.toString())}); /* (non-Javadoc) * @see org.apache.hadoop.hbase.rest.xml.IOutputXML#toXML() Index: src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java =================================================================== --- src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (working copy) @@ -112,7 +112,10 @@ private void close(final HFile.Writer w) throws IOException { if (w != null) { - StoreFile.appendMetadata(w, System.currentTimeMillis(), true); + w.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, + Bytes.toBytes(System.currentTimeMillis())); + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); w.close(); } } Index: src/java/org/apache/hadoop/hbase/KeyValue.java =================================================================== --- src/java/org/apache/hadoop/hbase/KeyValue.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/KeyValue.java (working copy) @@ -979,7 +979,7 @@ System.arraycopy(this.bytes, o, result, 0, l); return result; } - + //--------------------------------------------------------------------------- // // KeyValue splitter @@ -1406,7 +1406,7 @@ } /** - * Compares the row and column of two keyvalues + * Compares the row and column of two keyvalues for equality * @param left * @param right * @return True if same row and column. 
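The comparator adjustments around here are what let StoreFile.Writer add each row (ROW blooms) or row+column (ROWCOL blooms) to the bloom only once while appending sorted KeyValues. A small illustration; row, column, and value contents are arbitrary:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MatchingKeysSketch {
      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        KeyValue a = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("family:col1"),
            now, Bytes.toBytes("v1"));
        KeyValue b = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("family:col2"),
            now, Bytes.toBytes("v2"));

        // Same row, different qualifier: a ROW bloom adds "row1" once, while a
        // ROWCOL bloom adds a separate key for each row+qualifier pair.
        boolean sameRow = KeyValue.COMPARATOR.matchingRows(a, b);          // true
        boolean sameRowCol = KeyValue.COMPARATOR.matchingRowColumn(a, b);  // false
        System.out.println(sameRow + " " + sameRowCol);
      }
    }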
@@ -1415,10 +1415,9 @@ final KeyValue right) { short lrowlength = left.getRowLength(); short rrowlength = right.getRowLength(); - if (!matchingRows(left, lrowlength, right, rrowlength)) { - return false; - } - return compareColumns(left, lrowlength, right, rrowlength) == 0; + return left.getTimestampOffset() == right.getTimestampOffset() && + matchingRows(left, lrowlength, right, rrowlength) && + compareColumns(left, lrowlength, right, rrowlength) == 0; } /** @@ -1431,6 +1430,7 @@ } /** + * Compares the row of two keyvalues for equality * @param left * @param right * @return True if rows match. @@ -1450,11 +1450,8 @@ */ public boolean matchingRows(final KeyValue left, final short lrowlength, final KeyValue right, final short rrowlength) { - int compare = compareRows(left, lrowlength, right, rrowlength); - if (compare != 0) { - return false; - } - return true; + return lrowlength == rrowlength && + compareRows(left, lrowlength, right, rrowlength) == 0; } public boolean matchingRows(final byte [] left, final int loffset, Index: src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java =================================================================== --- src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.SortedSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; /** @@ -46,7 +48,7 @@ * *

This file is not splitable. Calls to {@link #midkey()} return null. */ -public class HalfHFileReader extends HFile.Reader { +public class HalfHFileReader extends StoreFile.Reader { final Log LOG = LogFactory.getLog(HalfHFileReader.class); final boolean top; // This is the key we split around. Its the first possible entry on a row: @@ -153,6 +155,11 @@ return this.delegate.seekBefore(key, offset, length); } + public boolean shouldSeek(byte[] row, + final SortedSet columns) { + return this.delegate.shouldSeek(row, columns); + } + public boolean seekTo() throws IOException { if (top) { int r = this.delegate.seekTo(splitkey); Index: src/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.SortedSet; import org.apache.hadoop.hbase.KeyValue; @@ -65,6 +66,17 @@ public boolean seekBefore(byte [] key) throws IOException; public boolean seekBefore(byte []key, int offset, int length) throws IOException; /** + * Optimization for single key lookups. If the file has a filter, + * perform a lookup on the key. + * @param row the row to scan + * @param family the column family to scan + * @param columns the array of column qualifiers to scan + * @return False if the key definitely does not exist in this ScanFile + * @throws IOException + */ + public boolean shouldSeek(final byte[] row, + final SortedSet columns); + /** * Positions this scanner at the start of the file. * @return False if empty file; i.e. a call to next would return false and * the current key and value are undefined. Index: src/java/org/apache/hadoop/hbase/io/hfile/HFile.java =================================================================== --- src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy) @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.SortedSet; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.HRegionInfo; @@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; @@ -208,7 +211,7 @@ private long valuelength = 0; // Used to ensure we write in order. - private final RawComparator comparator; + private final RawComparator rawComparator; // A stream made per block written. private DataOutputStream out; @@ -238,7 +241,7 @@ // Meta block system. private ArrayList metaNames = new ArrayList(); - private ArrayList metaData = new ArrayList(); + private ArrayList metaData = new ArrayList(); // Used compression. Used even if no compression -- 'none'. 
private final Compression.Algorithm compressAlgo; @@ -272,7 +275,7 @@ * @throws IOException */ public Writer(FileSystem fs, Path path, int blocksize, - String compress, final RawComparator comparator) + String compress, final KeyComparator comparator) throws IOException { this(fs, path, blocksize, compress == null? DEFAULT_COMPRESSION_ALGORITHM: @@ -291,7 +294,7 @@ */ public Writer(FileSystem fs, Path path, int blocksize, Compression.Algorithm compress, - final RawComparator comparator) + final KeyComparator comparator) throws IOException { this(fs.create(path), blocksize, compress, comparator); this.closeOutputStream = true; @@ -308,7 +311,7 @@ * @throws IOException */ public Writer(final FSDataOutputStream ostream, final int blocksize, - final String compress, final RawComparator c) + final String compress, final KeyComparator c) throws IOException { this(ostream, blocksize, Compression.getCompressionAlgorithmByName(compress), c); @@ -323,12 +326,12 @@ * @throws IOException */ public Writer(final FSDataOutputStream ostream, final int blocksize, - final Compression.Algorithm compress, final RawComparator c) + final Compression.Algorithm compress, final KeyComparator c) throws IOException { this.outputStream = ostream; this.closeOutputStream = false; this.blocksize = blocksize; - this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c; + this.rawComparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c; this.name = this.outputStream.toString(); this.compressAlgo = compress == null? DEFAULT_COMPRESSION_ALGORITHM: compress; @@ -424,9 +427,19 @@ * @param metaBlockName name of the block * @param bytes uninterpreted bytes of the block. */ - public void appendMetaBlock(String metaBlockName, byte [] bytes) { - metaNames.add(Bytes.toBytes(metaBlockName)); - metaData.add(bytes); + public void appendMetaBlock(String metaBlockName, Writable content) { + byte[] key = Bytes.toBytes(metaBlockName); + int i; + for (i = 0; i < metaNames.size(); ++i) { + // stop when the current key is greater than our own + byte[] cur = metaNames.get(i); + if (this.rawComparator.compare(cur, 0, cur.length, key, 0, key.length) + > 0) { + break; + } + } + metaNames.add(i, key); + metaData.add(i, content); } /** @@ -507,7 +520,7 @@ * @param vlength * @throws IOException */ - public void append(final byte [] key, final int koffset, final int klength, + private void append(final byte [] key, final int koffset, final int klength, final byte [] value, final int voffset, final int vlength) throws IOException { checkKey(key, koffset, klength); @@ -546,7 +559,7 @@ MAXIMUM_KEY_LENGTH); } if (this.lastKeyBuffer != null) { - if (this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset, + if (this.rawComparator.compare(this.lastKeyBuffer, this.lastKeyOffset, this.lastKeyLength, key, offset, length) > 0) { throw new IOException("Added a key not lexically larger than" + " previous key=" + Bytes.toString(key, offset, length) + @@ -585,10 +598,16 @@ metaOffsets = new ArrayList(metaNames.size()); metaDataSizes = new ArrayList(metaNames.size()); for (int i = 0 ; i < metaNames.size() ; ++ i ) { - metaOffsets.add(Long.valueOf(outputStream.getPos())); - metaDataSizes. 
- add(Integer.valueOf(METABLOCKMAGIC.length + metaData.get(i).length)); - writeMetaBlock(metaData.get(i)); + // store the beginning offset + long curPos = outputStream.getPos(); + metaOffsets.add(Long.valueOf(curPos)); + // write the metadata content + DataOutputStream dos = getCompressingStream(); + dos.write(METABLOCKMAGIC); + metaData.get(i).write(dos); + int size = releaseCompressingStream(dos); + // store the metadata size + metaDataSizes.add(Integer.valueOf(size)); } } @@ -622,17 +641,6 @@ } } - /* Write a metadata block. - * @param metadata - * @throws IOException - */ - private void writeMetaBlock(final byte [] b) throws IOException { - DataOutputStream dos = getCompressingStream(); - dos.write(METABLOCKMAGIC); - dos.write(b); - releaseCompressingStream(dos); - } - /* * Add last bits of metadata to fileinfo and then write it out. * Reader will be expecting to find all below. @@ -658,7 +666,7 @@ appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); appendFileInfo(this.fileinfo, FileInfo.COMPARATOR, - Bytes.toBytes(this.comparator.getClass().getName()), false); + Bytes.toBytes(this.rawComparator.getClass().getName()), false); long pos = o.getPos(); this.fileinfo.write(o); return pos; @@ -700,6 +708,7 @@ private final BlockCache cache; public int cacheHits = 0; public int blockLoads = 0; + public int metaLoads = 0; // Whether file is from in-memory store private boolean inMemory = false; @@ -707,16 +716,8 @@ // Name for this object used when logging or in toString. Is either // the result of a toString on the stream or else is toString of passed // file Path plus metadata key/value pairs. - private String name; + protected String name; - /* - * Do not expose the default constructor. - */ - @SuppressWarnings("unused") - private Reader() throws IOException { - this(null, -1, null, false); - } - /** * Opens a HFile. You must load the file info before you can * use it by calling {@link #loadFileInfo()}. @@ -789,7 +790,8 @@ * See {@link Writer#appendFileInfo(byte[], byte[])}. * @throws IOException */ - public Map loadFileInfo() throws IOException { + public Map loadFileInfo() + throws IOException { this.trailer = readTrailer(); // Read in the fileinfo and get what we need from it. @@ -879,16 +881,19 @@ } /** * @param metaBlockName + * @param cacheBlock Add block to cache, if found * @return Block wrapped in a ByteBuffer * @throws IOException */ - public ByteBuffer getMetaBlock(String metaBlockName) throws IOException { + public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + throws IOException { if (trailer.metaIndexCount == 0) { return null; // there are no meta blocks } if (metaIndex == null) { throw new IOException("Meta index not loaded"); } + byte [] mbname = Bytes.toBytes(metaBlockName); int block = metaIndex.blockContainingKey(mbname, 0, mbname.length); if (block == -1) @@ -900,19 +905,45 @@ blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block]; } - ByteBuffer buf = decompress(metaIndex.blockOffsets[block], - longToInt(blockSize), metaIndex.blockDataSizes[block], true); - byte [] magic = new byte[METABLOCKMAGIC.length]; - buf.get(magic, 0, magic.length); + long now = System.currentTimeMillis(); - if (! Arrays.equals(magic, METABLOCKMAGIC)) { - throw new IOException("Meta magic is bad in block " + block); + // Per meta key from any given file, synchronize reads for said block + synchronized (metaIndex.blockKeys[block]) { + metaLoads++; + // Check cache for block. If found return. 
+ if (cache != null) { + ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block); + if (cachedBuf != null) { + // Return a distinct 'shallow copy' of the block, + // so pos doesnt get messed by the scanner + cacheHits++; + return cachedBuf.duplicate(); + } + // Cache Miss, please load. + } + + ByteBuffer buf = decompress(metaIndex.blockOffsets[block], + longToInt(blockSize), metaIndex.blockDataSizes[block], true); + byte [] magic = new byte[METABLOCKMAGIC.length]; + buf.get(magic, 0, magic.length); + + if (! Arrays.equals(magic, METABLOCKMAGIC)) { + throw new IOException("Meta magic is bad in block " + block); + } + + // Create a new ByteBuffer 'shallow copy' to hide the magic header + buf = buf.slice(); + + readTime += System.currentTimeMillis() - now; + readOps++; + + // Cache the block + if(cacheBlock && cache != null) { + cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory); + } + + return buf; } - // Toss the header. May have to remove later due to performance. - buf.compact(); - buf.limit(buf.limit() - METABLOCKMAGIC.length); - buf.rewind(); - return buf; } /** @@ -942,8 +973,8 @@ if (cache != null) { ByteBuffer cachedBuf = cache.getBlock(name + block); if (cachedBuf != null) { - // Return a distinct 'copy' of the block, so pos doesnt get messed by - // the scanner + // Return a distinct 'shallow copy' of the block, + // so pos doesnt get messed by the scanner cacheHits++; return cachedBuf.duplicate(); } @@ -972,11 +1003,12 @@ if (!Arrays.equals(magic, DATABLOCKMAGIC)) { throw new IOException("Data magic is bad in block " + block); } - // Toss the header. May have to remove later due to performance. - buf.compact(); - buf.limit(buf.limit() - DATABLOCKMAGIC.length); - buf.rewind(); + // 'shallow copy' to hide the header + // NOTE: you WILL GET BIT if you call buf.array() but don't start + // reading at buf.arrayOffset() + buf = buf.slice(); + readTime += System.currentTimeMillis() - now; readOps++; @@ -1035,6 +1067,9 @@ return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0]; } + /** + * @return number of KV entries in this HFile + */ public int getEntries() { if (!this.isFileInfoLoaded()) { throw new RuntimeException("File info not loaded"); @@ -1051,6 +1086,13 @@ } return this.blockIndex.isEmpty()? null: this.lastkey; } + + /** + * @return number of K entries in this HFile's filter. Returns KV count if no filter. + */ + public int getFilterEntries() { + return getEntries(); + } /** * @return Comparator. @@ -1089,7 +1131,7 @@ /* * Implementation of {@link HFileScanner} interface. */ - private static class Scanner implements HFileScanner { + protected static class Scanner implements HFileScanner { private final Reader reader; private ByteBuffer block; private int currBlock; @@ -1170,6 +1212,11 @@ return true; } + public boolean shouldSeek(final byte[] row, + final SortedSet columns) { + return true; + } + public int seekTo(byte [] key) throws IOException { return seekTo(key, 0, key.length); } @@ -1323,10 +1370,10 @@ * parts of the file. Also includes basic metadata on this file. */ private static class FixedFileTrailer { + // Offset to the fileinfo data, a small block of vitals.. + long fileinfoOffset; // Offset to the data block index. long dataIndexOffset; - // Offset to the fileinfo data, a small block of vitals.. - long fileinfoOffset; // How many index counts are there (aka: block count) int dataIndexCount; // Offset to the meta block index. 
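On the 'shallow copy' comments in the Reader changes above: duplicate() and slice() share the backing bytes but keep their own position and limit, which is why cached blocks are handed out via duplicate() and why the magic header can now be hidden with slice() instead of the old compact()/limit()/rewind() sequence. A self-contained, JDK-only sketch of those semantics (the header bytes are placeholders, not HFile's real magic constants):

import java.nio.ByteBuffer;

public class ShallowCopySketch {
  public static void main(String[] args) {
    // Stand-in for a decompressed block: a 4-byte magic header, then payload.
    byte[] block = {1, 2, 3, 4, 10, 20, 30};
    ByteBuffer buf = ByteBuffer.wrap(block);

    byte[] magic = new byte[4];
    buf.get(magic, 0, magic.length);           // consume the header; position is now 4

    // slice(): a new buffer starting at the current position. The header is
    // hidden without copying bytes, but array() still holds it, so reads via
    // array() must start at arrayOffset() -- the caveat noted in readBlock().
    ByteBuffer payload = buf.slice();
    System.out.println(payload.get(0));        // 10
    System.out.println(payload.arrayOffset()); // 4

    // duplicate(): shared content, independent position/limit. A scanner can
    // walk its copy without disturbing the buffer kept in the block cache.
    ByteBuffer reader = payload.duplicate();
    reader.get();                              // advances only the duplicate
    System.out.println(payload.position());    // still 0
  }
}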
Index: src/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java (revision 0) +++ src/java/org/apache/hadoop/hbase/util/ByteBloomFilter.java (revision 0) @@ -0,0 +1,423 @@ +/** + * + * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) + * All rights reserved. + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the distribution. + * - Neither the name of the University Catholique de Louvain - UCL + * nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.DataOutput; +import java.io.DataInput; +import java.io.IOException; +import java.lang.Math; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.bloom.Filter; + +/** + * Implements a Bloom filter, as defined by Bloom in 1970. + *

+ * The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by + * the networking research community in the past decade thanks to the bandwidth efficiencies that it + * offers for the transmission of set membership information between networked hosts. A sender encodes + * the information into a bit vector, the Bloom filter, that is more compact than a conventional + * representation. Computation and space costs for construction are linear in the number of elements. + * The receiver uses the filter to test whether various elements are members of the set. Though the + * filter will occasionally return a false positive, it will never return a false negative. When creating + * the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size. + * + *

+ * Originally created by + * European Commission One-Lab Project 034819. + * + * @see Filter The general behavior of a filter + * + * @see Space/Time Trade-Offs in Hash Coding with Allowable Errors + */ +public class ByteBloomFilter implements BloomFilter { + /** Current file format version */ + public static final int VERSION = 1; + + /** Bytes (B) in the array */ + protected int byteSize; + /** Number of hash functions */ + protected final int hashCount; + /** Hash type */ + protected final int hashType; + /** Hash Function */ + protected final Hash hash; + /** Keys currently in the bloom */ + protected int keyCount; + /** Max Keys expected for the bloom */ + protected int maxKeys; + /** Bloom bits */ + protected ByteBuffer bloom; + + /** Bit-value lookup array to prevent doing the same work over and over */ + private static final byte [] bitvals = { + (byte) 0x01, + (byte) 0x02, + (byte) 0x04, + (byte) 0x08, + (byte) 0x10, + (byte) 0x20, + (byte) 0x40, + (byte) 0x80 + }; + + /** + * Loads bloom filter meta data from file input. + * @param meta stored bloom meta data + * @throws IllegalArgumentException meta data is invalid + */ + public ByteBloomFilter(ByteBuffer meta) + throws IllegalArgumentException { + int version = meta.getInt(); + if (version != VERSION) throw new IllegalArgumentException("Bad version"); + + this.byteSize = meta.getInt(); + this.hashCount = meta.getInt(); + this.hashType = meta.getInt(); + this.keyCount = meta.getInt(); + this.maxKeys = this.keyCount; + + this.hash = Hash.getInstance(this.hashType); + sanityCheck(); + } + + /** + * Determines & initializes bloom filter meta data from user config. Call + * {@link #allocBloom()} to allocate bloom filter data. + * @param maxKeys Maximum expected number of keys that will be stored in this bloom + * @param errorRate Desired false positive error rate. Lower rate = more storage required + * @param hashType Type of hash function to use + * @param foldFactor When finished adding entries, you may be able to 'fold' + * this bloom to save space. Tradeoff potentially excess bytes in bloom for + * ability to fold if keyCount is exponentially greater than maxKeys. + * @throws IllegalArgumentException + */ + public ByteBloomFilter(int maxKeys, float errorRate, int hashType, int foldFactor) + throws IllegalArgumentException { + /* + * Bloom filters are very sensitive to the number of elements inserted + * into them. For HBase, the number of entries depends on the size of the + * data stored in the column. Currently the default region size is 256MB, + * so entry count ~= 256MB / (average value size for column). Despite + * this rule of thumb, there is no efficient way to calculate the entry + * count after compactions. Therefore, it is often easier to use a + * dynamic bloom filter that will add extra space instead of allowing the + * error rate to grow. + * + * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf ) + * + * m denotes the number of bits in the Bloom filter (bitSize) + * n denotes the number of elements inserted into the Bloom filter (maxKeys) + * k represents the number of hash functions used (nbHash) + * e represents the desired false positive rate for the bloom (err) + * + * If we fix the error rate (e) and know the number of entries, then + * the optimal bloom size m = -(n * ln(err) / (ln(2)^2) + * ~= n * ln(err) / ln(0.6185) + * + * The probability of false positives is minimized when k = m/n ln(2). 
+ */ + int bitSize = (int)Math.ceil(maxKeys * (Math.log(errorRate) / Math.log(0.6185))); + int functionCount = (int)Math.ceil(Math.log(2) * (bitSize / maxKeys)); + + // increase byteSize so folding is possible + int byteSize = (bitSize + 7) / 8; + int mask = (1 << foldFactor) - 1; + if ( (mask & byteSize) != 0) { + byteSize >>= foldFactor; + ++byteSize; + byteSize <<= foldFactor; + } + + this.byteSize = byteSize; + this.hashCount = functionCount; + this.hashType = hashType; + this.keyCount = 0; + this.maxKeys = maxKeys; + + this.hash = Hash.getInstance(hashType); + sanityCheck(); + } + + @Override + public void allocBloom() { + if (this.bloom != null) { + throw new IllegalArgumentException("can only create bloom once."); + } + this.bloom = ByteBuffer.allocate(this.byteSize); + assert this.bloom.hasArray(); + } + + void sanityCheck() throws IllegalArgumentException { + if(this.byteSize <= 0) { + throw new IllegalArgumentException("maxValue must be > 0"); + } + + if(this.hashCount <= 0) { + throw new IllegalArgumentException("Hash function count must be > 0"); + } + + if (this.hash == null) { + throw new IllegalArgumentException("hashType must be known"); + } + + if (this.keyCount < 0) { + throw new IllegalArgumentException("must have positive keyCount"); + } + } + + void bloomCheck(ByteBuffer bloom) throws IllegalArgumentException { + if (this.byteSize != bloom.limit()) { + throw new IllegalArgumentException( + "Configured bloom length should match actual length"); + } + } + + @Override + public boolean add(byte [] buf) { + return add(buf, 0, buf.length); + } + + @Override + public boolean add(byte [] buf, int offset, int len) { + /* + * For faster hashing, use combinatorial generation + * http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf + */ + int hash1 = this.hash.hash(buf, offset, len, 0); + int hash2 = this.hash.hash(buf, offset, len, hash1); + + for (int i = 0; i < this.hashCount; i++) { + int hashLoc = Math.abs((hash1 + i * hash2) % (this.byteSize * 8)); + set(hashLoc); + } + + ++this.keyCount; + return true; + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf) { + return contains(buf, 0, buf.length, this.bloom); + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf, int offset, int length) { + return contains(buf, offset, length, this.bloom); + } + + @Override + public boolean contains(byte [] buf, ByteBuffer theBloom) { + return contains(buf, 0, buf.length, theBloom); + } + + @Override + public boolean contains(byte [] buf, int offset, int length, + ByteBuffer theBloom) { + + if(theBloom.limit() != this.byteSize) { + throw new IllegalArgumentException("Bloom does not match expected size"); + } + + int hash1 = this.hash.hash(buf, offset, length, 0); + int hash2 = this.hash.hash(buf, offset, length, hash1); + + for (int i = 0; i < this.hashCount; i++) { + int hashLoc = Math.abs((hash1 + i * hash2) % (this.byteSize * 8)); + if (!get(hashLoc, theBloom) ) { + return false; + } + } + return true; + } + + //--------------------------------------------------------------------------- + /** Private helpers */ + + /** + * Set the bit at the specified index to 1. + * + * @param pos index of bit + */ + void set(int pos) { + int bytePos = pos / 8; + int bitPos = pos % 8; + byte curByte = bloom.get(bytePos); + curByte |= bitvals[bitPos]; + bloom.put(bytePos, curByte); + } + + /** + * Check if bit at specified index is 1. 
+ * + * @param pos index of bit + * @return true if bit at specified index is 1, false if 0. + */ + static boolean get(int pos, ByteBuffer theBloom) { + int bytePos = pos / 8; + int bitPos = pos % 8; + byte curByte = theBloom.get(bytePos); + curByte &= bitvals[bitPos]; + return (curByte != 0); + } + + @Override + public int getKeyCount() { + return this.keyCount; + } + + @Override + public int getMaxKeys() { + return this.maxKeys; + } + + @Override + public int getByteSize() { + return this.byteSize; + } + + @Override + public void finalize() { + // see if the actual size is exponentially smaller than expected. + if (this.keyCount > 0 && this.bloom.hasArray()) { + int pieces = 1; + int newByteSize = this.byteSize; + int newMaxKeys = this.maxKeys; + + // while exponentially smaller & folding is lossless + while ( (newByteSize & 1) == 0 && newMaxKeys > (this.keyCount<<1) ) { + pieces <<= 1; + newByteSize >>= 1; + newMaxKeys >>= 1; + } + + // if we should fold these into pieces + if (pieces > 1) { + byte[] array = this.bloom.array(); + int start = this.bloom.arrayOffset(); + int end = start + newByteSize; + int off = end; + for(int p = 1; p < pieces; ++p) { + for(int pos = start; pos < end; ++pos) { + array[pos] |= array[off++]; + } + } + // folding done, only use a subset of this array + this.bloom.rewind(); + this.bloom.limit(newByteSize); + this.bloom = this.bloom.slice(); + this.byteSize = newByteSize; + this.maxKeys = newMaxKeys; + } + } + } + + + //--------------------------------------------------------------------------- + + /** + * Writes just the bloom filter to the output array + * @param out OutputStream to place bloom + * @throws IOException Error writing bloom array + */ + public void writeBloom(final DataOutput out) throws IOException { + if (!this.bloom.hasArray()) { + throw new IOException("Only writes ByteBuffer with underlying array."); + } + out.write(bloom.array(), bloom.arrayOffset(), bloom.limit()); + } + + @Override + public Writable getMetaWriter() { + return new MetaWriter(); + } + + @Override + public Writable getDataWriter() { + return new DataWriter(); + } + + private class MetaWriter implements Writable { + protected MetaWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(VERSION); + out.writeInt(byteSize); + out.writeInt(hashCount); + out.writeInt(hashType); + out.writeInt(keyCount); + } + } + + private class DataWriter implements Writable { + protected DataWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + writeBloom(out); + } + } + +} Index: src/java/org/apache/hadoop/hbase/util/JenkinsHash.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/JenkinsHash.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/util/JenkinsHash.java (working copy) @@ -80,11 +80,11 @@ */ @Override @SuppressWarnings("fallthrough") - public int hash(byte[] key, int nbytes, int initval) { + public int hash(byte[] key, int off, int nbytes, int initval) { int length = nbytes; long a, b, c; // We use longs because we don't have unsigned ints a = b = c = (0x00000000deadbeefL + length + initval) & INT_MASK; - int offset = 0; + int offset = off; for (; length > 12; offset += 12, 
length -= 12) { a = (a + (key[offset + 0] & BYTE_MASK)) & INT_MASK; a = (a + (((key[offset + 1] & BYTE_MASK) << 8) & INT_MASK)) & INT_MASK; Index: src/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java (revision 0) +++ src/java/org/apache/hadoop/hbase/util/DynamicByteBloomFilter.java (revision 0) @@ -0,0 +1,338 @@ +/** + * + * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) + * All rights reserved. + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the distribution. + * - Neither the name of the University Catholique de Louvain - UCL + * nor the names of its contributors may be used to endorse or + * promote products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * Implements a dynamic Bloom filter, as defined in the INFOCOM 2006 paper. + *

+ * A dynamic Bloom filter (DBF) makes use of a s * m bit matrix but + * each of the s rows is a standard Bloom filter. The creation + * process of a DBF is iterative. At the start, the DBF is a 1 * m + * bit matrix, i.e., it is composed of a single standard Bloom filter. + * It assumes that nr elements are recorded in the + * initial bit vector, where nr <= n (n is + * the cardinality of the set A to record in the filter). + *

+ * As the size of A grows during the execution of the application, + * several keys must be inserted in the DBF. When inserting a key into the DBF, + * one must first get an active Bloom filter in the matrix. A Bloom filter is + * active when the number of recorded keys, nr, is + * strictly less than the current cardinality of A, n. + * If an active Bloom filter is found, the key is inserted and + * nr is incremented by one. On the other hand, if there + * is no active Bloom filter, a new one is created (i.e., a new row is added to + * the matrix) according to the current size of A and the element + * is added in this new Bloom filter and the nr value of + * this new Bloom filter is set to one. A given key is said to belong to the + * DBF if the k positions are set to one in one of the matrix rows. + *

+ * Originally created by + * European Commission One-Lab Project 034819. + * + * @see BloomFilter A Bloom filter + * + * @see Theory and Network Applications of Dynamic Bloom Filters + */ +public class DynamicByteBloomFilter implements BloomFilter { + /** Current file format version */ + public static final int VERSION = 2; + /** Maximum number of keys in a dynamic Bloom filter row. */ + protected final int keyInterval; + /** The maximum false positive rate per bloom */ + protected final float errorRate; + /** Hash type */ + protected final int hashType; + /** The number of keys recorded in the current Bloom filter. */ + protected int curKeys; + /** expected size of bloom filter matrix (used during reads) */ + protected int readMatrixSize; + /** The matrix of Bloom filters (contains bloom data only during writes). */ + protected ByteBloomFilter[] matrix; + + /** + * Normal read constructor. Loads bloom filter meta data. + * @param meta stored bloom meta data + * @throws IllegalArgumentException meta data is invalid + */ + public DynamicByteBloomFilter(ByteBuffer meta) + throws IllegalArgumentException { + int version = meta.getInt(); + if (version != VERSION) throw new IllegalArgumentException("Bad version"); + + this.keyInterval = meta.getInt(); + this.errorRate = meta.getFloat(); + this.hashType = meta.getInt(); + this.readMatrixSize = meta.getInt(); + this.curKeys = meta.getInt(); + + readSanityCheck(); + + this.matrix = new ByteBloomFilter[1]; + this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); +} + + /** + * Normal write constructor. Note that this doesn't allocate bloom data by + * default. Instead, call allocBloom() before adding entries. + * @param bitSize The vector size of this filter. + * @param functionCount The number of hash function to consider. + * @param hashType type of the hashing function (see + * {@link org.apache.hadoop.util.hash.Hash}). + * @param keyInterval Maximum number of keys to record per Bloom filter row. + * @throws IllegalArgumentException The input parameters were invalid + */ + public DynamicByteBloomFilter(int keyInterval, float errorRate, int hashType) + throws IllegalArgumentException { + this.keyInterval = keyInterval; + this.errorRate = errorRate; + this.hashType = hashType; + this.curKeys = 0; + + if(keyInterval <= 0) { + throw new IllegalArgumentException("keyCount must be > 0"); + } + + this.matrix = new ByteBloomFilter[1]; + this.matrix[0] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); +} + + @Override + public void allocBloom() { + this.matrix[0].allocBloom(); + } + + void readSanityCheck() throws IllegalArgumentException { + if (this.curKeys <= 0) { + throw new IllegalArgumentException("last bloom's key count invalid"); + } + + if (this.readMatrixSize <= 0) { + throw new IllegalArgumentException("matrix size must be known"); + } + } + + @Override + public boolean add(byte []buf, int offset, int len) { + BloomFilter bf = getCurBloom(); + + if (bf == null) { + addRow(); + bf = matrix[matrix.length - 1]; + curKeys = 0; + } + + if (bf.add(buf, offset, len)) { + curKeys++; + return true; + } + + return false; + } + + @Override + public boolean add(byte []buf) { + return add(buf, 0, buf.length); + } + + /** + * Should only be used in tests when writing a bloom filter. + */ + boolean contains(byte [] buf) { + return contains(buf, 0, buf.length); + } + + /** + * Should only be used in tests when writing a bloom filter. 
+ */ + boolean contains(byte [] buf, int offset, int length) { + for (int i = 0; i < matrix.length; i++) { + if (matrix[i].contains(buf, offset, length)) { + return true; + } + } + return false; + } + + @Override + public boolean contains(byte [] buf, ByteBuffer theBloom) { + return contains(buf, 0, buf.length, theBloom); + } + + @Override + public boolean contains(byte[] buf, int offset, int length, + ByteBuffer theBloom) { + if(offset + length > buf.length) { + return false; + } + + // current version assumes uniform size + int bytesPerBloom = this.matrix[0].getByteSize(); + + if(theBloom.limit() != bytesPerBloom * readMatrixSize) { + throw new IllegalArgumentException("Bloom does not match expected size"); + } + + ByteBuffer tmp = theBloom.duplicate(); + + // note: actually searching an array of blooms that have been serialized + for (int m = 0; m < readMatrixSize; ++m) { + tmp.position(m* bytesPerBloom); + tmp.limit(tmp.position() + bytesPerBloom); + boolean match = this.matrix[0].contains(buf, offset, length, tmp.slice()); + if (match) { + return true; + } + } + + // matched no bloom filters + return false; + } + + int bloomCount() { + return Math.max(this.matrix.length, this.readMatrixSize); + } + + @Override + public int getKeyCount() { + return (bloomCount()-1) * this.keyInterval + this.curKeys; + } + + @Override + public int getMaxKeys() { + return bloomCount() * this.keyInterval; + } + + @Override + public int getByteSize() { + return bloomCount() * this.matrix[0].getByteSize(); + } + + @Override + public void finalize() { + } + + /** + * Adds a new row to this dynamic Bloom filter. + */ + private void addRow() { + ByteBloomFilter[] tmp = new ByteBloomFilter[matrix.length + 1]; + + for (int i = 0; i < matrix.length; i++) { + tmp[i] = matrix[i]; + } + + tmp[tmp.length-1] = new ByteBloomFilter(keyInterval, errorRate, hashType, 0); + tmp[tmp.length-1].allocBloom(); + matrix = tmp; + } + + /** + * Returns the currently-unfilled row in the dynamic Bloom Filter array. + * @return BloomFilter The active standard Bloom filter. + * Null otherwise. 
+ */ + private BloomFilter getCurBloom() { + if (curKeys >= keyInterval) { + return null; + } + + return matrix[matrix.length - 1]; + } + + @Override + public Writable getMetaWriter() { + return new MetaWriter(); + } + + @Override + public Writable getDataWriter() { + return new DataWriter(); + } + + private class MetaWriter implements Writable { + protected MetaWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(VERSION); + out.writeInt(keyInterval); + out.writeFloat(errorRate); + out.writeInt(hashType); + out.writeInt(matrix.length); + out.writeInt(curKeys); + } + } + + private class DataWriter implements Writable { + protected DataWriter() {} + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Cant read with this class."); + } + + @Override + public void write(DataOutput out) throws IOException { + for (int i = 0; i < matrix.length; ++i) { + matrix[i].writeBloom(out); + } + } + } +} Index: src/java/org/apache/hadoop/hbase/util/BloomFilter.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/BloomFilter.java (revision 0) +++ src/java/org/apache/hadoop/hbase/util/BloomFilter.java (revision 0) @@ -0,0 +1,89 @@ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Writable; + +/** + * + */ +public interface BloomFilter { + /** + * Allocate memory for the bloom filter data. Note that bloom data isn't + * allocated by default because it can grow large & reads would be better + * managed by the LRU cache. + */ + void allocBloom(); + + /** + * Add the specified binary to the bloom filter. + * + * @param buf data to be added to the bloom + * @return true on success, false on failure + */ + boolean add(byte []buf); + + /** + * Add the specified binary to the bloom filter. + * + * @param buf data to be added to the bloom + * @param offset offset into the data to be added + * @param len length of the data to be added + * @return true on success, false on failure + */ + boolean add(byte []buf, int offset, int len); + + /** + * Check if the specified key is contained in the bloom filter. + * + * @param buf data to check for existence of + * @param bloom bloom filter data to search + * @return true if matched by bloom, false if not + */ + boolean contains(byte [] buf, ByteBuffer bloom); + + /** + * Check if the specified key is contained in the bloom filter. + * + * @param buf data to check for existence of + * @param offset offset into the data + * @param length length of the data + * @param bloom bloom filter data to search + * @return true if matched by bloom, false if not + */ + boolean contains(byte [] buf, int offset, int length, ByteBuffer bloom); + + /** + * @return The number of keys added to the bloom + */ + int getKeyCount(); + + /** + * @return The max number of keys that can be inserted + * to maintain the desired error rate + */ + public int getMaxKeys(); + + /** + * Size of the bloom, in bytes + */ + public int getByteSize(); + + /** + * Finalize the bloom before writing metadata & data to disk + */ + void finalize(); + + /** + * Get a writable interface into bloom filter meta data. + * @return writable class + */ + Writable getMetaWriter(); + + /** + * Get a writable interface into bloom filter data (actual bloom). 
+ * @return writable class + */ + Writable getDataWriter(); +} Index: src/java/org/apache/hadoop/hbase/util/Hash.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Hash.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/util/Hash.java (working copy) @@ -104,16 +104,29 @@ * @return hash value */ public int hash(byte[] bytes, int initval) { - return hash(bytes, bytes.length, initval); + return hash(bytes, 0, bytes.length, initval); } /** * Calculate a hash using bytes from 0 to length, and * the provided seed value * @param bytes input bytes - * @param length length of the valid bytes to consider + * @param length length of the valid bytes after offset to consider * @param initval seed value * @return hash value */ - public abstract int hash(byte[] bytes, int length, int initval); + public int hash(byte[] bytes, int length, int initval) { + return hash(bytes, 0, length, initval); + } + + /** + * Calculate a hash using bytes from 0 to length, and + * the provided seed value + * @param bytes input bytes + * @param offset the offset into the array to start consideration + * @param length length of the valid bytes after offset to consider + * @param initval seed value + * @return hash value + */ + public abstract int hash(byte[] bytes, int offset, int length, int initval); } Index: src/java/org/apache/hadoop/hbase/util/Migrate.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Migrate.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/util/Migrate.java (working copy) @@ -375,9 +375,9 @@ BloomFilterMapFile.Reader src = hsf.getReader(fs, false, false); String compression = conf.get("migrate.compression", "NONE").trim(); Compression.Algorithm compressAlgorithm = Compression.Algorithm.valueOf(compression); - HFile.Writer tgt = StoreFile.getWriter(fs, familydir, + HFile.Writer tgt = StoreFile.createWriter(fs, familydir, conf.getInt("hfile.min.blocksize.size", 64*1024), - compressAlgorithm, getComparator(basedir)); + compressAlgorithm, getComparator(basedir), null, StoreFile.BloomType.NONE, 0); // From old 0.19 HLogEdit. ImmutableBytesWritable deleteBytes = new ImmutableBytesWritable("HBASE::DELETEVAL".getBytes("UTF-8")); @@ -397,8 +397,9 @@ tgt.append(kv); } long seqid = hsf.loadInfo(fs); - StoreFile.appendMetadata(tgt, seqid, - hsf.isMajorCompaction()); + tgt.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(seqid)); + tgt.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(hsf.isMajorCompaction())); // Success, delete src. src.close(); tgt.close(); @@ -415,11 +416,11 @@ } } - private static KeyValue.KeyComparator getComparator(final Path tabledir) { + private static KeyValue.KVComparator getComparator(final Path tabledir) { String tablename = tabledir.getName(); - return tablename.equals("-ROOT-")? KeyValue.META_KEY_COMPARATOR: - tablename.equals(".META.")? KeyValue.META_KEY_COMPARATOR: - KeyValue.KEY_COMPARATOR; + return tablename.equals("-ROOT-")? KeyValue.META_COMPARATOR: + tablename.equals(".META.")? 
KeyValue.META_COMPARATOR: + KeyValue.COMPARATOR; } /* Index: src/java/org/apache/hadoop/hbase/util/MurmurHash.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/MurmurHash.java (revision 942269) +++ src/java/org/apache/hadoop/hbase/util/MurmurHash.java (working copy) @@ -33,7 +33,7 @@ } @Override - public int hash(byte[] data, int length, int seed) { + public int hash(byte[] data, int offset, int length, int seed) { int m = 0x5bd1e995; int r = 24; @@ -42,7 +42,7 @@ int len_4 = length >> 2; for (int i = 0; i < len_4; i++) { - int i_4 = i << 2; + int i_4 = (i << 2) + offset; int k = data[i_4 + 3]; k = k << 8; k = k | (data[i_4 + 2] & 0xff); @@ -60,16 +60,17 @@ // avoid calculating modulo int len_m = len_4 << 2; int left = length - len_m; + int i_m = len_m + offset; if (left != 0) { if (left >= 3) { - h ^= data[len_m + 2] << 16; + h ^= data[i_m + 2] << 16; } if (left >= 2) { - h ^= data[len_m + 1] << 8; + h ^= data[i_m + 1] << 8; } if (left >= 1) { - h ^= data[len_m]; + h ^= data[i_m]; } h *= m; Index: bin/HBase.rb =================================================================== --- bin/HBase.rb (revision 942269) +++ bin/HBase.rb (working copy) @@ -342,7 +342,7 @@ arg[HColumnDescriptor::BLOCKCACHE]? JBoolean.valueOf(arg[HColumnDescriptor::BLOCKCACHE]): HColumnDescriptor::DEFAULT_BLOCKCACHE, arg[HColumnDescriptor::BLOCKSIZE]? JInteger.valueOf(arg[HColumnDescriptor::BLOCKSIZE]): HColumnDescriptor::DEFAULT_BLOCKSIZE, arg[HColumnDescriptor::TTL]? JInteger.new(arg[HColumnDescriptor::TTL]): HColumnDescriptor::DEFAULT_TTL, - arg[HColumnDescriptor::BLOOMFILTER]? JBoolean.valueOf(arg[HColumnDescriptor::BLOOMFILTER]): HColumnDescriptor::DEFAULT_BLOOMFILTER) + arg[HColumnDescriptor::BLOOMFILTER]? arg[HColumnDescriptor::BLOOMFILTER]: HColumnDescriptor::DEFAULT_BLOOMFILTER) end def zk(args)
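A closing note on the sizing arithmetic spelled out in the ByteBloomFilter constructor comment earlier in this patch (m = -n * ln(err) / (ln 2)^2 bits and k = (m / n) * ln 2 hash functions): the standalone sketch below reproduces that math plus the byte-size round-up that keeps folding possible. The key count, error rate, and fold factor are illustrative, and the hash-function count is computed here with floating-point division.

public class BloomSizingSketch {
  public static void main(String[] args) {
    int maxKeys = 1000000;    // n: expected entries (illustrative)
    double errorRate = 0.01;  // err: desired false positive rate (illustrative)
    int foldFactor = 7;       // illustrative fold factor

    // m = -n * ln(err) / (ln 2)^2, written as n * ln(err) / ln(0.6185) in the patch
    int bitSize = (int) Math.ceil(maxKeys * (Math.log(errorRate) / Math.log(0.6185)));
    // k = (m / n) * ln 2
    int hashCount = (int) Math.ceil(Math.log(2) * ((double) bitSize / maxKeys));

    // Round the byte size up to a multiple of 2^foldFactor so the finished
    // bloom can later be folded in half up to foldFactor times.
    int byteSize = (bitSize + 7) / 8;
    int mask = (1 << foldFactor) - 1;
    if ((mask & byteSize) != 0) {
      byteSize >>= foldFactor;
      ++byteSize;
      byteSize <<= foldFactor;
    }

    // For n = 1M keys at a 1% error rate: roughly 9.6M bits and 7 hash functions.
    System.out.println("bits=" + bitSize + " hashes=" + hashCount + " bytes=" + byteSize);
  }
}

Tightening the error rate grows the bit array linearly in ln(1/err), while the hash-function count grows only logarithmically, which is the trade-off the constructor comment is describing.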