From 957197d802166bbc6b78a673a3ddb0ea743129a5 Mon Sep 17 00:00:00 2001
From: Guangxu Cheng
Date: Thu, 24 May 2018 19:08:10 +0800
Subject: [PATCH] HBASE-20636 Introduce two bloom filter types: ROWPREFIX and
 ROWPREFIX_DELIMITED

---
 .../hadoop/hbase/regionserver/BloomType.java       |  10 +-
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |  51 +++
 .../org/apache/hadoop/hbase/util/LoadTestTool.java |  18 +
 .../hbase/io/hfile/CompoundBloomFilterWriter.java  |   6 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  14 +
 .../hadoop/hbase/regionserver/HStoreFile.java      |   3 +
 .../hadoop/hbase/regionserver/StoreFileReader.java | 120 ++++++-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  45 ++-
 .../apache/hadoop/hbase/util/BloomFilterChunk.java |   6 +-
 .../apache/hadoop/hbase/util/BloomFilterUtil.java  |  50 ++-
 .../hadoop/hbase/util/RowPrefixBloomContext.java   |  59 +++
 .../hbase/util/RowPrefixDelimiterBloomContext.java |  62 ++++
 .../hbase/regionserver/CreateRandomStoreFile.java  |  24 ++
 .../hadoop/hbase/regionserver/TestHStoreFile.java  |   2 +-
 .../regionserver/TestRowPrefixBloomFilter.java     | 399 +++++++++++++++++++++
 15 files changed, 834 insertions(+), 35 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixBloomContext.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
index 7787842325..b98c929812 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/regionserver/BloomType.java
@@ -34,5 +34,13 @@ public enum BloomType {
   /**
    * Bloom enabled with Table row & column (family+qualifier) as Key
    */
-  ROWCOL
+  ROWCOL,
+  /**
+   * Bloom enabled with Table row prefix as Key; the length of the prefix must be specified
+   */
+  ROWPREFIX,
+  /**
+   * Bloom enabled with Table row prefix as Key; the delimiter of the prefix must be specified
+   */
+  ROWPREFIX_DELIMITED
 }
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index a403455d6d..f327623fba 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -149,6 +150,8 @@ public class HFileOutputFormat2
       "hbase.hfileoutputformat.families.compression";
   static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
       "hbase.hfileoutputformat.families.bloomtype";
+  static final String BLOOM_PARAM_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.bloomparam";
   static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.blocksize";
   static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
@@ -216,6 +219,7 @@ public class HFileOutputFormat2
     // create a map from column family to the compression algorithm
     final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
     final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+    final Map<byte[], String> bloomParamMap = createFamilyBloomParamMap(conf);
     final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
@@ -399,6 +403,12 @@ public class HFileOutputFormat2
         compression = compression == null ? defaultCompression : compression;
         BloomType bloomType = bloomTypeMap.get(tableAndFamily);
         bloomType = bloomType == null ? BloomType.NONE : bloomType;
+        String bloomParam = bloomParamMap.get(tableAndFamily);
+        if (bloomType == BloomType.ROWPREFIX) {
+          conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, bloomParam);
+        } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
+          conf.set(BloomFilterUtil.DELIMITER_KEY, bloomParam);
+        }
         Integer blockSize = blockSizeMap.get(tableAndFamily);
         blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
         DataBlockEncoding encoding = overriddenEncoding;
@@ -667,6 +677,8 @@ public class HFileOutputFormat2
         tableDescriptors));
     conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
         tableDescriptors));
+    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomParamDetails,
+        tableDescriptors));
     conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
         serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
 
@@ -694,6 +706,8 @@ public class HFileOutputFormat2
         serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
     conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
         serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
+    conf.set(BLOOM_PARAM_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(bloomParamDetails, singleTableDescriptor));
     conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
         serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
 
@@ -742,6 +756,19 @@ public class HFileOutputFormat2
   }
 
   /**
+   * Runs inside the task to deserialize the column family to bloom filter param
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured bloom filter param
+   */
+  @VisibleForTesting
+  static Map<byte[], String> createFamilyBloomParamMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_PARAM_FAMILIES_CONF_KEY);
+  }
+
+  /**
    * Runs inside the task to deserialize column family to block size
    * map from the configuration.
    *
@@ -909,6 +936,30 @@ public class HFileOutputFormat2
       };
 
   /**
+   * Serialize column family to bloom filter param map to configuration. Invoked while
+   * configuring the MR job for incremental load. Returns the bloom filter param
+   * (prefix length or delimiter) for the given family descriptor, or null if the
+   * family's bloom type takes no param.
+   */
+  @VisibleForTesting
+  static Function<ColumnFamilyDescriptor, String> bloomParamDetails = familyDescriptor -> {
+    BloomType bloomType = familyDescriptor.getBloomFilterType();
+    String bloomParam = null;
+    if (bloomType == BloomType.ROWPREFIX) {
+      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.PREFIX_LENGTH_KEY);
+    } else if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
+      bloomParam = familyDescriptor.getConfigurationValue(BloomFilterUtil.DELIMITER_KEY);
+    }
+    return bloomParam;
+  };
+
+  /**
   * Serialize column family to data block encoding map to configuration.
   * Invoked while configuring the MR job for incremental load.
   *
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index 0f7c5c0046..50085e8ce0 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -114,6 +114,7 @@ public class LoadTestTool extends AbstractHBaseTool {
       "one of " + Arrays.toString(Compression.Algorithm.values());
 
   public static final String OPT_BLOOM = "bloom";
+  public static final String OPT_BLOOM_PARAM = "bloom_param";
   public static final String OPT_COMPRESSION = "compression";
   public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
   public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
@@ -324,6 +325,7 @@ public class LoadTestTool extends AbstractHBaseTool {
     addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
     addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
+    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of the bloom filter type");
     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
     addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
     addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
@@ -544,6 +546,22 @@ public class LoadTestTool extends AbstractHBaseTool {
     bloomType = bloomStr == null ? BloomType.ROW :
         BloomType.valueOf(bloomStr);
 
+    if (bloomType == BloomType.ROWPREFIX) {
+      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
+        LOG.error("the parameter of the bloom filter is not specified");
+      } else {
+        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
+      }
+    }
+
+    if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
+      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
+        LOG.error("the parameter of the bloom filter is not specified");
+      } else {
+        conf.set(BloomFilterUtil.DELIMITER_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
+      }
+    }
+
     inMemoryCF = cmd.hasOption(OPT_INMEMORY);
     if (cmd.hasOption(OPT_ENCRYPTION)) {
       cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
index d63a824832..a4a805e8c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
@@ -174,12 +174,12 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
             + Bytes.toStringBinary(firstKeyInChunk));
       }
       // This will be done only once per chunk
-      if (bloomType == BloomType.ROW) {
-        firstKeyInChunk = CellUtil.copyRow(cell);
-      } else {
+      if (bloomType == BloomType.ROWCOL) {
         firstKeyInChunk = PrivateCellUtil
             .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
+      } else {
+        firstKeyInChunk = CellUtil.copyRow(cell);
       }
       allocateNewChunk();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 6c41b8eb72..97335612d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -61,6 +61,7 @@ import
org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -179,6 +180,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EncryptionTest; @@ -1920,6 +1922,8 @@ public class HMaster extends HRegionServer implements MasterServices { // check replication scope checkReplicationScope(hcd); + // check bloom filter type + checkBloomFilterType(hcd); // check data replication factor, it can be 0(default value) when user has not explicitly // set the value, in this case we use default replication factor set in the file system. @@ -2002,6 +2006,16 @@ public class HMaster extends HRegionServer implements MasterServices { } } + private static void checkBloomFilterType(ColumnFamilyDescriptor cfd) + throws IOException { + Configuration conf = new CompoundConfiguration().addStringMap(cfd.getConfiguration()); + try { + BloomFilterUtil.getBloomFilterParam(cfd.getBloomFilterType(), conf); + } catch (IllegalArgumentException e) { + throw new DoNotRetryIOException("Failed to get bloom filter param", e); + } + } + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. 
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 1950a72105..a4e5f0bac2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -80,6 +80,9 @@ public class HStoreFile implements StoreFile { /** Bloom filter Type in FileInfo */ public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + /** Bloom filter param in FileInfo */ + public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM"); + /** Delete Family Count in FileInfo */ public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index db7b4f92fa..a0272bb507 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.LAST_BLOOM_KEY; @@ -74,6 +75,8 @@ public class StoreFileReader { private boolean bulkLoadResult = false; private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; + private int prefixLength = -1; + private byte[] delimiter = null; // Counter that is incremented every time a scanner is created on the // store file. It is decremented when the scan on the store file is @@ -115,6 +118,8 @@ public class StoreFileReader { this.bulkLoadResult = reader.bulkLoadResult; this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV; this.skipResetSeqId = reader.skipResetSeqId; + this.prefixLength = reader.prefixLength; + this.delimiter = reader.delimiter; } public boolean isPrimaryReplicaReader() { @@ -209,7 +214,7 @@ public class StoreFileReader { /** * Check if this storeFile may contain keys within the TimeRange that * have not expired (i.e. not older than oldestUnexpiredTS). - * @param timeRange the timeRange to restrict + * @param tr the timeRange to restrict * @param oldestUnexpiredTS the oldest timestamp that is not expired, as * determined by the column family's TTL * @return false if queried keys definitely don't exist in this StoreFile @@ -236,18 +241,18 @@ public class StoreFileReader { * False if the Bloom filter is applicable and the scan fails it. */ boolean passesBloomFilter(Scan scan, final SortedSet columns) { - // Multi-column non-get scans will use Bloom filters through the - // lower-level API function that this function calls. 
-    if (!scan.isGetScan()) {
-      return true;
-    }
-
     byte[] row = scan.getStartRow();
     switch (this.bloomFilterType) {
       case ROW:
+        if (!scan.isGetScan()) {
+          return true;
+        }
         return passesGeneralRowBloomFilter(row, 0, row.length);
 
       case ROWCOL:
+        if (!scan.isGetScan()) {
+          return true;
+        }
         if (columns != null && columns.size() == 1) {
           byte[] column = columns.first();
           // create the required fake key
@@ -258,7 +263,10 @@ public class StoreFileReader {
         // For multi-column queries the Bloom filter is checked from the
         // seekExact operation.
         return true;
-
+      case ROWPREFIX:
+        return passesGeneralRowPrefixBloomFilter(scan);
+      case ROWPREFIX_DELIMITED:
+        return passesGeneralDelimitedRowPrefixBloomFilter(scan);
       default:
         return true;
     }
   }
@@ -299,7 +307,7 @@ public class StoreFileReader {
    *
    * @return True if passes
    */
-  public boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
+  private boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
     BloomFilter bloomFilter = this.generalBloomFilter;
     if (bloomFilter == null) {
       return true;
@@ -309,7 +317,7 @@ public class StoreFileReader {
     byte[] key = null;
     if (rowOffset != 0 || rowLen != row.length) {
       throw new AssertionError(
-          "For row-only Bloom filters the row " + "must occupy the whole array");
+          "For row-only Bloom filters the row must occupy the whole array");
     }
     key = row;
     return checkGeneralBloomFilter(key, null, bloomFilter);
@@ -339,6 +347,76 @@ public class StoreFileReader {
     return checkGeneralBloomFilter(null, kvKey, bloomFilter);
   }
 
+  /**
+   * Checks whether the row prefix of the given scan passes the ROWPREFIX
+   * Bloom filter. Called from passesBloomFilter() above.
+   *
+   * @return True if passes
+   */
+  private boolean passesGeneralRowPrefixBloomFilter(Scan scan) {
+    BloomFilter bloomFilter = this.generalBloomFilter;
+    if (bloomFilter == null) {
+      return true;
+    }
+
+    byte[] row = scan.getStartRow();
+    byte[] rowPrefix;
+    if (scan.isGetScan()) {
+      rowPrefix = Bytes.copy(row, 0, Math.min(prefixLength, row.length));
+    } else {
+      // For non-get scans
+      // Find out the common prefix of startRow and stopRow.
+      int commonIndex = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(),
+          scan.getStartRow().length, scan.getStopRow().length, 0, 0);
+      // startRow and stopRow don't share a common prefix,
+      // or the common prefix is shorter than prefixLength.
+      if (commonIndex <= 0 || commonIndex < prefixLength) {
+        return true;
+      }
+      rowPrefix = Bytes.copy(row, 0, prefixLength);
+    }
+    return checkGeneralBloomFilter(rowPrefix, null, bloomFilter);
+  }
+
+  /**
+   * Checks whether the delimited row prefix of the given scan passes the
+   * ROWPREFIX_DELIMITED Bloom filter. Called from passesBloomFilter() above.
+   *
+   * @return True if passes
+   */
+  private boolean passesGeneralDelimitedRowPrefixBloomFilter(Scan scan) {
+    BloomFilter bloomFilter = this.generalBloomFilter;
+    if (bloomFilter == null) {
+      return true;
+    }
+
+    byte[] row = scan.getStartRow();
+    byte[] rowPrefix;
+    if (scan.isGetScan()) {
+      int rowPrefixLength = Bytes.indexOf(row, delimiter);
+      if (rowPrefixLength <= 0) {
+        rowPrefix = row;
+      } else {
+        rowPrefix = Bytes.copy(row, 0, rowPrefixLength);
+      }
+    } else {
+      // For non-get scans
+      // If startRow does not contain the delimiter, return true directly.
+      int startRowPrefixLength = Bytes.indexOf(row, delimiter);
+      if (startRowPrefixLength <= 0) {
+        return true;
+      }
+      // If stopRow does not have the same prefix as startRow, return true directly.
+ int commonIndex = Bytes.findCommonPrefix(scan.getStartRow(), scan.getStopRow(), + startRowPrefixLength, scan.getStopRow().length, 0, 0); + if (commonIndex < startRowPrefixLength) { + return true; + } + rowPrefix = Bytes.copy(row, 0, startRowPrefixLength); + } + return checkGeneralBloomFilter(rowPrefix, null, bloomFilter); + } + private boolean checkGeneralBloomFilter(byte[] key, Cell kvKey, BloomFilter bloomFilter) { // Empty file if (reader.getTrailer().getEntryCount() == 0) { @@ -367,10 +445,10 @@ public class StoreFileReader { // hbase:meta does not have blooms. So we need not have special interpretation // of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom if (keyIsAfterLast) { - if (bloomFilterType == BloomType.ROW) { - keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0); - } else { + if (bloomFilterType == BloomType.ROWCOL) { keyIsAfterLast = (CellComparator.getInstance().compare(kvKey, lastBloomKeyOnlyKV)) > 0; + } else { + keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0); } } @@ -446,6 +524,13 @@ public class StoreFileReader { bloomFilterType = BloomType.valueOf(Bytes.toString(b)); } + byte[] p = fi.get(BLOOM_FILTER_PARAM_KEY); + if (bloomFilterType == BloomType.ROWPREFIX) { + prefixLength = Bytes.toInt(p); + } else if (bloomFilterType == BloomType.ROWPREFIX_DELIMITED) { + delimiter = p; + } + lastBloomKey = fi.get(LAST_BLOOM_KEY); if(bloomFilterType == BloomType.ROWCOL) { lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length); @@ -638,4 +723,13 @@ public class StoreFileReader { void setSkipResetSeqId(boolean skipResetSeqId) { this.skipResetSeqId = skipResetSeqId; } + + + public int getPrefixLength() { + return prefixLength; + } + + public byte[] getDelimiter() { + return delimiter; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 2782e6f5d0..af4ba2f9ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; @@ -44,11 +45,14 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.util.BloomContext; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.RowBloomContext; import org.apache.hadoop.hbase.util.RowColBloomContext; +import org.apache.hadoop.hbase.util.RowPrefixBloomContext; +import org.apache.hadoop.hbase.util.RowPrefixDelimiterBloomContext; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +69,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { private final BloomFilterWriter 
generalBloomFilterWriter; private final BloomFilterWriter deleteFamilyBloomFilterWriter; private final BloomType bloomType; + private byte[] bloomParam = null; private long earliestPutTs = HConstants.LATEST_TIMESTAMP; private long deleteFamilyCnt = 0; private BloomContext bloomContext = null; @@ -110,21 +115,32 @@ public class StoreFileWriter implements CellSink, ShipperListener { if (generalBloomFilterWriter != null) { this.bloomType = bloomType; + this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); if (LOG.isTraceEnabled()) { - LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " + - generalBloomFilterWriter.getClass().getSimpleName()); + LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " + + (bloomType == BloomType.ROWPREFIX? + Bytes.toInt(bloomParam):Bytes.toStringBinary(bloomParam)) + + ", " + generalBloomFilterWriter.getClass().getSimpleName()); } // init bloom context switch (bloomType) { - case ROW: - bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); - break; - case ROWCOL: - bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); - break; - default: - throw new IOException( - "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL expected)"); + case ROW: + bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); + break; + case ROWCOL: + bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); + break; + case ROWPREFIX: + bloomContext = new RowPrefixBloomContext(generalBloomFilterWriter, comparator, + Bytes.toInt(bloomParam)); + break; + case ROWPREFIX_DELIMITED: + bloomContext = new RowPrefixDelimiterBloomContext(generalBloomFilterWriter, comparator, + bloomParam); + break; + default: + throw new IOException("Invalid Bloom filter type: " + + bloomType + " (ROW or ROWCOL or ROWPREFIX or ROWPREFIX_DELIMITED expected)"); } } else { // Not using Bloom filters. @@ -206,9 +222,11 @@ public class StoreFileWriter implements CellSink, ShipperListener { * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp * - * 2 Types of Filtering: + * 4 Types of Filtering: * 1. Row = Row * 2. RowCol = Row + Qualifier + * 3. RowPrefix = Row Prefix + * 4. 
RowPrefixDelimiter = Delimited Row Prefix */ bloomContext.writeBloom(cell); } @@ -280,6 +298,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { if (hasGeneralBloom) { writer.addGeneralBloomFilter(generalBloomFilterWriter); writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + if (bloomParam != null) { + writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); + } bloomContext.addLastBloomKey(writer); } return hasGeneralBloom; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java index d8fc3fb3c9..06cf699e34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java @@ -187,12 +187,12 @@ public class BloomFilterChunk implements BloomFilterBase { int hash1; int hash2; HashKey hashKey; - if (this.bloomType == BloomType.ROW) { - hashKey = new RowBloomHashKey(cell); + if (this.bloomType == BloomType.ROWCOL) { + hashKey = new RowColBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } else { - hashKey = new RowColBloomHashKey(cell); + hashKey = new RowBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java index b1b3bccdcb..e3a317b261 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java @@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util; import java.text.NumberFormat; import java.util.Random; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -44,6 +46,9 @@ public final class BloomFilterUtil { * estimate the ideal false positive rate. */ private static Random randomGeneratorForTest; + + public static final String PREFIX_LENGTH_KEY = "RowPrefixBloomFilter.prefix_length"; + public static final String DELIMITER_KEY = "RowPrefixDelimitedBloomFilter.delimiter"; /** Bit-value lookup array to prevent doing the same work over and over */ public static final byte [] bitvals = { @@ -232,8 +237,8 @@ public final class BloomFilterUtil { public static boolean contains(Cell cell, ByteBuff bloomBuf, int bloomOffset, int bloomSize, Hash hash, int hashCount, BloomType type) { - HashKey hashKey = type == BloomType.ROW ? new RowBloomHashKey(cell) - : new RowColBloomHashKey(cell); + HashKey hashKey = type == BloomType.ROWCOL ? 
new RowColBloomHashKey(cell)
+        : new RowBloomHashKey(cell);
     return contains(bloomBuf, bloomOffset, bloomSize, hash, hashCount, hashKey);
   }
 
@@ -277,4 +282,45 @@ public final class BloomFilterUtil {
     return formatStats(bloomFilter) + STATS_RECORD_SEP + "Actual error rate: "
         + String.format("%.8f", bloomFilter.actualErrorRate());
   }
+
+  public static byte[] getBloomFilterParam(BloomType bloomFilterType, Configuration conf)
+      throws IllegalArgumentException {
+    byte[] bloomParam = null;
+    String message = "Bloom filter type is " + bloomFilterType + ", ";
+    switch (bloomFilterType) {
+      case ROWPREFIX:
+        String prefixLengthString = conf.get(PREFIX_LENGTH_KEY);
+        if (prefixLengthString == null) {
+          message += PREFIX_LENGTH_KEY + " not specified.";
+          throw new IllegalArgumentException(message);
+        }
+        int prefixLength;
+        try {
+          prefixLength = Integer.parseInt(prefixLengthString);
+          if (prefixLength <= 0 || prefixLength > HConstants.MAX_ROW_LENGTH) {
+            message += "the value of " + PREFIX_LENGTH_KEY
+                + " must be larger than 0 and less than or equal to " + HConstants.MAX_ROW_LENGTH;
+            throw new IllegalArgumentException(message);
+          }
+        } catch (NumberFormatException nfe) {
+          message = "Number format exception when parsing " + PREFIX_LENGTH_KEY + " for BloomType "
+              + bloomFilterType.toString() + ": " + prefixLengthString;
+          throw new IllegalArgumentException(message, nfe);
+        }
+        bloomParam = Bytes.toBytes(prefixLength);
+        break;
+      case ROWPREFIX_DELIMITED:
+        String delimiterString = conf.get(DELIMITER_KEY);
+        if (delimiterString == null || delimiterString.length() == 0) {
+          message += DELIMITER_KEY + " not specified.";
+          throw new IllegalArgumentException(message);
+        }
+        bloomParam = Bytes.toBytes(delimiterString);
+        break;
+      default:
+        break;
+    }
+    return bloomParam;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixBloomContext.java
new file mode 100644
index 0000000000..677ca9ea79
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixBloomContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * Handles ROWPREFIX bloom related context.
+ * It works with both ByteBufferedCell and byte[] backed cells + */ +@InterfaceAudience.Private +public class RowPrefixBloomContext extends RowBloomContext { + private final int prefixLength; + + public RowPrefixBloomContext(BloomFilterWriter bloomFilterWriter, + CellComparator comparator, int prefixLength) { + super(bloomFilterWriter, comparator); + this.prefixLength = prefixLength; + } + + public void writeBloom(Cell cell) throws IOException { + super.writeBloom(getRowPrefixCell(cell)); + } + + /** + * @param cell the cell + * @return the new cell created by row prefix + */ + private Cell getRowPrefixCell(Cell cell) { + byte[] row = CellUtil.copyRow(cell); + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(row, 0, Math.min(prefixLength, row.length)) + .setType(Cell.Type.Put) + .build(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java new file mode 100644 index 0000000000..81287c3304 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowPrefixDelimiterBloomContext.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * Handles ROWPREFIX_DELIMITED bloom related context. 
+ * It works with both ByteBufferedCell and byte[] backed cells
+ */
+@InterfaceAudience.Private
+public class RowPrefixDelimiterBloomContext extends RowBloomContext {
+  private final byte[] delimiter;
+
+  public RowPrefixDelimiterBloomContext(BloomFilterWriter bloomFilterWriter,
+      CellComparator comparator, byte[] delimiter) {
+    super(bloomFilterWriter, comparator);
+    this.delimiter = delimiter;
+  }
+
+  public void writeBloom(Cell cell) throws IOException {
+    super.writeBloom(getDelimitedRowPrefixCell(cell));
+  }
+
+  /**
+   * @param cell the cell
+   * @return the new cell created from the delimited row prefix
+   */
+  private Cell getDelimitedRowPrefixCell(Cell cell) {
+    byte[] row = CellUtil.copyRow(cell);
+    int prefixLength = Bytes.indexOf(row, delimiter);
+    if (prefixLength <= 0) {
+      return cell;
+    }
+    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
+        .setRow(row, 0, Math.min(prefixLength, row.length))
+        .setType(Cell.Type.Put)
+        .build();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
index aa3acc4346..e3c7a75d79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +67,7 @@ public class CreateRandomStoreFile {
   private static final String VALUE_SIZE_OPTION = "v";
   private static final String COMPRESSION_OPTION = "c";
   private static final String BLOOM_FILTER_OPTION = "bf";
+  private static final String BLOOM_FILTER_PARAM_OPTION = "bfp";
   private static final String BLOCK_SIZE_OPTION = "bs";
   private static final String BLOOM_BLOCK_SIZE_OPTION = "bfbs";
   private static final String INDEX_BLOCK_SIZE_OPTION = "ibs";
@@ -103,6 +107,8 @@ public class CreateRandomStoreFile {
     options.addOption(BLOOM_FILTER_OPTION, "bloom_filter", true,
         "Bloom filter type, one of "
             + Arrays.toString(BloomType.values()));
+    options.addOption(BLOOM_FILTER_PARAM_OPTION, "bloom_param", true,
+        "the parameter of the bloom filter");
     options.addOption(BLOCK_SIZE_OPTION, "block_size", true,
         "HFile block size");
     options.addOption(BLOOM_BLOCK_SIZE_OPTION, "bloom_block_size", true,
@@ -169,6 +175,24 @@ public class CreateRandomStoreFile {
           BLOOM_FILTER_OPTION));
     }
 
+    if (bloomType == BloomType.ROWPREFIX) {
+      if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) {
+        LOG.error("the parameter of the bloom filter is not specified");
+        return false;
+      } else {
+        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY,
+            cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION));
+      }
+    }
+
+    if (bloomType == BloomType.ROWPREFIX_DELIMITED) {
+      if (!cmdLine.hasOption(BLOOM_FILTER_PARAM_OPTION)) {
+        LOG.error("the parameter of the bloom filter is not
specified"); + return false; + } else { + conf.set(BloomFilterUtil.DELIMITER_KEY, cmdLine.getOptionValue(BLOOM_FILTER_PARAM_OPTION)); + } + } + int blockSize = HConstants.DEFAULT_BLOCKSIZE; if (cmdLine.hasOption(BLOCK_SIZE_OPTION)) blockSize = Integer.valueOf(cmdLine.getOptionValue(BLOCK_SIZE_OPTION)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index 72da1a3f91..d0e187fe5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -718,7 +718,7 @@ public class TestHStoreFile extends HBaseTestCase { reader.loadFileInfo(); reader.loadBloomfilter(); StoreFileScanner scanner = getStoreFileScanner(reader, false, false); - assertEquals(expKeys[x], reader.generalBloomFilter.getKeyCount()); + assertEquals(expKeys[x], reader.getGeneralBloomFilter().getKeyCount()); HStore store = mock(HStore.class); when(store.getColumnFamilyDescriptor()) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java new file mode 100644 index 0000000000..6a35783778 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.BloomFilterUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test TestRowPrefixBloomFilter + */ +@Category({RegionServerTests.class, SmallTests.class}) +public class TestRowPrefixBloomFilter { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowPrefixBloomFilter.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestRowPrefixBloomFilter.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + private static final ChecksumType CKTYPE = ChecksumType.CRC32C; + private static final int CKBYTES = 512; + private boolean localfs = false; + private static Configuration conf; + private static FileSystem fs; + private static Path testDir; + private static final int BLOCKSIZE_SMALL = 8192; + private static final float err = (float) 0.01; + private static final int prefixLength = 10; + private static final String delimiter = "#"; + private static final String invalidFormatter = "%08d"; + private static final String prefixFormatter = "%010d"; + private static final String suffixFormatter = "%010d"; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err); + conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true); + conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, prefixLength); + conf.set(BloomFilterUtil.DELIMITER_KEY, delimiter); + + localfs = + (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0); + + if (fs == null) { + fs = FileSystem.get(conf); + } + try { + if (localfs) { + testDir = TEST_UTIL.getDataTestDir("TestRowPrefixBloomFilter"); + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } else { + testDir = FSUtils.getRootDir(conf); + } + } 
catch (Exception e) { + LOG.error(HBaseMarkers.FATAL, "error during setup", e); + throw e; + } + } + + @After + public void tearDown() throws Exception { + try { + if (localfs) { + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } + } catch (Exception e) { + LOG.error(HBaseMarkers.FATAL, "error during tear down", e); + } + } + + private static StoreFileScanner getStoreFileScanner(StoreFileReader reader) { + return reader.getStoreFileScanner(false, false, false, 0, 0, false); + } + + private void writeStoreFile(final Path f, BloomType bt, int expKeys, int prefixRowCount, + int suffixRowCount) throws IOException { + HFileContext meta = new HFileContextBuilder() + .withBlockSize(BLOCKSIZE_SMALL) + .withChecksumType(CKTYPE) + .withBytesPerCheckSum(CKBYTES) + .build(); + // Make a store file and write data to it. + StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withFilePath(f) + .withBloomType(bt) + .withMaxKeyCount(expKeys) + .withFileContext(meta) + .build(); + long now = System.currentTimeMillis(); + try { + //Put with valid row style + for (int i = 0; i < prefixRowCount; i += 2) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String row = prefixRow + "#" + String.format(suffixFormatter, j); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } + + //Put with invalid row style + for (int i = prefixRowCount; i < prefixRowCount*2; i += 2) { // prefix rows + String row = String.format(invalidFormatter, i); + KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("family"), + Bytes.toBytes("col"), now, Bytes.toBytes("value")); + writer.append(kv); + } + } finally { + writer.close(); + } + } + + @Test + public void testRowPrefixBloomFilter() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + float expErr = 2*prefixRowCount*suffixRowCount*err; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + // read the file + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + //check basic param + assertEquals(bt[x], reader.getBloomFilterType()); + if (bt[x] == BloomType.ROWPREFIX) { + assertEquals(prefixLength, reader.getPrefixLength()); + assertEquals("null", Bytes.toStringBinary(reader.getDelimiter())); + } else if (bt[x] == BloomType.ROWPREFIX_DELIMITED){ + assertEquals(-1, reader.getPrefixLength()); + assertEquals(delimiter, Bytes.toStringBinary(reader.getDelimiter())); + } + assertEquals(expKeys, reader.getGeneralBloomFilter().getKeyCount()); + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + // check false positives rate + int falsePos = 0; + int falseNeg = 0; + for (int i = 0; i < prefixRowCount; i++) { // prefix rows + String prefixRow = String.format(prefixFormatter, i); + for (int j = 0; j < suffixRowCount; j++) { // suffix rows + String startRow = prefixRow + "#" + String.format(suffixFormatter, j); + String stopRow = prefixRow 
+ "#" + String.format(suffixFormatter, j+1); + Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + } + } + + for (int i = prefixRowCount; i < prefixRowCount * 2; i++) { // prefix rows + String row = String.format(invalidFormatter, i); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + boolean shouldPrefixRowExist = i % 2 == 0; + if (shouldPrefixRowExist) { + if (!exists) { + falseNeg++; + } + } else { + if (exists) { + falsePos++; + } + } + } + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + assertEquals("False negatives: " + falseNeg, 0, falseNeg); + int maxFalsePos = (int) (2 * expErr); + assertTrue("Too many false positives: " + falsePos + + " (err=" + err + ", expected no more than " + maxFalsePos + ")", + falsePos <= maxFalsePos); + } + } + + @Test + public void testRowPrefixBloomFilterWithGet() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + + //Get with valid row style + //prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String row = prefixRow + "#" + String.format(suffixFormatter, 0); + Scan scan = new Scan(new Get(Bytes.toBytes(row))); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + row = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // Get with invalid row style + // ROWPREFIX: the length of row is less than prefixLength + // ROWPREFIX_DELIMITED: Row does not contain delimiter + // row in bloom + row = String.format(invalidFormatter, prefixRowCount+2); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // row not in bloom + row = String.format(invalidFormatter, prefixRowCount+1); + scan = new Scan(new Get(Bytes.toBytes(row))); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } + + @Test + public void testRowPrefixBloomFilterWithScan() throws Exception { + FileSystem fs = FileSystem.getLocal(conf); + BloomType[] bt = {BloomType.ROWPREFIX, BloomType.ROWPREFIX_DELIMITED}; + int prefixRowCount = 50; + int suffixRowCount = 10; + int expKeys = 
50; + for (int x : new int[]{0,1}) { + // write the file + Path f = new Path(testDir, name.getMethodName()); + writeStoreFile(f, bt[x], expKeys, prefixRowCount, suffixRowCount); + + StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, true, + new AtomicInteger(0), true, conf); + reader.loadFileInfo(); + reader.loadBloomfilter(); + + StoreFileScanner scanner = getStoreFileScanner(reader); + HStore store = mock(HStore.class); + when(store.getColumnFamilyDescriptor()) + .thenReturn(ColumnFamilyDescriptorBuilder.of("family")); + + //Scan with valid row style. startRow and stopRow have a common prefix. + //And the length of the common prefix is no less than prefixLength. + //prefix row in bloom + String prefixRow = String.format(prefixFormatter, prefixRowCount-2); + String startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + String stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + Scan scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + boolean exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // prefix row not in bloom + prefixRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixRow + "#" + String.format(suffixFormatter, 1); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertFalse(exists); + + // There is no common prefix between startRow and stopRow. + prefixRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + if (bt[x] == BloomType.ROWPREFIX) { + // startRow and stopRow have a common prefix. + // But the length of the common prefix is less than prefixLength. + String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + }else if (bt[x] == BloomType.ROWPREFIX_DELIMITED) { + // startRow does not contain delimiter + String prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + String prefixStopRow = String.format(prefixFormatter, prefixRowCount-2); + startRow = prefixStartRow + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + + // startRow contains delimiter, but stopRow does not have the same prefix as startRow. 
+ prefixStartRow = String.format(prefixFormatter, prefixRowCount-2); + prefixStopRow = String.format(prefixFormatter, prefixRowCount-1); + startRow = prefixStartRow + "#" + String.format(suffixFormatter, 0); + stopRow = prefixStopRow + "#" + String.format(suffixFormatter, 0); + scan = new Scan().withStartRow(Bytes.toBytes(startRow)) + .withStopRow(Bytes.toBytes(stopRow)); + exists = scanner.shouldUseScanner(scan, store, Long.MIN_VALUE); + assertTrue(exists); + } + + reader.close(true); // evict because we are about to delete the file + fs.delete(f, true); + } + } +} -- 2.13.0
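
Reviewer note (illustrative, not part of the patch): a minimal sketch of how a client
could enable the new bloom types through the HBase 2.x descriptor builders. The table
and family names, the prefix length of 10, and the "#" delimiter are hypothetical; the
two configuration keys are the constants this patch adds to BloomFilterUtil, and
HMaster#checkBloomFilterType above rejects a descriptor that omits or malforms them.

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.BloomFilterUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowPrefixBloomExample {
      // Creates two tables, one per new bloom type. Names and parameter
      // values are made up for illustration; `conn` is an open Connection.
      public static void createTables(Connection conn) throws IOException {
        conn.getAdmin().createTable(TableDescriptorBuilder
            .newBuilder(TableName.valueOf("prefix_table"))
            .setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(Bytes.toBytes("family"))
                .setBloomFilterType(BloomType.ROWPREFIX)
                // Bloom keys are the first 10 bytes of each row key
                // (shorter rows are folded in whole, see RowPrefixBloomContext).
                .setConfiguration(BloomFilterUtil.PREFIX_LENGTH_KEY, "10")
                .build())
            .build());

        conn.getAdmin().createTable(TableDescriptorBuilder
            .newBuilder(TableName.valueOf("delimited_table"))
            .setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(Bytes.toBytes("family"))
                .setBloomFilterType(BloomType.ROWPREFIX_DELIMITED)
                // Bloom keys are the bytes before the first "#" in each row key
                // (rows without "#" are folded in whole,
                // see RowPrefixDelimiterBloomContext).
                .setConfiguration(BloomFilterUtil.DELIMITER_KEY, "#")
                .build())
            .build());
      }
    }

For command-line testing, LoadTestTool gains a matching -bloom_param option (paired
with -bloom) and CreateRandomStoreFile gains -bfp, both wired to the same two keys.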
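A second sketch of the read-side semantics implemented in StoreFileReader above,
assuming a ROWPREFIX family with prefix length 10 and the row layout used by
TestRowPrefixBloomFilter (row names are illustrative):

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowPrefixScanExamples {
      public static void examples() {
        // Gets are always checked: the bloom key is the row's first 10 bytes.
        Scan get = new Scan(new Get(Bytes.toBytes("0000000004#0000000000")));

        // Filterable range scan: startRow and stopRow share at least 10
        // leading bytes, so the common prefix is a usable bloom key.
        Scan narrow = new Scan()
            .withStartRow(Bytes.toBytes("0000000004#0000000000"))
            .withStopRow(Bytes.toBytes("0000000004#0000000001"));

        // Not filterable: the common prefix ("000000000") is shorter than 10,
        // so passesGeneralRowPrefixBloomFilter() returns true (no pruning).
        Scan wide = new Scan()
            .withStartRow(Bytes.toBytes("0000000004"))
            .withStopRow(Bytes.toBytes("0000000005"));
      }
    }

For ROWPREFIX_DELIMITED the analogous conditions are that startRow contains the
delimiter and stopRow shares the delimited prefix; every other case falls through to
"return true", so neither new type can produce false negatives on range scans.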