From 462c8b7fc9e578207424136febeb3ebb322791fd Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 14 Jan 2019 13:56:38 -0800 Subject: [PATCH] HBASE-21678 Port HBASE-20636 (Introduce two bloom filter type ROWPREFIX and ROWPREFIX_DELIMITED) to branch-1 HBASE-20636 Introduce two bloom filter type ROWPREFIX and ROWPREFIX_DELIMITED --- .../hadoop/hbase/regionserver/BloomType.java | 10 +- .../org/apache/hadoop/hbase/util/Bytes.java | 10 + .../actions/ChangeBloomFilterAction.java | 17 +- .../hbase/mapreduce/HFileOutputFormat2.java | 43 +- .../apache/hadoop/hbase/master/HMaster.java | 15 + .../hadoop/hbase/regionserver/StoreFile.java | 259 +++++++++--- .../hadoop/hbase/util/BloomFilterUtil.java | 80 ++++ .../hfile/TestSeekBeforeWithInlineBlocks.java | 7 +- .../regionserver/CreateRandomStoreFile.java | 23 ++ .../regionserver/TestMultiColumnScanner.java | 3 + .../TestRowPrefixBloomFilter.java | 386 ++++++++++++++++++ .../regionserver/TestScanWithBloomError.java | 3 + .../regionserver/TestSeekOptimizations.java | 3 + .../hadoop/hbase/util/LoadTestTool.java | 18 + 14 files changed, 806 insertions(+), 71 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java index 50b8b15d3b..ff11e91d5b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java @@ -36,5 +36,13 @@ public enum BloomType { /** * Bloom enabled with Table row & column (family+qualifier) as Key */ - ROWCOL + ROWCOL, + /** + * Bloom enabled with Table row prefix as Key, specify the length of the prefix + */ + ROWPREFIX_FIXED_LENGTH, + /** + * Bloom enabled with Table row prefix as Key, specify the delimiter of the prefix + */ + ROWPREFIX_DELIMITED } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 22ade4b386..448f4b5e61 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -2638,4 +2638,14 @@ public class Bytes implements Comparable<Bytes> { return b; } + public static int findCommonPrefix(byte[] left, byte[] right, int leftLength, int rightLength, + int leftOffset, int rightOffset) { + int length = Math.min(leftLength, rightLength); + int result = 0; + + while (result < length && left[leftOffset + result] == right[rightOffset + result]) { + result++; + } + return result; + } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java index 684cd62986..160e39e682 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ChangeBloomFilterAction.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.BloomFilterUtil; /** * Action that tries to adjust the
bloom filter setting on all the columns of a @@ -63,14 +64,16 @@ public class ChangeBloomFilterAction extends Action { final int bloomArraySize = bloomArray.length; for (HColumnDescriptor descriptor : columnDescriptors) { - int bloomFilterIndex = random.nextInt(bloomArraySize); + final String columnName = descriptor.getNameAsString(); + final BloomType bloomType = bloomArray[random.nextInt(bloomArraySize)]; LOG.debug("Performing action: About to set bloom filter type to " - + bloomArray[bloomFilterIndex] + " on column " - + descriptor.getNameAsString() + " of table " + tableName); - descriptor.setBloomFilterType(bloomArray[bloomFilterIndex]); - LOG.debug("Performing action: Just set bloom filter type to " - + bloomArray[bloomFilterIndex] + " on column " - + descriptor.getNameAsString() + " of table " + tableName); + + bloomType + " on column " + columnName + " of table " + tableName); + descriptor.setBloomFilterType(bloomType); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + descriptor.setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10"); + } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + descriptor.setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#"); + } } // Don't try the modify if we're stopping diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 31e7e5a770..853f359b90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.NullWritable; @@ -104,6 +105,8 @@ public class HFileOutputFormat2 "hbase.hfileoutputformat.families.compression"; private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; + private static final String BLOOM_PARAM_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.bloomparam"; private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = @@ -156,6 +159,7 @@ public class HFileOutputFormat2 // create a map from column family to the compression algorithm final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); + final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf); final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); @@ -295,6 +299,12 @@ public class HFileOutputFormat2 compression = compression == null ? defaultCompression : compression; BloomType bloomType = bloomTypeMap.get(family); bloomType = bloomType == null ?
BloomType.NONE : bloomType; + String bloomParam = bloomParamMap.get(family); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam); + } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + conf.set(BloomFilterUtil.DELIMITER_KEY, bloomParam); + } Integer blockSize = blockSizeMap.get(family); blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize; DataBlockEncoding encoding = overriddenEncoding; @@ -608,6 +618,18 @@ public class HFileOutputFormat2 return bloomTypeMap; } + /** + * Runs inside the task to deserialize column family to bloom filter param + * map from the configuration. + * + * @param conf to read the serialized values from + * @return a map from column family to the configured bloom filter param + */ + @VisibleForTesting + static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) { + return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY); + } + /** * Runs inside the task to deserialize column family to block size * map from the configuration. @@ -784,22 +806,33 @@ public class HFileOutputFormat2 return; } StringBuilder bloomTypeConfigValue = new StringBuilder(); + StringBuilder bloomParamConfigValue = new StringBuilder(); Collection<HColumnDescriptor> families = tableDescriptor.getFamilies(); int i = 0; for (HColumnDescriptor familyDescriptor : families) { if (i++ > 0) { bloomTypeConfigValue.append('&'); + bloomParamConfigValue.append('&'); } - bloomTypeConfigValue.append(URLEncoder.encode( - familyDescriptor.getNameAsString(), "UTF-8")); + bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); bloomTypeConfigValue.append('='); - String bloomType = familyDescriptor.getBloomFilterType().toString(); + bloomParamConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8")); + bloomParamConfigValue.append('='); + BloomType bloomType = familyDescriptor.getBloomFilterType(); if (bloomType == null) { - bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER; + bloomType = BloomType.valueOf(HColumnDescriptor.DEFAULT_BLOOMFILTER); + } + bloomTypeConfigValue.append(URLEncoder.encode(bloomType.toString(), "UTF-8")); + String bloomParam = ""; + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY); + } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.DELIMITER_KEY); } - bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8")); + bloomParamConfigValue.append(URLEncoder.encode(bloomParam, "UTF-8")); } conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString()); + conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, bloomParamConfigValue.toString()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index d17e297c34..34d9bea18b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.CoordinatedStateException; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -155,6 +156,7 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.ConfigUtil; @@ -1911,6 +1913,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // check replication scope checkReplicationScope(hcd); + // check bloom filter type + checkBloomFilterType(hcd); + // check data replication factor, it can be 0(default value) when user has not explicitly // set the value, in this case we use default replication factor set in the file system. if (hcd.getDFSReplication() < 0) { @@ -1993,6 +1998,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } } + private static void checkBloomFilterType(HColumnDescriptor cfd) + throws IOException { + Configuration conf = new CompoundConfiguration().addStringMap(cfd.getConfiguration()); + try { + BloomFilterUtil.getBloomFilterParam(cfd.getBloomFilterType(), conf); + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException("Failed to get bloom filter param", e); + } + } + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 4c5cd7ca1e..c2dac8863f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -101,12 +102,13 @@ public class StoreFile { Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION"); /** Bloom filter Type in FileInfo */ - public static final byte[] BLOOM_FILTER_TYPE_KEY = - Bytes.toBytes("BLOOM_FILTER_TYPE"); + public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + + /** Bloom filter param in FileInfo */ + public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM"); /** Delete Family Count in FileInfo */ - public static final byte[] DELETE_FAMILY_COUNT = - Bytes.toBytes("DELETE_FAMILY_COUNT"); + public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); /** Last Bloom filter key in FileInfo */ private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY"); @@ -816,6 +818,7 @@ public class StoreFile { private final BloomFilterWriter generalBloomFilterWriter; private final BloomFilterWriter deleteFamilyBloomFilterWriter; private final BloomType bloomType; + private byte[] bloomParam = null; private byte[] lastBloomKey; private int lastBloomKeyOffset, lastBloomKeyLen; private KVComparator kvComparator; @@ -823,6 +826,8 
@@ public class StoreFile { private long earliestPutTs = HConstants.LATEST_TIMESTAMP; private Cell lastDeleteFamilyCell = null; private long deleteFamilyCnt = 0; + private int prefixLength = -1; + private byte[] delimiter = null; /** * timeRangeTrackerSet is used to figure if we were passed a filled-out TimeRangeTracker or not. @@ -903,8 +908,23 @@ public class StoreFile { if (generalBloomFilterWriter != null) { this.bloomType = bloomType; - if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " + - this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName()); + this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); + switch (bloomType) { + case ROWPREFIX_FIXED_LENGTH: + this.prefixLength = Bytes.toInt(bloomParam); + break; + case ROWPREFIX_DELIMITED: + this.delimiter = bloomParam; + break; + default: + break; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " + + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH ? Bytes.toInt(bloomParam) : + Bytes.toStringBinary(bloomParam)) + + ", " + generalBloomFilterWriter.getClass().getSimpleName()); + } } else { // Not using Bloom filters. this.bloomType = BloomType.NONE; @@ -970,18 +990,35 @@ public class StoreFile { boolean newKey = true; if (this.lastCell != null) { switch(bloomType) { - case ROW: - newKey = ! kvComparator.matchingRows(cell, lastCell); - break; - case ROWCOL: - newKey = ! kvComparator.matchingRowColumn(cell, lastCell); - break; - case NONE: - newKey = false; - break; - default: - throw new IOException("Invalid Bloom filter type: " + bloomType + - " (ROW or ROWCOL expected)"); + case ROW: + newKey = ! kvComparator.matchingRows(cell, lastCell); + break; + case ROWCOL: + newKey = ! kvComparator.matchingRowColumn(cell, lastCell); + break; + case ROWPREFIX_FIXED_LENGTH: + newKey = ! kvComparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), + Math.min(prefixLength, cell.getRowLength()), + lastCell.getRowArray(), lastCell.getRowOffset(), + Math.min(prefixLength, lastCell.getRowLength())); + break; + case ROWPREFIX_DELIMITED: + byte[] row = CellUtil.cloneRow(cell); + int delimiterIndex = Bytes.indexOf(row, delimiter); + if (delimiterIndex > 0) { + newKey = ! kvComparator.matchingRows(row, 0, delimiterIndex, + lastCell.getRowArray(), lastCell.getRowOffset(), + Math.min(delimiterIndex, lastCell.getRowLength())); + } else { + newKey = ! kvComparator.matchingRows(cell, lastCell); + } + break; + case NONE: + newKey = false; + break; + default: + throw new IOException("Invalid Bloom filter type: " + + bloomType + " (ROW or ROWCOL or ROWPREFIX or ROWPREFIX_DELIMITED expected)"); } } if (newKey) { @@ -989,48 +1026,62 @@ public class StoreFile { * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp * - * 2 Types of Filtering: + * 4 Types of Filtering: * 1. Row = Row * 2. RowCol = Row + Qualifier + * 3. RowPrefixFixedLength = Fixed Length Row Prefix + * 4. 
RowPrefixDelimiter = Delimited Row Prefix */ byte[] bloomKey; int bloomKeyOffset, bloomKeyLen; switch (bloomType) { - case ROW: - bloomKey = cell.getRowArray(); - bloomKeyOffset = cell.getRowOffset(); - bloomKeyLen = cell.getRowLength(); - break; - case ROWCOL: - // merge(row, qualifier) - // TODO: could save one buffer copy in case of compound Bloom - // filters when this involves creating a KeyValue - bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(), - cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength()); - bloomKeyOffset = 0; - bloomKeyLen = bloomKey.length; - break; - default: - throw new IOException("Invalid Bloom filter type: " + bloomType + - " (ROW or ROWCOL expected)"); + case ROW: + bloomKey = cell.getRowArray(); + bloomKeyOffset = cell.getRowOffset(); + bloomKeyLen = cell.getRowLength(); + break; + case ROWCOL: + // merge(row, qualifier) + // TODO: could save one buffer copy in case of compound Bloom + // filters when this involves creating a KeyValue + bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()); + bloomKeyOffset = 0; + bloomKeyLen = bloomKey.length; + break; + case ROWPREFIX_FIXED_LENGTH: + bloomKey = cell.getRowArray(); + bloomKeyOffset = cell.getRowOffset(); + bloomKeyLen = Math.min(this.prefixLength, cell.getRowLength()); + break; + case ROWPREFIX_DELIMITED: + byte[] row = CellUtil.cloneRow(cell); + int delimiterIndex = Bytes.indexOf(row, delimiter); + bloomKey = row; + bloomKeyOffset = 0; + bloomKeyLen = delimiterIndex > 0 ? delimiterIndex : row.length; + break; + default: + throw new IOException("Invalid Bloom filter type: " + + bloomType + " (ROW or ROWCOL or ROWPREFIX or ROWPREFIX_DELIMITED expected)"); } generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen); if (lastBloomKey != null && generalBloomFilterWriter.getComparator().compareFlatKey(bloomKey, - bloomKeyOffset, bloomKeyLen, lastBloomKey, - lastBloomKeyOffset, lastBloomKeyLen) <= 0) { - throw new IOException("Non-increasing Bloom keys: " - + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) - + " after " - + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, - lastBloomKeyLen)); - } - lastBloomKey = bloomKey; - lastBloomKeyOffset = bloomKeyOffset; - lastBloomKeyLen = bloomKeyLen; - this.lastCell = cell; + bloomKeyOffset, bloomKeyLen, lastBloomKey, + lastBloomKeyOffset, lastBloomKeyLen) <= 0) { + throw new IOException("Non-increasing Bloom keys: " + + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + + " after " + + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, + lastBloomKeyLen)); + } + lastBloomKey = bloomKey; + lastBloomKeyOffset = bloomKeyOffset; + lastBloomKeyLen = bloomKeyLen; + this.lastCell = cell; } } } @@ -1095,6 +1146,9 @@ public class StoreFile { // add the general Bloom filter writer and append file info if (hasGeneralBloom) { writer.addGeneralBloomFilter(generalBloomFilterWriter); + if (bloomParam != null) { + writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); + } writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); if (lastBloomKey != null) { @@ -1171,6 +1225,8 @@ public class StoreFile { private AtomicInteger refCount = new AtomicInteger(0); // Indicates if the file got compacted private volatile boolean compactedAway = false; + private int prefixLength = -1; + private 
byte[] delimiter = null; public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { @@ -1329,28 +1385,30 @@ public class StoreFile { */ boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) { - // Multi-column non-get scans will use Bloom filters through the - // lower-level API function that this function calls. - if (!scan.isGetScan()) { - return true; - } - byte[] row = scan.getStartRow(); switch (this.bloomFilterType) { case ROW: + if (!scan.isGetScan()) { + return true; + } return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0); case ROWCOL: + if (!scan.isGetScan()) { + return true; + } if (columns != null && columns.size() == 1) { byte[] column = columns.first(); return passesGeneralBloomFilter(row, 0, row.length, column, 0, column.length); } - // For multi-column queries the Bloom filter is checked from the // seekExact operation. return true; - + case ROWPREFIX_FIXED_LENGTH: + return passesGeneralRowPrefixBloomFilter(scan); + case ROWPREFIX_DELIMITED: + return passesGeneralDelimitedRowPrefixBloomFilter(scan); default: return true; } @@ -1409,6 +1467,8 @@ public class StoreFile { byte[] key; switch (bloomFilterType) { case ROW: + case ROWPREFIX_FIXED_LENGTH: + case ROWPREFIX_DELIMITED: if (col != null) { throw new RuntimeException("Row-only Bloom filter called with " + "column specified"); @@ -1492,6 +1552,76 @@ public class StoreFile { return true; } + /** + * A method for checking the fixed-length row prefix Bloom filter. + * Called from passesBloomFilter(Scan, SortedSet) when the Bloom filter + * type is ROWPREFIX_FIXED_LENGTH. + * + * @return True if passes + */ + private boolean passesGeneralRowPrefixBloomFilter(Scan scan) { + BloomFilter bloomFilter = this.generalBloomFilter; + if (bloomFilter == null) { + return true; + } + + byte[] row = scan.getStartRow(); + byte[] rowPrefix; + if (scan.isGetScan()) { + rowPrefix = Bytes.copy(row, 0, Math.min(prefixLength, row.length)); + } else { + // For non-get scans + // Find out the common prefix of startRow and stopRow. + int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(), + scan.getStartRow().length, scan.getStopRow().length, 0, 0); + // startRow and stopRow don't share a common prefix, or the + // common prefix is shorter than prefixLength. + if (commonLength <= 0 || commonLength < prefixLength) { + return true; + } + rowPrefix = Bytes.copy(row, 0, prefixLength); + } + return passesGeneralBloomFilter(rowPrefix, 0, rowPrefix.length, null, 0, 0); + } + + /** + * A method for checking the delimited row prefix Bloom filter. + * Called from passesBloomFilter(Scan, SortedSet) when the Bloom filter + * type is ROWPREFIX_DELIMITED. + * + * @return True if passes + */ + private boolean passesGeneralDelimitedRowPrefixBloomFilter(Scan scan) { + BloomFilter bloomFilter = this.generalBloomFilter; + if (bloomFilter == null) { + return true; + } + + byte[] row = scan.getStartRow(); + byte[] rowPrefix; + if (scan.isGetScan()) { + int rowPrefixLength = Bytes.indexOf(row, delimiter); + if (rowPrefixLength <= 0) { + rowPrefix = row; + } else { + rowPrefix = Bytes.copy(row, 0, rowPrefixLength); + } + } else { + // For non-get scans + // If startRow does not contain delimiter, return true directly. + int startRowPrefixLength = Bytes.indexOf(row, delimiter); + if (startRowPrefixLength <= 0) { + return true; + } + // If stopRow does not have the same prefix as startRow, return true directly.
+ int commonLength = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(), + startRowPrefixLength, scan.getStopRow().length, 0, 0); + if (commonLength < startRowPrefixLength) { + return true; + } + rowPrefix = Bytes.copy(row, 0, startRowPrefixLength); + } + return passesGeneralBloomFilter(rowPrefix, 0, rowPrefix.length, null, 0, 0); + } + /** * Checks whether the given scan rowkey range overlaps with the current storefile's * @param scan the scan specification. Used to determine the rowkey range. @@ -1529,6 +1659,13 @@ public class StoreFile { bloomFilterType = BloomType.valueOf(Bytes.toString(b)); } + byte[] p = fi.get(BLOOM_FILTER_PARAM_KEY); + if (bloomFilterType == BloomType.ROWPREFIX_FIXED_LENGTH) { + prefixLength = Bytes.toInt(p); + } else if (bloomFilterType == BloomType.ROWPREFIX_DELIMITED) { + delimiter = p; + } + lastBloomKey = fi.get(LAST_BLOOM_KEY); byte[] cnt = fi.get(DELETE_FAMILY_COUNT); if (cnt != null) { @@ -1710,6 +1847,14 @@ public class StoreFile { public long getMaxTimestamp() { return timeRange == null ? TimeRange.INITIAL_MAX_TIMESTAMP: timeRange.getMax(); } + + public int getPrefixLength() { + return prefixLength; + } + + public byte[] getDelimiter() { + return delimiter; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java new file mode 100644 index 0000000000..d31012c44a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.BloomType; + +/** + * Utility methods related to BloomFilters + */ +@InterfaceAudience.Private +public final class BloomFilterUtil { + + public static final String PREFIX_LENGTH_KEY = "RowPrefixBloomFilter.prefix_length"; + public static final String DELIMITER_KEY = "RowPrefixDelimitedBloomFilter.delimiter"; + + /** + * Private constructor to keep this class from being instantiated. 
+ */ + private BloomFilterUtil() { + } + + public static byte[] getBloomFilterParam(BloomType bloomFilterType, Configuration conf) + throws IllegalArgumentException { + byte[] bloomParam = null; + String message = "Bloom filter type is " + bloomFilterType + ", "; + switch (bloomFilterType) { + case ROWPREFIX_FIXED_LENGTH: + String prefixLengthString = conf.get(PREFIX_LENGTH_KEY); + if (prefixLengthString == null) { + message += PREFIX_LENGTH_KEY + " not specified."; + throw new IllegalArgumentException(message); + } + int prefixLength; + try { + prefixLength = Integer.parseInt(prefixLengthString); + if (prefixLength <= 0 || prefixLength > HConstants.MAX_ROW_LENGTH) { + message += "the value of " + PREFIX_LENGTH_KEY + + " must be > 0 and <= " + HConstants.MAX_ROW_LENGTH; + throw new IllegalArgumentException(message); + } + } catch (NumberFormatException nfe) { + message = "Number format exception when parsing " + PREFIX_LENGTH_KEY + " for BloomType " + + bloomFilterType.toString() + ":" + + prefixLengthString; + throw new IllegalArgumentException(message, nfe); + } + bloomParam = Bytes.toBytes(prefixLength); + break; + case ROWPREFIX_DELIMITED: + String delimiterString = conf.get(DELIMITER_KEY); + if (delimiterString == null || delimiterString.length() == 0) { + message += DELIMITER_KEY + " not specified."; + throw new IllegalArgumentException(message); + } + bloomParam = Bytes.toBytes(delimiterString); + break; + default: + break; + } + return bloomParam; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java index ac92f4f32f..fbf68dffa8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -76,7 +77,9 @@ public class TestSeekBeforeWithInlineBlocks { @Test public void testMultiIndexLevelRandomHFileWithBlooms() throws IOException { conf = TEST_UTIL.getConfiguration(); - + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); + // Try out different HFile versions to ensure reverse scan works on each version for (int hfileVersion = HFile.MIN_FORMAT_VERSION_WITH_TAGS; hfileVersion <= HFile.MAX_FORMAT_VERSION; hfileVersion++) { @@ -97,6 +100,8 @@ public class TestSeekBeforeWithInlineBlocks { conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, indexBlockSize); conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + conf.set(BloomFilterUtil.DELIMITER_KEY, "#"); Cell[] cells = new Cell[NUM_KV]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java index bcba1f9e99..7504ce92d7 100644 ---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.io.BytesWritable; /** @@ -64,6 +65,7 @@ public class CreateRandomStoreFile { private static final String VALUE_SIZE_OPTION = "v"; private static final String COMPRESSION_OPTION = "c"; private static final String BLOOM_FILTER_OPTION = "bf"; + private static final String BLOOM_FILTER_PARAM_OPTION = "bfp"; private static final String BLOCK_SIZE_OPTION = "bs"; private static final String BLOOM_BLOCK_SIZE_OPTION = "bfbs"; private static final String INDEX_BLOCK_SIZE_OPTION = "ibs"; @@ -103,6 +105,8 @@ public class CreateRandomStoreFile { options.addOption(BLOOM_FILTER_OPTION, "bloom_filter", true, "Bloom filter type, one of " + Arrays.toString(BloomType.values())); + options.addOption(BLOOM_FILTER_PARAM_OPTION, "bloom_param", true, + "the parameter of the bloom filter"); options.addOption(BLOCK_SIZE_OPTION, "block_size", true, "HFile block size"); options.addOption(BLOOM_BLOCK_SIZE_OPTION, "bloom_block_size", true, @@ -169,6 +173,25 @@ public class CreateRandomStoreFile { BLOOM_FILTER_OPTION)); } + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) { + LOG.error("the parameter of bloom filter is not specified"); + return false; + } else { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, + cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION)); + } + } + + if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) { + LOG.error("the parameter of bloom filter is not specified"); + return false; + } else { + conf.set(BloomFilterUtil.DELIMITER_KEY, cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION)); + } + } + int blockSize = HConstants.DEFAULT_BLOCKSIZE; if (cmdLine.hasOption(BLOCK_SIZE_OPTION)) blockSize = Integer.valueOf(cmdLine.getOptionValue(BLOCK_SIZE_OPTION)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java index 5a491723bd..f11025d2f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.runners.Parameterized.Parameter; @@ -136,6 +137,8 @@ public abstract class TestMultiColumnScanner { @Test public void testMultiColumnScanner() throws IOException { + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME, new HColumnDescriptor(FAMILY) .setCompressionType(comprAlgo) diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java new file mode 100644 index 0000000000..bffed430ff --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test TestRowPrefixBloomFilter + */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestRowPrefixBloomFilter { + + private static final Logger LOG = LoggerFactory.getLogger(TestRowPrefixBloomFilter.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + private static final ChecksumType CKTYPE = ChecksumType.CRC32C; + private static final int CKBYTES = 512; + private boolean localfs = false; + private static Configuration conf; + private static FileSystem fs; + private static Path testDir; + private static final int BLOCKSIZE_SMALL = 8192; + private static final float err = (float) 0.01; + private static final int prefixLength = 10; + private static final String delimiter = "#"; + private 
static final String invalidFormatter = "%08d"; + private static final String prefixFormatter = "%010d"; + private static final String suffixFormatter = "%010d"; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err); + conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, prefixLength); + conf.set(BloomFilterUtil.DELIMITER_KEY, delimiter); + + localfs = + (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0); + + if (fs == null) { + fs = FileSystem.get(conf); + } + try { + if (localfs) { + testDir = TEST_UTIL.getDataTestDir("TestRowPrefixBloomFilter"); + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } else { + testDir = FSUtils.getRootDir(conf); + } + } catch (Exception e) { + LOG.error("error during setup", e); + throw e; + } + } + + @After + public void tearDown() throws Exception { + try { + if (localfs) { + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } + } catch (Exception e) { + LOG.error("error during tear down", e); + } + } + + private static StoreFileScanner getStoreFileScanner(StoreFile.Reader reader) { + return reader.getStoreFileScanner(false, false, false, 0, 0, false); + } + + private void writeStoreFile(final Path f, BloomType bt, int expKeys, int prefixRowCount, + int suffixRowCount) throws IOException { + HFileContext meta = new HFileContextBuilder() + .withBlockSize(BLOCKSIZE_SMALL) + .withChecksumType(CKTYPE) + .withBytesPerCheckSum(CKBYTES) + .build(); + // Make a store file and write data to it. + StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs) + .withFilePath(f) + .withBloomType(bt) + .withMaxKeyCount(expKeys) + .withFileContext(meta) + .build(); + long now = EnvironmentEdgeManager.currentTime(); + try { + //Put with valid row style + for (int i = 0; i < prefixRowCount; i += 2) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String row = prefixRow + "#" + String.format(suffixFormatter, j); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } + + //Put with invalid row style + for (int i = prefixRowCount; i < prefixRowCount*2; i += 2) { // prefix rows + String row = String.format(invalidFormatter, i); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } finally { + writer.close(); + } + } + + @Test + public void testRowPrefixBloomFilter() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + float expErr = 2*prefixRowCount*suffixRowCount*err; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + // read the file + StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + //check basic param + assertEquals(bt[x], reader.getBloomFilterType()); + if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) { + assertEquals(prefixLength, 
reader.getPrefixLength()); + assertEquals("null", Bytes.toStringBinary(reader.getDelimiter())); + } else if (bt[x] == BloomType.ROWPREFIX_DELIMITED){ + assertEquals(-1, reader.getPrefixLength()); + assertEquals(delimiter, Bytes.toStringBinary(reader.getDelimiter())); + } + assertEquals(expKeys, reader.getGeneralBloomFilter().getKeyCount()); + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getFamily()).thenReturn(new HColumnDescriptor("family")); + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < prefixRowCount; i++) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String startRow = prefixRow + "#" + String.format(suffixFormatter, j); + String stopRow = prefixRow + "#" + String.format(suffixFormatter, j+1); + Scan scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + } + } + + for (int i = prefixRowCount; i < prefixRowCount * 2; i++) { // prefix rows + String row = String.format(invalidFormatter, i); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + } + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + assertEquals("False negatives: " + falseNeg, 0, falseNeg); + int maxFalsePos = (int) (2 * expErr); + assertTrue("Too many false positives: " + falsePos + + " (err=" + err + ", expected no more than " + maxFalsePos + ")", + falsePos <= maxFalsePos); + } + } + + @Test + public void testRowPrefixBloomFilterWithGet() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getFamily()).thenReturn(new HColumnDescriptor("family")); + + //Get with valid row style + //prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String row = prefixRow + "#" + String.format(suffixFormatter, 0); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + row = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // Get with invalid row style + // ROWPREFIX: the length of row is less than prefixLength + // ROWPREFIX_DELIMITED: Row does not contain 
delimiter + // row in bloom + row = String.format(invalidFormatter, prefixRowCount+2); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // row not in bloom + row = String.format(invalidFormatter, prefixRowCount+1); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } + + @Test + public void testRowPrefixBloomFilterWithScan() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX_FIXED_LENGTH, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFile.Reader reader = new StoreFile.Reader(fs, f, cacheConf, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getFamily()).thenReturn(new HColumnDescriptor("family")); + + //Scan with valid row style. startRow and stopRow have a common prefix. + //And the length of the common prefix is no less than prefixLength. + //prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + String stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + Scan scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // There is no common prefix between startRow and stopRow. + prefixRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().setStartRow(Bytes.toBytes(startRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + if (bt[x] == BloomType.ROWPREFIX_FIXED_LENGTH) { + // startRow and stopRow have a common prefix. + // But the length of the common prefix is less than prefixLength. 
+ String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + }else if (bt[x] == BloomType.ROWPREFIX_DELIMITED) { + // startRow does not contain delimiter + String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixStartRow + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // startRow contains delimiter, but stopRow does not have the same prefix as startRow. + prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().setStartRow(Bytes.toBytes(startRow)) + .setStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + } + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java index 7682024a32..fe2ceb1a54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; @@ -98,6 +99,8 @@ public class TestScanWithBloomError { public void setUp() throws IOException{ conf = TEST_UTIL.getConfiguration(); fs = FileSystem.get(conf); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + conf.set(BloomFilterUtil.DELIMITER_KEY, "#"); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java index 9de0b774b6..03cab69ab8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import 
org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -135,6 +136,8 @@ public class TestSeekOptimizations { public void setUp() { rand = new Random(91238123L); expectedKVs.clear(); + TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); + TEST_UTIL.getConfiguration().set(BloomFilterUtil.DELIMITER_KEY, "#"); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 8506d440dd..3b8b9410c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -127,6 +127,7 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_VERBOSE = "verbose"; public static final String OPT_BLOOM = "bloom"; + public static final String OPT_BLOOM_PARAM = "bloom_param"; public static final String OPT_COMPRESSION = "compression"; public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; @@ -327,6 +328,7 @@ public class LoadTestTool extends AbstractHBaseTool { addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); + addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type"); addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE); addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + @@ -522,6 +524,22 @@ public class LoadTestTool extends AbstractHBaseTool { bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr); + if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) { + if (!cmd.hasOption(OPT_BLOOM_PARAM)) { + LOG.error("the parameter of bloom filter " + bloomType.name() + " is not specified"); + } else { + conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); + } + } + + if (bloomType == BloomType.ROWPREFIX_DELIMITED) { + if (!cmd.hasOption(OPT_BLOOM_PARAM)) { + LOG.error("the parameter of bloom filter " + bloomType.name() + " is not specified"); + } else { + conf.set(BloomFilterUtil.DELIMITER_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM)); + } + } + inMemoryCF = cmd.hasOption(OPT_INMEMORY); if (cmd.hasOption(OPT_ENCRYPTION)) { cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); -- 2.20.1
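For anyone trying the feature out, here is a minimal sketch (not part of the patch) of how a client could enable the two new Bloom filter types once this change is applied. The table and family names, the prefix length of 10, and the "#" delimiter are made-up example values; the descriptor settings mirror what ChangeBloomFilterAction sets above and what the new HMaster#checkBloomFilterType sanity check validates at table creation. Note that BloomFilterUtil lives in hbase-server in this patch, so the constants are shown here only for readability.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterUtil;

public class RowPrefixBloomExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      // Hypothetical table and family names for illustration only.
      HTableDescriptor table = new HTableDescriptor(TableName.valueOf("prefix_demo"));

      // ROWPREFIX_FIXED_LENGTH: the Bloom key is the first 10 bytes of each row key.
      HColumnDescriptor fixed = new HColumnDescriptor("f");
      fixed.setBloomFilterType(BloomType.ROWPREFIX_FIXED_LENGTH);
      // Required: without it, checkBloomFilterType() rejects the table at creation.
      fixed.setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10");
      table.addFamily(fixed);

      // ROWPREFIX_DELIMITED: the Bloom key is everything before the first '#'.
      HColumnDescriptor delimited = new HColumnDescriptor("d");
      delimited.setBloomFilterType(BloomType.ROWPREFIX_DELIMITED);
      delimited.setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#");
      table.addFamily(delimited);

      admin.createTable(table);
    }
  }
}

A design note grounded in the StoreFile.Writer code above: under ROWPREFIX_DELIMITED, a row key that does not contain the delimiter falls back to using the entire row as the Bloom key (delimiterIndex > 0 ? delimiterIndex : row.length), so the delimiter should occur in every row key for the filter to be effective; under ROWPREFIX_FIXED_LENGTH, rows shorter than the configured length are keyed on their full length.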