From 65835be2e8eb96909cdbf10a3559fb9678ca9a96 Mon Sep 17 00:00:00 2001
From: Guangxu Cheng
Date: Tue, 29 May 2018 20:11:42 +0800
Subject: [PATCH] HBASE-20636 Introduce two bloom filter types: ROWPREFIX and ROWPREFIX_DELIMITED

Signed-off-by: Andrew Purtell
Amending-Author: Andrew Purtell
---
 .../hadoop/hbase/regionserver/BloomType.java | 10 +-
 .../actions/ChangeBloomFilterAction.java | 6 +
 .../hbase/mapreduce/HFileOutputFormat2.java | 51 +++
 .../hadoop/hbase/util/LoadTestTool.java | 18 +
 .../io/hfile/CompoundBloomFilterWriter.java | 6 +-
 .../apache/hadoop/hbase/master/HMaster.java | 14 +
 .../hadoop/hbase/regionserver/HStoreFile.java | 3 +
 .../hbase/regionserver/StoreFileReader.java | 120 +++++-
 .../hbase/regionserver/StoreFileWriter.java | 45 +-
 .../hadoop/hbase/util/BloomFilterChunk.java | 6 +-
 .../hadoop/hbase/util/BloomFilterUtil.java | 50 ++-
 .../util/RowPrefixDelimiterBloomContext.java | 62 +++
 .../RowPrefixFixedLengthBloomContext.java | 58 +++
 .../hfile/TestSeekBeforeWithInlineBlocks.java | 5 +
 .../regionserver/CreateRandomStoreFile.java | 23 +
 .../hbase/regionserver/TestHStoreFile.java | 2 +-
 .../regionserver/TestMultiColumnScanner.java | 3 +
 .../TestRowPrefixBloomFilter.java | 399 ++++++++++++++++++
 .../regionserver/TestScanWithBloomError.java | 3 +
 .../regionserver/TestSeekOptimizations.java | 3 +
 20 files changed, 852 insertions(+), 35 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixFixedLengthBloomContext.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
index 7787842325..c0b2dd2041 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
@@ -34,5 +34,13 @@ public enum BloomType { /** * Bloom enabled with Table row & column (family+qualifier) as Key */ - ROWCOL + ROWCOL, + /** + * Bloom enabled with Table row prefix as Key; the length of the prefix must be specified + */ + ROWPREFIX_FIXED_LENGTH, + /** + * Bloom enabled with Table row prefix as Key; the delimiter of the prefix must be specified + */ + ROWPREFIX_DELIMITED }
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java
index 47ef6489df..4faf05df98 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java
@@ -22,6 +22,7 @@ import java.util.Random; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.BloomFilterUtil; /** * Action that tries to adjust the bloom filter setting on all the columns of a @@ -53,6 +54,11 @@ public class ChangeBloomFilterAction extends Action { LOG.debug("Performing action: About to set bloom filter type to " + bloomType + " on column " + columnName + " of table " + tableName); columnBuilder.setBloomFilterType(bloomType); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + columnBuilder.setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10"); + 
} else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + columnBuilder.setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#"); + } }); LOG.debug("Performing action: Just set bloom filter types on table " + tableName); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 0063c56a99..c911e8c867 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -149,6 +150,8 @@ public class HFileOutputFormat2 "hbase.hfileoutputformat.families.compression"; static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; + static final String BLOOM_PARAM_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.bloomparam"; static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = @@ -216,6 +219,7 @@ public class HFileOutputFormat2 // create a map from column family to the compression algorithm final Map compressionMap = createFamilyCompressionMap(conf); final Map bloomTypeMap = createFamilyBloomTypeMap(conf); + final Map bloomParamMap = createFamilyBloomParamMap(conf); final Map blockSizeMap = createFamilyBlockSizeMap(conf); String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); @@ -399,6 +403,12 @@ public class HFileOutputFormat2 compression = compression == null ? defaultCompression : compression; BloomType bloomType = bloomTypeMap.get(tableAndFamily); bloomType = bloomType == null ? BloomType.NONE : bloomType; + String bloomParam = bloomParamMap.get(tableAndFamily); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam); + } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + conf.set(BloomFilterUtil.DELIMITER_KEY, bloomParam); + } Integer blockSize = blockSizeMap.get(tableAndFamily); blockSize = blockSize == null ? 
HConstants.DEFAULT_BLOCKSIZE : blockSize; DataBlockEncoding encoding = overriddenEncoding; @@ -667,6 +677,8 @@ public class HFileOutputFormat2 tableDescriptors)); conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors)); + conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails, + tableDescriptors)); conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors)); @@ -694,6 +706,8 @@ public class HFileOutputFormat2 serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor)); conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor)); + conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor)); conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor)); @@ -741,6 +755,19 @@ public class HFileOutputFormat2 return bloomTypeMap; } + /** + * Runs inside the task to deserialize column family to bloom filter param + * map from the configuration. + * + * @param conf to read the serialized values from + * @return a map from column family to the configured bloom filter param + */ + @VisibleForTesting + static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) { + return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY); + } + + /** * Runs inside the task to deserialize column family to block size * map from the configuration. @@ -908,6 +935,30 @@ public class HFileOutputFormat2 return bloomType; }; + /** + * Serialize column family to bloom param map to configuration. Invoked while + * configuring the MR job for incremental load. + * + * Maps a column family descriptor to the bloom filter param configured for + * that family: the prefix length for ROWPREFIX_FIXED_LENGTH, the delimiter + * for ROWPREFIX_DELIMITED, or the empty string for bloom types that take + * no param. + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> { + BloomType bloomType = familyDescriptor.getBloomFilterType(); + String bloomParam = ""; + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY); + } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.DELIMITER_KEY); + } + return bloomParam; + }; + /** * Serialize column family to data block encoding map to configuration. * Invoked while configuring the MR job for incremental load. 
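
Note: a minimal sketch (not part of the patch) of how a client could enable the new bloom types on existing column families, mirroring what ChangeBloomFilterAction above does; the connection, table, and family names are illustrative:

    // assumes an open Connection `conn` and a table "usertable" with families "a" and "b"
    TableName tn = TableName.valueOf("usertable");
    ColumnFamilyDescriptor fixedCf = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("a"))
        .setBloomFilterType(BloomType.ROWPREFIX_FIXED_LENGTH)
        // prefix length in bytes; shorter rows contribute their whole row to the bloom
        .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10")
        .build();
    ColumnFamilyDescriptor delimitedCf = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("b"))
        .setBloomFilterType(BloomType.ROWPREFIX_DELIMITED)
        // everything before the first '#' in the row key becomes the bloom key
        .setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#")
        .build();
    try (Admin admin = conn.getAdmin()) {
      admin.modifyColumnFamily(tn, fixedCf);
      admin.modifyColumnFamily(tn, delimitedCf);
    }

A missing or invalid param is rejected at table create/alter time by the new checkBloomFilterType check in the HMaster hunk below.
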
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index f4f483e847..f0e04c4d60 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -119,6 +119,7 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_VERBOSE = "verbose"; public static final String OPT_BLOOM = "bloom"; + public static final String OPT_BLOOM_PARAM = "bloom_param"; public static final String OPT_COMPRESSION = "compression"; public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; @@ -330,6 +331,7 @@ public class LoadTestTool extends AbstractHBaseTool { addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); + addOptWithArg(OPT_BLOOM_PARAM, "the parameter of the bloom filter type"); addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + @@ -551,6 +553,22 @@ public class LoadTestTool extends AbstractHBaseTool { bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + if (!cmd.hasOption(OPT_BLOOM_PARAM)) { + LOG.error("the parameter of bloom filter type {} is not specified", bloomType.name()); + } else { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); + } + } + + if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + if (!cmd.hasOption(OPT_BLOOM_PARAM)) { + LOG.error("the parameter of bloom filter type {} is not specified", bloomType.name()); + } else { + conf.set(BloomFilterUtil.DELIMITER_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); + } + } + inMemoryCF = cmd.hasOption(OPT_INMEMORY); if (cmd.hasOption(OPT_ENCRYPTION)) { cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
index d63a824832..a4a805e8c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
@@ -174,12 +174,12 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase + Bytes.toStringBinary(firstKeyInChunk)); } // This will be done only once per chunk - if (bloomType == BloomType.ROW) { - firstKeyInChunk = CellUtil.copyRow(cell); - } else { + if (bloomType == BloomType.ROWCOL) { firstKeyInChunk = PrivateCellUtil .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell)); + } else { + firstKeyInChunk = CellUtil.copyRow(cell); } allocateNewChunk(); }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ad7c355357..86e7260e7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -64,6 +64,7 @@ import 
org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -191,6 +192,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; @@ -2153,6 +2155,8 @@ public class HMaster extends HRegionServer implements MasterServices { // check replication scope checkReplicationScope(hcd); + // check bloom filter type + checkBloomFilterType(hcd); // check data replication factor, it can be 0(default value) when user has not explicitly // set the value, in this case we use default replication factor set in the file system. @@ -2235,6 +2239,16 @@ public class HMaster extends HRegionServer implements MasterServices { } } + private static void checkBloomFilterType(ColumnFamilyDescriptor cfd) + throws IOException { + Configuration conf = new CompoundConfiguration().addStringMap(cfd.getConfiguration()); + try { + BloomFilterUtil.getBloomFilterParam(cfd.getBloomFilterType(), conf); + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException("Failed to get bloom filter param", e); + } + } + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. 
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 4a0c66f501..4aff949c6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -86,6 +86,9 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener { /** Bloom filter Type in FileInfo */ public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + /** Bloom filter param in FileInfo */ + public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM"); + /** Delete Family Count in FileInfo */ public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 9080c2db84..eade3d71a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.LAST_BLOOM_KEY; @@ -74,6 +75,8 @@ public class StoreFileReader { private boolean bulkLoadResult = false; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; + private int prefixLength = -1; + private byte[] delimiter = null; // Counter that is incremented every time a scanner is created on the // store file. It is decremented when the scan on the store file is @@ -119,6 +122,8 @@ public class StoreFileReader { this.bulkLoadResult = reader.bulkLoadResult; this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV; this.skipResetSeqId = reader.skipResetSeqId; + this.prefixLength = reader.prefixLength; + this.delimiter = reader.delimiter; } public boolean isPrimaryReplicaReader() { @@ -228,7 +233,7 @@ public class StoreFileReader { /** * Check if this storeFile may contain keys within the TimeRange that * have not expired (i.e. not older than oldestUnexpiredTS). - * @param timeRange the timeRange to restrict + * @param tr the timeRange to restrict * @param oldestUnexpiredTS the oldest timestamp that is not expired, as * determined by the column family's TTL * @return false if queried keys definitely don't exist in this StoreFile @@ -255,18 +260,18 @@ public class StoreFileReader { * False if the Bloom filter is applicable and the scan fails it. */ boolean passesBloomFilter(Scan scan, final SortedSet columns) { - // Multi-column non-get scans will use Bloom filters through the - // lower-level API function that this function calls. 
- if (!scan.isGetScan()) { - return true; - } - byte[] row = scan.getStartRow(); switch (this.bloomFilterType) { case ROW: + if (!scan.isGetScan()) { + return true; + } return passesGeneralRowBloomFilter(row, 0, row.length); case ROWCOL: + if (!scan.isGetScan()) { + return true; + } if (columns != null && columns.size() == 1) { byte[] column = columns.first(); // create the required fake key @@ -277,7 +282,10 @@ // For multi-column queries the Bloom filter is checked from the // seekExact operation. return true; - + case ROWPREFIX_FIXED_LENGTH: + return passesGeneralRowPrefixBloomFilter(scan); + case ROWPREFIX_DELIMITED: + return passesGeneralDelimitedRowPrefixBloomFilter(scan); default: return true; } @@ -318,7 +326,7 @@ * * @return True if passes */ - public boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) { + private boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) { BloomFilter bloomFilter = this.generalBloomFilter; if (bloomFilter == null) { return true; @@ -328,7 +336,7 @@ byte[] key = null; if (rowOffset != 0 || rowLen != row.length) { throw new AssertionError( - "For row-only Bloom filters the row " + "must occupy the whole array"); + "For row-only Bloom filters the row must occupy the whole array"); } key = row; return checkGeneralBloomFilter(key, null, bloomFilter); @@ -358,6 +366,76 @@ return checkGeneralBloomFilter(null, kvKey, bloomFilter); } + /** + * Checks the ROWPREFIX_FIXED_LENGTH Bloom filter with the fixed-length prefix + * of the scan's start row (for a get), or of the common prefix of startRow + * and stopRow (for a non-get scan). + * + * @return True if passes + */ + private boolean passesGeneralRowPrefixBloomFilter(Scan scan) { + BloomFilter bloomFilter = this.generalBloomFilter; + if (bloomFilter == null) { + return true; + } + + byte[] row = scan.getStartRow(); + byte[] rowPrefix; + if (scan.isGetScan()) { + rowPrefix = Bytes.copy(row, 0, Math.min(prefixLength, row.length)); + } else { + // For non-get scans + // Find out the common prefix of startRow and stopRow. + int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(), + scan.getStartRow().length, scan.getStopRow().length, 0, 0); + // startRow and stopRow don't have a common prefix, + // or the common prefix is shorter than prefixLength + if (commonLength <= 0 || commonLength < prefixLength) { + return true; + } + rowPrefix = Bytes.copy(row, 0, prefixLength); + } + return checkGeneralBloomFilter(rowPrefix, null, bloomFilter); + } + + /** + * Checks the ROWPREFIX_DELIMITED Bloom filter with the delimited prefix of + * the scan's start row; a non-get scan passes unless startRow and stopRow + * share the same delimited prefix. + * + * @return True if passes + */ + private boolean passesGeneralDelimitedRowPrefixBloomFilter(Scan scan) { + BloomFilter bloomFilter = this.generalBloomFilter; + if (bloomFilter == null) { + return true; + } + + byte[] row = scan.getStartRow(); + byte[] rowPrefix; + if (scan.isGetScan()) { + int rowPrefixLength = Bytes.indexOf(row, delimiter); + if (rowPrefixLength <= 0) { + rowPrefix = row; + } else { + rowPrefix = Bytes.copy(row, 0, rowPrefixLength); + } + } else { + // For non-get scans + // If startRow does not contain delimiter, return true directly. + int startRowPrefixLength = Bytes.indexOf(row, delimiter); + if (startRowPrefixLength <= 0) { + return true; + } + // If stopRow does not have the same prefix as startRow, return true directly. 
+ int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(), + startRowPrefixLength, scan.getStopRow().length, 0, 0); + if (commonLength < startRowPrefixLength) { + return true; + } + rowPrefix = Bytes.copy(row, 0, startRowPrefixLength); + } + return checkGeneralBloomFilter(rowPrefix, null, bloomFilter); + } + private boolean checkGeneralBloomFilter(byte[] key, Cell kvKey, BloomFilter bloomFilter) { // Empty file if (reader.getTrailer().getEntryCount() == 0) { @@ -386,10 +464,10 @@ public class StoreFileReader { // hbase:meta does not have blooms. So we need not have special interpretation // of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom if (keyIsAfterLast) { - if (bloomFilterType == BloomType.ROW) { - keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0); - } else { + if (bloomFilterType == BloomType.ROWCOL) { keyIsAfterLast = (CellComparator.getInstance().compare(kvKey, lastBloomKeyOnlyKV)) > 0; + } else { + keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0); } } @@ -465,6 +543,13 @@ public class StoreFileReader { bloomFilterType = BloomType.valueOf(Bytes.toString(b)); } + byte[] p = fi.get(BLOOM_FILTER_PARAM_KEY); + if (bloomFilterType == BloomType.ROWPREFIX_FIXED_LENGTH) { + prefixLength = Bytes.toInt(p); + } else if (bloomFilterType == BloomType.ROWPREFIX_DELIMITED) { + delimiter = p; + } + lastBloomKey = fi.get(LAST_BLOOM_KEY); if(bloomFilterType == BloomType.ROWCOL) { lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length); @@ -665,4 +750,13 @@ public class StoreFileReader { public interface Listener { void storeFileReaderClosed(StoreFileReader reader); } + + + public int getPrefixLength() { + return prefixLength; + } + + public byte[] getDelimiter() { + return delimiter; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 2782e6f5d0..b31df39534 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; @@ -44,11 +45,14 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.util.BloomContext; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.RowBloomContext; import org.apache.hadoop.hbase.util.RowColBloomContext; +import org.apache.hadoop.hbase.util.RowPrefixDelimiterBloomContext; +import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +69,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { private final 
BloomFilterWriter generalBloomFilterWriter; private final BloomFilterWriter deleteFamilyBloomFilterWriter; private final BloomType bloomType; + private byte[] bloomParam = null; private long earliestPutTs = HConstants.LATEST_TIMESTAMP; private long deleteFamilyCnt = 0; private BloomContext bloomContext = null; @@ -110,21 +115,32 @@ public class StoreFileWriter implements CellSink, ShipperListener { if (generalBloomFilterWriter != null) { this.bloomType = bloomType; + this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); if (LOG.isTraceEnabled()) { - LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " + - generalBloomFilterWriter.getClass().getSimpleName()); + LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " + + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH ? + Bytes.toInt(bloomParam) : Bytes.toStringBinary(bloomParam)) + + ", " + generalBloomFilterWriter.getClass().getSimpleName()); } // init bloom context switch (bloomType) { - case ROW: - bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); - break; - case ROWCOL: - bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); - break; - default: - throw new IOException( - "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL expected)"); + case ROW: + bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); + break; + case ROWCOL: + bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); + break; + case ROWPREFIX_FIXED_LENGTH: + bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, comparator, + Bytes.toInt(bloomParam)); + break; + case ROWPREFIX_DELIMITED: + bloomContext = new RowPrefixDelimiterBloomContext(generalBloomFilterWriter, comparator, + bloomParam); + break; + default: + throw new IOException("Invalid Bloom filter type: " + bloomType + + " (ROW or ROWCOL or ROWPREFIX_FIXED_LENGTH or ROWPREFIX_DELIMITED expected)"); } } else { // Not using Bloom filters. @@ -206,9 +222,11 @@ public class StoreFileWriter implements CellSink, ShipperListener { * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp * - * 2 Types of Filtering: + * 4 Types of Filtering: * 1. Row = Row * 2. RowCol = Row + Qualifier + * 3. RowPrefixFixedLength = Fixed Length Row Prefix + * 4. 
RowPrefixDelimiter = Delimited Row Prefix */ bloomContext.writeBloom(cell); } @@ -280,6 +298,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { if (hasGeneralBloom) { writer.addGeneralBloomFilter(generalBloomFilterWriter); writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + if (bloomParam != null) { + writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); + } bloomContext.addLastBloomKey(writer); } return hasGeneralBloom; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java index d8fc3fb3c9..06cf699e34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java @@ -187,12 +187,12 @@ public class BloomFilterChunk implements BloomFilterBase { int hash1; int hash2; HashKey hashKey; - if (this.bloomType == BloomType.ROW) { - hashKey = new RowBloomHashKey(cell); + if (this.bloomType == BloomType.ROWCOL) { + hashKey = new RowColBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } else { - hashKey = new RowColBloomHashKey(cell); + hashKey = new RowBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java index 33bea7a498..003f38444c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util; import java.text.NumberFormat; import java.util.Random; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.yetus.audience.InterfaceAudience; @@ -46,6 +48,9 @@ public final class BloomFilterUtil { * estimate the ideal false positive rate. */ private static Random randomGeneratorForTest; + + public static final String PREFIX_LENGTH_KEY = "RowPrefixBloomFilter.prefix_length"; + public static final String DELIMITER_KEY = "RowPrefixDelimitedBloomFilter.delimiter"; /** Bit-value lookup array to prevent doing the same work over and over */ public static final byte [] bitvals = { @@ -239,8 +244,8 @@ public final class BloomFilterUtil { public static boolean contains(Cell cell, ByteBuff bloomBuf, int bloomOffset, int bloomSize, Hash hash, int hashCount, BloomType type) { - HashKey hashKey = type == BloomType.ROW ? new RowBloomHashKey(cell) - : new RowColBloomHashKey(cell); + HashKey hashKey = type == BloomType.ROWCOL ? 
new RowColBloomHashKey(cell) + : new RowBloomHashKey(cell); return contains(bloomBuf, bloomOffset, bloomSize, hash, hashCount, hashKey); } @@ -284,4 +289,45 @@ public final class BloomFilterUtil { return formatStats(bloomFilter) + STATS_RECORD_SEP + "Actual error rate: " + String.format("%.8f", bloomFilter.actualErrorRate()); } + + public static byte[] getBloomFilterParam(BloomType bloomFilterType, Configuration conf) + throws IllegalArgumentException { + byte[] bloomParam = null; + String message = "Bloom filter type is " + bloomFilterType + ", "; + switch (bloomFilterType) { + case ROWPREFIX_FIXED_LENGTH: + String prefixLengthString = conf.get(PREFIX_LENGTH_KEY); + if (prefixLengthString == null) { + message += PREFIX_LENGTH_KEY + " not specified."; + throw new IllegalArgumentException(message); + } + int prefixLength; + try { + prefixLength = Integer.parseInt(prefixLengthString); + if (prefixLength <= 0 || prefixLength > HConstants.MAX_ROW_LENGTH) { + message += "the value of " + PREFIX_LENGTH_KEY + + " must be > 0 and <= " + HConstants.MAX_ROW_LENGTH; + throw new IllegalArgumentException(message); + } + } catch (NumberFormatException nfe) { + message = "Number format exception when parsing " + PREFIX_LENGTH_KEY + " for BloomType " + + bloomFilterType.toString() + ": " + + prefixLengthString; + throw new IllegalArgumentException(message, nfe); + } + bloomParam = Bytes.toBytes(prefixLength); + break; + case ROWPREFIX_DELIMITED: + String delimiterString = conf.get(DELIMITER_KEY); + if (delimiterString == null || delimiterString.length() == 0) { + message += DELIMITER_KEY + " not specified."; + throw new IllegalArgumentException(message); + } + bloomParam = Bytes.toBytes(delimiterString); + break; + default: + break; + } + return bloomParam; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java new file mode 100644 index 0000000000..5fcf43b114 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Handles ROWPREFIX_DELIMITED bloom-related context. 
+ * It works with both ByteBufferedCell and byte[] backed cells +@InterfaceAudience.Private +public class RowPrefixDelimiterBloomContext extends RowBloomContext { + private final byte[] delimiter; + + public RowPrefixDelimiterBloomContext(BloomFilterWriter bloomFilterWriter, + CellComparator comparator, byte[] delimiter) { + super(bloomFilterWriter, comparator); + this.delimiter = delimiter; + } + + public void writeBloom(Cell cell) throws IOException { + super.writeBloom(getDelimitedRowPrefixCell(cell)); + } + + /** + * @param cell the cell to write + * @return a cell whose row is the delimited prefix of the original row, or the + * original cell when the row contains no delimiter + */ + private Cell getDelimitedRowPrefixCell(Cell cell) { + byte[] row = CellUtil.copyRow(cell); + int prefixLength = Bytes.indexOf(row, delimiter); + if (prefixLength <= 0) { + return cell; + } + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(row, 0, Math.min(prefixLength, row.length)) + .setType(Cell.Type.Put) + .build(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixFixedLengthBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixFixedLengthBloomContext.java new file mode 100644 index 0000000000..5abbf1c159 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixFixedLengthBloomContext.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Handles ROWPREFIX_FIXED_LENGTH bloom-related context. 
+ * It works with both ByteBufferedCell and byte[] backed cells +@InterfaceAudience.Private +public class RowPrefixFixedLengthBloomContext extends RowBloomContext { + private final int prefixLength; + + public RowPrefixFixedLengthBloomContext(BloomFilterWriter bloomFilterWriter, + CellComparator comparator, int prefixLength) { + super(bloomFilterWriter, comparator); + this.prefixLength = prefixLength; + } + + public void writeBloom(Cell cell) throws IOException { + super.writeBloom(getRowPrefixCell(cell)); + } + + /** + * @param cell the cell to write + * @return a cell whose row is the fixed-length prefix of the original row + */ + private Cell getRowPrefixCell(Cell cell) { + byte[] row = CellUtil.copyRow(cell); + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(row, 0, Math.min(prefixLength, row.length)) + .setType(Cell.Type.Put) + .build(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java index f6b6fbc231..cadcad6e1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; @@ -80,6 +81,8 @@ public class TestSeekBeforeWithInlineBlocks { @Test public void testMultiIndexLevelRandomHFileWithBlooms() throws IOException { conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); // Try out different HFile versions to ensure reverse scan works on each version for (int hfileVersion = HFile.MIN_FORMAT_VERSION_WITH_TAGS; @@ -101,6 +104,8 @@ public class TestSeekBeforeWithInlineBlocks { conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize); conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + conf.set(BloomFilterUtil.DELIMITER_KEY, "#"); Cell[] cells = new Cell[NUM_KV]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java index aa3acc4346..21c6d6e2df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.io.BytesWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +65,7 @@ public class CreateRandomStoreFile { private static final String VALUE_SIZE_OPTION = "v"; private static final String COMPRESSION_OPTION = "c"; private static final String 
BLOOM_FILTER_OPTION = "bf"; + private static final String BLOOM_FILTER_PARAM_OPTION = "bfp"; private static final String BLOCK_SIZE_OPTION = "bs"; private static final String BLOOM_BLOCK_SIZE_OPTION = "bfbs"; private static final String INDEX_BLOCK_SIZE_OPTION = "ibs"; @@ -103,6 +105,8 @@ public class CreateRandomStoreFile { options.addOption(BLOOM_FILTER_OPTION, "bloom_filter", true, "Bloom filter type, one of " + Arrays.toString(BloomType.values())); + options.addOption(BLOOM_FILTER_PARAM_OPTION, "bloom_param", true, + "the parameter of the bloom filter"); options.addOption(BLOCK_SIZE_OPTION, "block_size", true, "HFile block size"); options.addOption(BLOOM_BLOCK_SIZE_OPTION, "bloom_block_size", true, @@ -169,6 +173,25 @@ public class CreateRandomStoreFile { BLOOM_FILTER_OPTION)); } + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) { + LOG.error("the parameter of bloom filter is not specified"); + return false; + } else { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, + cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION)); + } + } + + if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) { + LOG.error("the parameter of bloom filter is not specified"); + return false; + } else { + conf.set(BloomFilterUtil.DELIMITER_KEY, cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION)); + } + } + int blockSize = HConstants.DEFAULT_BLOCKSIZE; if (cmdLine.hasOption(BLOCK_SIZE_OPTION)) blockSize = Integer.valueOf(cmdLine.getOptionValue(BLOCK_SIZE_OPTION)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 5cd0403f5d..4b8f5f200f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -748,7 +748,7 @@ public class TestHStoreFile extends HBaseTestCase { reader.loadFileInfo(); reader.loadBloomfilter(); StoreFileScanner scanner = getStoreFileScanner(reader, false, false); - assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount()); + assertEquals(expKeys[x], reader.getGeneralBloomFilter().getKeyCount()); HStore store = mock(HStore.class); when(store.getColumnFamilyDescriptor()) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java index 20d28b3e4e..2ff0d8c24a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Test; @@ -150,6 +151,8 @@ public class TestMultiColumnScanner { @Test public void testMultiColumnScanner() throws IOException { + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME, new 
HColumnDescriptor(FAMILY) .setCompressionType(comprAlgo) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java new file mode 100644 index 0000000000..61de21f44d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for the ROWPREFIX_FIXED_LENGTH and ROWPREFIX_DELIMITED bloom filter types + */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestRowPrefixBloomFilter { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowPrefixBloomFilter.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestRowPrefixBloomFilter.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + private static final ChecksumType CKTYPE = ChecksumType.CRC32C; + private static final int CKBYTES = 
512; + private boolean localfs = false; + private static Configuration conf; + private static FileSystem fs; + private static Path testDir; + private static final int BLOCKSIZE_SMALL = 8192; + private static final float err = (float) 0.01; + private static final int prefixLength = 10; + private static final String delimiter = "#"; + private static final String invalidFormatter = "%08d"; + private static final String prefixFormatter = "%010d"; + private static final String suffixFormatter = "%010d"; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err); + conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, prefixLength); + conf.set(BloomFilterUtil.DELIMITER_KEY, delimiter); + + localfs = + (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0); + + if (fs == null) { + fs = FileSystem.get(conf); + } + try { + if (localfs) { + testDir = TEST_UTIL.getDataTestDir("TestRowPrefixBloomFilter"); + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } else { + testDir = FSUtils.getRootDir(conf); + } + } catch (Exception e) { + LOG.error(HBaseMarkers.FATAL, "error during setup", e); + throw e; + } + } + + @After + public void tearDown() throws Exception { + try { + if (localfs) { + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } + } catch (Exception e) { + LOG.error(HBaseMarkers.FATAL, "error during tear down", e); + } + } + + private static StoreFileScanner getStoreFileScanner(StoreFileReader reader) { + return reader.getStoreFileScanner(false, false, false, 0, 0, false); + } + + private void writeStoreFile(final Path f, BloomType bt, int expKeys, int prefixRowCount, + int suffixRowCount) throws IOException { + HFileContext meta = new HFileContextBuilder() + .withBlockSize(BLOCKSIZE_SMALL) + .withChecksumType(CKTYPE) + .withBytesPerCheckSum(CKBYTES) + .build(); + // Make a store file and write data to it. 
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(f) + .withBloomType(bt) + .withMaxKeyCount(expKeys) + .withFileContext(meta) + .build(); + long now = System.currentTimeMillis(); + try { + //Put with valid row style + for (int i = 0; i < prefixRowCount; i += 2) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String row = prefixRow + "#" + String.format(suffixFormatter, j); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } + + //Put with invalid row style + for (int i = prefixRowCount; i < prefixRowCount*2; i += 2) { // prefix rows + String row = String.format(invalidFormatter, i); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } finally { + writer.close(); + } + } + + @Test + public void testRowPrefixBloomFilter() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + float expErr = 2*prefixRowCount*suffixRowCount*err; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + // read the file + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + //check basic param + assertEquals(bt[x], reader.getBloomFilterType()); + if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) { + assertEquals(prefixLength, reader.getPrefixLength()); + assertEquals("null", Bytes.toStringBinary(reader.getDelimiter())); + } else if (bt[x] == BloomType.ROWPREFIX_DELIMITED){ + assertEquals(-1, reader.getPrefixLength()); + assertEquals(delimiter, Bytes.toStringBinary(reader.getDelimiter())); + } + assertEquals(expKeys, reader.getGeneralBloomFilter().getKeyCount()); + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < prefixRowCount; i++) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String startRow = prefixRow + "#" + String.format(suffixFormatter, j); + String stopRow = prefixRow + "#" + String.format(suffixFormatter, j+1); + Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + } + } + + for (int i = prefixRowCount; i < prefixRowCount * 2; i++) { // prefix rows + String row = String.format(invalidFormatter, i); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + 
} + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + assertEquals("False negatives: " + falseNeg, 0, falseNeg); + int maxFalsePos = (int) (2 * expErr); + assertTrue("Too many false positives: " + falsePos + + " (err=" + err + ", expected no more than " + maxFalsePos + ")", + falsePos <= maxFalsePos); + } + } + + @Test + public void testRowPrefixBloomFilterWithGet() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + + //Get with valid row style + //prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String row = prefixRow + "#" + String.format(suffixFormatter, 0); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + row = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // Get with invalid row style + // ROWPREFIX: the length of row is less than prefixLength + // ROWPREFIX_DELIMITED: Row does not contain delimiter + // row in bloom + row = String.format(invalidFormatter, prefixRowCount+2); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // row not in bloom + row = String.format(invalidFormatter, prefixRowCount+1); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } + + @Test + public void testRowPrefixBloomFilterWithScan() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + + //Scan with valid row style. startRow and stopRow have a common prefix. + //And the length of the common prefix is no less than prefixLength. 
+ // prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + String stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // There is no common prefix between startRow and stopRow. + prefixRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) { + // startRow and stopRow have a common prefix. + // But the length of the common prefix is less than prefixLength. + String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + } else if (bt[x] == BloomType.ROWPREFIX_DELIMITED) { + // startRow does not contain delimiter + String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixStartRow + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // startRow contains delimiter, but stopRow does not have the same prefix as startRow. 
+ prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + } + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java index ae692e96c6..4c58c011ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.ClassRule; @@ -103,6 +104,8 @@ public class TestScanWithBloomError { public void setUp() throws IOException{ conf = TEST_UTIL.getConfiguration(); fs = FileSystem.get(conf); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + conf.set(BloomFilterUtil.DELIMITER_KEY, "#"); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java index a6a6510b3b..1e04e8de8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -142,6 +143,8 @@ public class TestSeekOptimizations { public void setUp() { rand = new Random(91238123L); expectedKVs.clear(); + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); } @Test -- 2.19.0
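
Note: a standalone sketch (plain Java, not part of the patch) of the prefix-extraction rule the new bloom contexts and the StoreFileReader checks above apply. It uses a single-byte delimiter for brevity where the patch accepts a byte[]; the class and method names are illustrative:

    import java.util.Arrays;

    public class PrefixRuleSketch {
      // ROWPREFIX_FIXED_LENGTH: the bloom key is the first prefixLength bytes
      // of the row; a shorter row contributes the whole row (mirrors the
      // Math.min(prefixLength, row.length) in RowPrefixFixedLengthBloomContext).
      static byte[] fixedPrefix(byte[] row, int prefixLength) {
        return Arrays.copyOf(row, Math.min(prefixLength, row.length));
      }

      // ROWPREFIX_DELIMITED: the bloom key is everything before the first
      // delimiter; a row with no delimiter, or with the delimiter at offset 0,
      // contributes the whole row (mirrors getDelimitedRowPrefixCell).
      static byte[] delimitedPrefix(byte[] row, byte delimiter) {
        for (int i = 0; i < row.length; i++) {
          if (row[i] == delimiter) {
            return i == 0 ? row : Arrays.copyOf(row, i);
          }
        }
        return row;
      }

      public static void main(String[] args) {
        byte[] row = "0000000001#0000000042".getBytes();
        // both calls print "0000000001" for this row
        System.out.println(new String(fixedPrefix(row, 10)));
        System.out.println(new String(delimitedPrefix(row, (byte) '#')));
      }
    }

On the read path the same rule decides whether a scan can consult the filter at all: a get always checks the prefix of its row, while a non-get scan is checked only when startRow and stopRow share at least prefixLength common bytes (fixed length) or the same delimited prefix (delimited); otherwise passesBloomFilter returns true and the store file is scanned.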