From 0859d8c47dea22d7731f542198eac5195ed45b82 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 14 Apr 2017 17:11:44 +0800 Subject: [PATCH] HBASE-17914 Create a new reader instead of cloning a new StoreFile when compaction --- .../hadoop/hbase/io/FSDataInputStreamWrapper.java | 38 +++-- .../java/org/apache/hadoop/hbase/io/FileLink.java | 14 +- .../hadoop/hbase/io/HalfStoreFileReader.java | 13 +- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 9 +- .../hbase/mapreduce/LoadIncrementalHFiles.java | 16 +- .../procedure/SplitTableRegionProcedure.java | 2 +- .../java/org/apache/hadoop/hbase/mob/MobFile.java | 4 +- .../java/org/apache/hadoop/hbase/mob/MobUtils.java | 6 +- .../mob/compactions/PartitionedMobCompactor.java | 5 +- .../regionserver/DefaultStoreFileManager.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 2 +- .../hbase/regionserver/HRegionFileSystem.java | 6 +- .../apache/hadoop/hbase/regionserver/HStore.java | 6 +- .../hadoop/hbase/regionserver/StoreFile.java | 107 ++++++------ .../hadoop/hbase/regionserver/StoreFileInfo.java | 21 +-- .../hadoop/hbase/regionserver/StoreFileReader.java | 89 +++++----- .../hbase/regionserver/StoreFileScanner.java | 44 +++-- .../hbase/regionserver/compactions/Compactor.java | 44 +---- .../hadoop/hbase/io/TestHalfStoreFileReader.java | 190 +++++++++------------ .../hbase/mob/compactions/TestMobCompactor.java | 7 +- .../compactions/TestPartitionedMobCompactor.java | 7 +- .../hbase/regionserver/DataBlockEncodingTool.java | 4 +- .../regionserver/EncodedSeekPerformanceTest.java | 8 +- .../hadoop/hbase/regionserver/MockStoreFile.java | 23 ++- .../regionserver/TestCacheOnWriteInSchema.java | 3 +- .../hbase/regionserver/TestCompactionPolicy.java | 3 - .../regionserver/TestCompoundBloomFilter.java | 3 +- .../hbase/regionserver/TestFSErrorsExposed.java | 4 +- .../hbase/regionserver/TestMobStoreCompaction.java | 3 +- .../hadoop/hbase/regionserver/TestStoreFile.java | 77 ++++++--- .../TestStoreFileScannerWithTagCompression.java | 10 +- .../regionserver/compactions/TestCompactor.java | 3 - .../compactions/TestStripeCompactionPolicy.java | 3 - 33 files changed, 410 insertions(+), 366 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index b06be6b..2bb9fac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -36,6 +36,8 @@ public class FSDataInputStreamWrapper { private final Path path; private final FileLink link; private final boolean doCloseStreams; + private final boolean dropBehind; + private final long readahead; /** Two stream handles, one with and one without FS-level checksum. 
* HDFS checksum setting is on FS level, not single read level, so you have to keep two @@ -75,43 +77,52 @@ public class FSDataInputStreamWrapper { private volatile int hbaseChecksumOffCount = -1; public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { - this(fs, null, path, false); + this(fs, path, false, -1L); } - public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException { - this(fs, null, path, dropBehind); + public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { + this(fs, null, path, dropBehind, readahead); } - public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException { - this(fs, link, null, false); - } public FSDataInputStreamWrapper(FileSystem fs, FileLink link, - boolean dropBehind) throws IOException { - this(fs, link, null, dropBehind); + boolean dropBehind, long readahead) throws IOException { + this(fs, link, null, dropBehind, readahead); } - private FSDataInputStreamWrapper(FileSystem fs, FileLink link, - Path path, boolean dropBehind) throws IOException { + private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, + long readahead) throws IOException { assert (path == null) != (link == null); this.path = path; this.link = link; this.doCloseStreams = true; + this.dropBehind = dropBehind; + this.readahead = readahead; // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem // that wraps over the specified fs. In this case, we will not be able to avoid // checksumming inside the filesystem. - this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs); + this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); // Initially we are going to read the tail block. Open the reader w/FS checksum. this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; this.stream = (link != null) ? link.open(hfs) : hfs.open(path); + setStreamOptions(stream); + } + + private void setStreamOptions(FSDataInputStream in) { try { this.stream.setDropBehind(dropBehind); } catch (Exception e) { // Skipped. } + if (readahead >= 0) { + try { + this.stream.setReadahead(readahead); + } catch (Exception e) { + // Skipped. + } + } } - /** * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any * reads finish and before any other reads start (what happens in reality is we read the @@ -127,6 +138,7 @@ public class FSDataInputStreamWrapper { if (useHBaseChecksum) { FileSystem fsNc = hfs.getNoChecksumFs(); this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); + setStreamOptions(streamNoFsChecksum); this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; // Close the checksum stream; we will reopen it if we get an HBase checksum failure. 
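    // Illustrative sketch only (not part of this patch): the defensive pattern that
    // setStreamOptions() above applies, shown on a plain org.apache.hadoop.fs.FSDataInputStream.
    // Both hints are best-effort; streams that do not support them may throw
    // UnsupportedOperationException, which is deliberately swallowed. "fs" and "p" are assumed.
    //   FSDataInputStream in = fs.open(p);
    //   try {
    //     in.setDropBehind(true);   // avoid polluting the OS page cache
    //   } catch (Exception e) {
    //     // hint not supported by this stream; ignore
    //   }
    //   try {
    //     in.setReadahead(0L);      // disable readahead for pure pread access
    //   } catch (Exception e) {
    //     // hint not supported by this stream; ignore
    //   }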
this.stream.close(); @@ -150,6 +162,8 @@ public class FSDataInputStreamWrapper { link = null; hfs = null; useHBaseChecksumConfigured = useHBaseChecksum = false; + dropBehind = false; + readahead = 0; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index ca0dfbc..8a79efb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -29,6 +29,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; @@ -99,7 +101,7 @@ public class FileLink { * and the alternative locations, when the file is moved. */ private static class FileLinkInputStream extends InputStream - implements Seekable, PositionedReadable { + implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead { private FSDataInputStream in = null; private Path currentPath = null; private long pos = 0; @@ -306,6 +308,16 @@ public class FileLink { } throw new FileNotFoundException("Unable to open link: " + fileLink); } + + @Override + public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException { + in.setReadahead(readahead); + } + + @Override + public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException { + in.setDropBehind(dropCache); + } } private Path[] locations = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index a4a281e..c4dbc39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,10 +73,10 @@ public class HalfStoreFileReader extends StoreFileReader { * @param conf Configuration * @throws IOException */ - public HalfStoreFileReader(final FileSystem fs, final Path p, - final CacheConfig cacheConf, final Reference r, final Configuration conf) + public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r, + boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf) throws IOException { - super(fs, p, cacheConf, conf); + super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf); // This is not actual midkey for this half-file; its just border // around which we split top and bottom. Have to look in files to find // actual last and first keys for bottom and top halves. 
Half-files don't @@ -99,9 +100,9 @@ public class HalfStoreFileReader extends StoreFileReader { * @throws IOException */ public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, - long size, final CacheConfig cacheConf, final Reference r, final Configuration conf) - throws IOException { - super(fs, p, in, size, cacheConf, conf); + long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile, + AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException { + super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf); // This is not actual midkey for this half-file; its just border // around which we split top and bottom. Have to look in files to find // actual last and first keys for bottom and top halves. Half-files don't diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 4db60b5..791445b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -283,11 +283,10 @@ public class CacheConfig { } /** - * Create a block cache configuration with the specified cache and - * configuration parameters. + * Create a block cache configuration with the specified cache and configuration parameters. * @param blockCache reference to block cache, null if completely disabled * @param cacheDataOnRead whether DATA blocks should be cached on read (we always cache INDEX - * blocks and BLOOM blocks; this cannot be disabled). + * blocks and BLOOM blocks; this cannot be disabled). * @param inMemory whether blocks should be flagged as in-memory * @param cacheDataOnWrite whether data blocks should be cached on write * @param cacheIndexesOnWrite whether index blocks should be cached on write @@ -296,7 +295,9 @@ public class CacheConfig { * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families - * data blocks up in the L1 tier. + * data blocks up in the L1 tier. 
+ * @param dropBehindCompaction indicate that we should set drop behind to true when open a store + * file reader for compaction */ CacheConfig(final BlockCache blockCache, final boolean cacheDataOnRead, final boolean inMemory, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 19daeed..9e0b847 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -27,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -63,9 +67,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClientServiceCallable; @@ -99,10 +100,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Tool to load the output of HFileOutputFormat into an existing table. 
*/ @@ -1105,7 +1102,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { HalfStoreFileReader halfReader = null; StoreFileWriter halfWriter = null; try { - halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf); + halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true, + new AtomicInteger(0), true, conf); Map fileInfo = halfReader.loadFileInfo(); int blocksize = familyDescriptor.getBlocksize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java index 3cd6c66..7cdb11c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java @@ -285,7 +285,7 @@ public class SplitTableRegionProcedure @Override protected SplitTableRegionState getState(final int stateId) { - return SplitTableRegionState.valueOf(stateId); + return SplitTableRegionState.forNumber(stateId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java index cd4c079..d16c1da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@ -118,9 +118,7 @@ public class MobFile { * @throws IOException */ public void open() throws IOException { - if (sf.getReader() == null) { - sf.createReader(); - } + sf.initReader(true); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index eb75120..a688817 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -697,7 +697,7 @@ public final class MobUtils { return null; } Path dstPath = new Path(targetPath, sourceFile.getName()); - validateMobFile(conf, fs, sourceFile, cacheConfig); + validateMobFile(conf, fs, sourceFile, cacheConfig, true); String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; LOG.info(msg); Path parent = dstPath.getParent(); @@ -718,11 +718,11 @@ public final class MobUtils { * @param cacheConfig The current cache config. 
*/ private static void validateMobFile(Configuration conf, FileSystem fs, Path path, - CacheConfig cacheConfig) throws IOException { + CacheConfig cacheConfig, boolean isPrimaryReplicaStoreFile) throws IOException { StoreFile storeFile = null; try { storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE); - storeFile.createReader(); + storeFile.initReader(isPrimaryReplicaStoreFile); } catch (IOException e) { LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); throw e; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 987fe51..aee8d2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -343,7 +343,7 @@ public class PartitionedMobCompactor extends MobCompactor { StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); // pre-create reader of a del file to avoid race condition when opening the reader in each // partition. - sf.createReader(); + sf.initReader(true); delPartition.addStoreFile(sf); totalDelFileCount++; } @@ -893,7 +893,8 @@ public class PartitionedMobCompactor extends MobCompactor { for (StoreFile sf : storeFiles) { // the readers will be closed later after the merge. maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); - byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); + sf.initReader(true); + byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); if (count != null) { maxKeyCount += Bytes.toLong(count); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index c37ae99..da25df5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -135,7 +135,7 @@ class DefaultStoreFileManager implements StoreFileManager { this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); } - // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised + // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized // Let a background thread close the actual reader on these compacted files and also // ensure to evict the blocks from block cache so that they are no longer in // cache diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index b021430..cc0b7df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -294,7 +294,7 @@ public class HMobStore extends HStore { try { storeFile = new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE); - storeFile.createReader(); + storeFile.initReader(isPrimaryReplicaStore()); } catch (IOException e) { LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); throw e; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 144f43b..15d0228 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -294,7 +294,7 @@ public class HRegionFileSystem { */ Path getStoreFilePath(final String familyName, final String fileName) { Path familyDir = getStoreDir(familyName); - return new Path(familyDir, fileName).makeQualified(this.fs); + return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); } /** @@ -675,9 +675,7 @@ public class HRegionFileSystem { if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) { // Check whether the split row lies in the range of the store file // If it is outside the range, return directly. - if (f.getReader() == null) { - f.createReader(); - } + f.initReader(true); try { if (top) { //check if larger than last key. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a98f89e..a7b0b7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -655,8 +655,7 @@ public class HStore implements Store { info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, this.family.getBloomFilterType()); - StoreFileReader r = storeFile.createReader(); - r.setReplicaStoreFile(isPrimaryReplicaStore()); + storeFile.initReader(isPrimaryReplicaStore()); return storeFile; } @@ -2456,8 +2455,9 @@ public class HStore implements Store { LOG.debug("The file " + file + " was closed but still not archived."); } filesToRemove.add(file); + continue; } - if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) { + if (file.isCompactedAway() && !file.isReferencedInReads()) { // Even if deleting fails we need not bother as any new scanners won't be // able to use the compacted file as the status is already compactedAway if (LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 7aef05e..c42dfed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,6 +65,10 @@ import org.apache.hadoop.hbase.util.Bytes; public class StoreFile { private static final Log LOG = LogFactory.getLog(StoreFile.class.getName()); + public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead"; + + private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false; + // Keys for fileinfo values in HFile /** Max Sequence ID in FileInfo */ @@ -103,6 +108,16 @@ public class StoreFile { // Block cache configuration and reference. 
private final CacheConfig cacheConf; + // Counter that is incremented every time a scanner is created on the + // store file. It is decremented when the scan on the store file is + // done. + private final AtomicInteger refCount = new AtomicInteger(0); + + private final boolean noReadahead; + + // Indicates if the file got compacted + private volatile boolean compactedAway = false; + // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. private long sequenceid = -1; @@ -116,7 +131,7 @@ public class StoreFile { private Cell lastKey; - private Comparator comparator; + private Comparator comparator; CacheConfig getCacheConf() { return cacheConf; @@ -130,7 +145,7 @@ public class StoreFile { return lastKey; } - public Comparator getComparator() { + public Comparator getComparator() { return comparator; } @@ -218,7 +233,8 @@ public class StoreFile { this.fs = fs; this.fileInfo = fileInfo; this.cacheConf = cacheConf; - + this.noReadahead = + conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD); if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { this.cfBloomType = cfBloomType; } else { @@ -229,25 +245,6 @@ public class StoreFile { } /** - * Clone - * @param other The StoreFile to clone from - */ - public StoreFile(final StoreFile other) { - this.fs = other.fs; - this.fileInfo = other.fileInfo; - this.cacheConf = other.cacheConf; - this.cfBloomType = other.cfBloomType; - this.metadataMap = other.metadataMap; - } - - /** - * Clone a StoreFile for opening private reader. - */ - public StoreFile cloneForReader() { - return new StoreFile(this); - } - - /** * @return the StoreFile object associated to this StoreFile. * null if the StoreFile is not a reference. */ @@ -266,7 +263,7 @@ public class StoreFile { * @return Returns the qualified path of this StoreFile */ public Path getQualifiedPath() { - return this.fileInfo.getPath().makeQualified(fs); + return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory()); } /** @@ -376,15 +373,19 @@ public class StoreFile { @VisibleForTesting public boolean isCompactedAway() { - if (this.reader != null) { - return this.reader.isCompactedAway(); - } - return true; + return compactedAway; } @VisibleForTesting public int getRefCount() { - return this.reader.getRefCount().get(); + return refCount.get(); + } + + /** + * @return true if the file is still used in reads + */ + public boolean isReferencedInReads() { + return refCount.get() != 0; } /** @@ -404,18 +405,18 @@ public class StoreFile { } /** - * Opens reader on this store file. Called by Constructor. - * @return Reader for the store file. + * Opens reader on this store file. Called by Constructor. * @throws IOException * @see #closeReader(boolean) */ - private StoreFileReader open(boolean canUseDropBehind) throws IOException { + private void open(boolean isPrimaryReplicaStoreFile) throws IOException { if (this.reader != null) { throw new IllegalAccessError("Already open"); } // Open the StoreFile.Reader - this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind); + this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L, + isPrimaryReplicaStoreFile, refCount, true); // Load up indices and fileinfo. This also loads Bloom filter type. 
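    // Illustrative only (not part of this patch): readahead on the shared pread reader can be
    // switched off with the property declared above; the default (false) keeps the filesystem's
    // own readahead behaviour. Uses the standard HBaseConfiguration/Configuration classes.
    //   Configuration conf = HBaseConfiguration.create();
    //   conf.setBoolean(StoreFile.STORE_FILE_READER_NO_READAHEAD, true);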
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); @@ -513,33 +514,41 @@ public class StoreFile { firstKey = reader.getFirstKey(); lastKey = reader.getLastKey(); comparator = reader.getComparator(); - return this.reader; - } - - public StoreFileReader createReader() throws IOException { - return createReader(false); } /** - * @return Reader for StoreFile. creates if necessary - * @throws IOException + * Initialize the reader used for pread. */ - public StoreFileReader createReader(boolean canUseDropBehind) throws IOException { - if (this.reader == null) { + public void initReader(boolean isPrimaryReplicaStoreFile) throws IOException { + if (reader == null) { try { - this.reader = open(canUseDropBehind); - } catch (IOException e) { + open(isPrimaryReplicaStoreFile); + } catch (Exception e) { try { - boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; + boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true; this.closeReader(evictOnClose); } catch (IOException ee) { + LOG.warn("failed to close reader", ee); } throw e; } - } - return this.reader; + } + + private StoreFileReader createStreamReader(boolean isPrimaryReplicaStoreFile, + boolean canUseDropBehind) throws IOException { + initReader(isPrimaryReplicaStoreFile); + StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L, + isPrimaryReplicaStoreFile, refCount, false); + reader.copyFields(this.reader); + return reader; + } + + public StoreFileScanner getStreamScanner(boolean isPrimaryReplicaStoreFile, + boolean canUseDropBehind, boolean cacheBlocks, boolean pread, boolean isCompaction, + long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) throws IOException { + return createStreamReader(isPrimaryReplicaStoreFile, canUseDropBehind).getStoreFileScanner( + cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } /** @@ -566,9 +575,7 @@ public class StoreFile { * Marks the status of the file as compactedAway. */ public void markCompactedAway() { - if (this.reader != null) { - this.reader.markCompactedAway(); - } + this.compactedAway = true; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 3c12045..c4754a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; @@ -233,25 +234,24 @@ public class StoreFileInfo { * @param cacheConf The cache configuration and block cache reference. 
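    // Illustrative sketch only (not part of this patch) of the reader lifecycle these methods
    // introduce: initReader() opens the single shared pread reader, while getStreamScanner()
    // builds a throwaway stream reader per compaction, replacing the old StoreFile-cloning path.
    // "sf" (a StoreFile) and "readPt" are assumed to exist in a method that throws IOException.
    //   sf.initReader(true);                          // shared reader for normal pread access
    //   StoreFileReader sharedReader = sf.getReader();
    //   StoreFileScanner compactionScanner = sf.getStreamScanner(
    //       true,    // isPrimaryReplicaStoreFile
    //       true,    // canUseDropBehind
    //       false,   // cacheBlocks
    //       false,   // pread: false -> stream read
    //       true,    // isCompaction
    //       readPt,  // smallest read point
    //       0,       // scannerOrder
    //       false);  // canOptimizeForNonNullColumn
    //   try {
    //     // ... drain the scanner during the compaction ...
    //   } finally {
    //     // closing the scanner triggers readCompleted(), which also closes the private reader
    //     compactionScanner.close();
    //   }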
* @return The StoreFile.Reader for the file */ - public StoreFileReader open(final FileSystem fs, - final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException { + public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind, + long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared) + throws IOException { FSDataInputStreamWrapper in; FileStatus status; final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction(); if (this.link != null) { // HFileLink - in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind); + in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead); status = this.link.getFileStatus(fs); } else if (this.reference != null) { // HFile Reference Path referencePath = getReferredToFile(this.getPath()); - in = new FSDataInputStreamWrapper(fs, referencePath, - doDropBehind); + in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead); status = fs.getFileStatus(referencePath); } else { - in = new FSDataInputStreamWrapper(fs, this.getPath(), - doDropBehind); + in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead); status = fs.getFileStatus(initialPath); } long length = status.getLen(); @@ -265,9 +265,10 @@ public class StoreFileInfo { if (reader == null) { if (this.reference != null) { reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference, - conf); + isPrimaryReplicaStoreFile, refCount, shared, conf); } else { - reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf); + reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, + isPrimaryReplicaStoreFile, refCount, shared, conf); } } if (this.coprocessorHost != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 8f01a93..c1603f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.io.DataInput; import java.io.IOException; import java.util.Map; @@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; @@ -68,36 +69,50 @@ public class StoreFileReader { private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; - public AtomicInteger getRefCount() { - return refCount; - } - // Counter that is incremented every time a scanner is created on the - // store file. It is decremented when the scan on the store file is - // done. - private AtomicInteger refCount = new AtomicInteger(0); - // Indicates if the file got compacted - private volatile boolean compactedAway = false; + // store file. It is decremented when the scan on the store file is + // done. All StoreFileReader for the same StoreFile will share this counter. 
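    // Illustrative only (not part of this patch): the shared pread reader and a private stream
    // reader opened for the same store file are handed the same AtomicInteger, so scanners from
    // either path are reflected in StoreFile.isReferencedInReads(). Arguments follow the new
    // StoreFileInfo.open() signature above; "fileInfo", "fs" and "cacheConf" are assumed.
    //   AtomicInteger refCount = new AtomicInteger(0);
    //   StoreFileReader shared = fileInfo.open(fs, cacheConf, false, -1L, true, refCount, true);
    //   StoreFileReader stream = fileInfo.open(fs, cacheConf, true, -1L, true, refCount, false);
    //   // a scanner on either reader increments refCount; readCompleted() decrements it and,
    //   // for the non-shared reader, also closes the underlying HFile reader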
+ private final AtomicInteger refCount; - public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) - throws IOException { - reader = HFile.createReader(fs, path, cacheConf, conf); + // indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will + // close the internal reader when readCompleted is called. + private final boolean shared; + + private StoreFileReader(HFile.Reader reader, boolean isPrimaryReplicaStoreFile, + AtomicInteger refCount, boolean shared) { + this.reader = reader; + reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile); bloomFilterType = BloomType.NONE; + this.refCount = refCount; + this.shared = shared; } - void markCompactedAway() { - this.compactedAway = true; + public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, + boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf) + throws IOException { + this(HFile.createReader(fs, path, cacheConf, conf), isPrimaryReplicaStoreFile, refCount, + shared); } public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, - CacheConfig cacheConf, Configuration conf) throws IOException { - reader = HFile.createReader(fs, path, in, size, cacheConf, conf); - bloomFilterType = BloomType.NONE; + CacheConfig cacheConf, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, + boolean shared, Configuration conf) throws IOException { + this(HFile.createReader(fs, path, in, size, cacheConf, conf), isPrimaryReplicaStoreFile, + refCount, shared); } - public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) { - reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile); + void copyFields(StoreFileReader reader) { + this.generalBloomFilter = reader.generalBloomFilter; + this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter; + this.bloomFilterType = reader.bloomFilterType; + this.sequenceID = reader.sequenceID; + this.timeRange = reader.timeRange; + this.lastBloomKey = reader.lastBloomKey; + this.bulkLoadResult = reader.bulkLoadResult; + this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV; + this.skipResetSeqId = reader.skipResetSeqId; } + public boolean isPrimaryReplicaReader() { return reader.isPrimaryReplicaReader(); } @@ -105,8 +120,11 @@ public class StoreFileReader { /** * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS */ + @VisibleForTesting StoreFileReader() { + this.refCount = new AtomicInteger(0); this.reader = null; + this.shared = false; } public CellComparator getComparator() { @@ -128,30 +146,23 @@ public class StoreFileReader { boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, - reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); + return new StoreFileScanner(this, reader.getScanner(cacheBlocks, pread, isCompaction), + !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } /** - * Decrement the ref count associated with the reader when ever a scanner associated - * with the reader is closed + * Indicate that the scanner has finished reading with this reader. We need to decrement the ref + * count, and also, if this is not the common pread reader, we should close it. 
*/ - void decrementRefCount() { + void readCompleted() { refCount.decrementAndGet(); - } - - /** - * @return true if the file is still used in reads - */ - public boolean isReferencedInReads() { - return refCount.get() != 0; - } - - /** - * @return true if the file is compacted - */ - public boolean isCompactedAway() { - return this.compactedAway; + if (!shared) { + try { + reader.close(false); + } catch (IOException e) { + LOG.warn("failed to close stream reader", e); + } + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index ab6b0ef..1c52abe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; @@ -126,18 +124,44 @@ public class StoreFileScanner implements KeyValueScanner { boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { List scanners = new ArrayList<>(files.size()); - List sorted_files = new ArrayList<>(files); - Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); - for (int i = 0; i < sorted_files.size(); i++) { - StoreFileReader r = sorted_files.get(i).createReader(canUseDrop); - r.setReplicaStoreFile(isPrimaryReplica); - StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt, - i, matcher != null ? !matcher.hasNullColumnInQuery() : false); + List sortedFiles = new ArrayList<>(files); + Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); + for (int i = 0, n = sortedFiles.size(); i < n; i++) { + StoreFile sf = sortedFiles.get(i); + sf.initReader(isPrimaryReplica); + StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread, + isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false); scanners.add(scanner); } return scanners; } + /** + * Get scanners for compaction. We will create a separated reader for each store file to avoid + * contention with normal read request. 
+ */ + public static List getScannersForCompaction(Collection files, + boolean canUseDropBehind, long readPt) throws IOException { + List scanners = new ArrayList<>(files.size()); + List sortedFiles = new ArrayList<>(files); + Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); + boolean succ = false; + try { + for (int i = 0, n = sortedFiles.size(); i < n; i++) { + scanners.add(sortedFiles.get(i).getStreamScanner(true, canUseDropBehind, false, false, true, + readPt, i, false)); + } + succ = true; + } finally { + if (!succ) { + for (StoreFileScanner scanner : scanners) { + scanner.close(); + } + } + } + return scanners; + } + public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, @@ -262,7 +286,7 @@ public class StoreFileScanner implements KeyValueScanner { cur = null; this.hfs.close(); if (this.reader != null) { - this.reader.decrementRefCount(); + this.reader.readCompleted(); } closed = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index d72529a..0ba500a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import com.google.common.io.Closeables; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,8 +60,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.io.Closeables; - /** * A compactor is a compaction algorithm associated a given policy. Base class also contains * reusable parts for implementing compactors (what is common and what isn't is evolving). @@ -216,15 +215,9 @@ public abstract class Compactor { * @param filesToCompact Files. * @return Scanners. */ - protected List createFileScanners( - final Collection filesToCompact, - long smallestReadPoint, - boolean useDropBehind) throws IOException { - return StoreFileScanner.getScannersForStoreFiles(filesToCompact, - /* cache blocks = */ false, - /* use pread = */ false, - /* is compaction */ true, - /* use Drop Behind */ useDropBehind, + protected List createFileScanners(Collection filesToCompact, + long smallestReadPoint, boolean useDropBehind) throws IOException { + return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind, smallestReadPoint); } @@ -281,8 +274,6 @@ public abstract class Compactor { // Find the smallest read point across all the Scanners. 
long smallestReadPoint = getSmallestReadPoint(); - List scanners; - Collection readersToClose; T writer = null; boolean dropCache; if (request.isMajor() || request.isAllFiles()) { @@ -291,22 +282,8 @@ public abstract class Compactor { dropCache = this.dropCacheMinor; } - if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { - // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, - // HFiles, and their readers - readersToClose = new ArrayList<>(request.getFiles().size()); - for (StoreFile f : request.getFiles()) { - StoreFile clonedStoreFile = f.cloneForReader(); - // create the reader after the store file is cloned in case - // the sequence id is used for sorting in scanners - clonedStoreFile.createReader(); - readersToClose.add(clonedStoreFile); - } - scanners = createFileScanners(readersToClose, smallestReadPoint, dropCache); - } else { - readersToClose = Collections.emptyList(); - scanners = createFileScanners(request.getFiles(), smallestReadPoint, dropCache); - } + List scanners = + createFileScanners(request.getFiles(), smallestReadPoint, dropCache); InternalScanner scanner = null; boolean finished = false; try { @@ -336,13 +313,6 @@ public abstract class Compactor { } } finally { Closeables.close(scanner, true); - for (StoreFile f : readersToClose) { - try { - f.closeReader(true); - } catch (IOException e) { - LOG.warn("Exception closing " + f, e); - } - } if (!finished && writer != null) { abortWriter(writer); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java index 6a0921f..cadb3ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -49,7 +50,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({IOTests.class, SmallTests.class}) +@Category({ IOTests.class, SmallTests.class }) public class TestHalfStoreFileReader { private static HBaseTestingUtility TEST_UTIL; @@ -64,19 +65,14 @@ public class TestHalfStoreFileReader { } /** - * Test the scanner and reseek of a half hfile scanner. The scanner API - * demands that seekTo and reseekTo() only return < 0 if the key lies - * before the start of the file (with no position on the scanner). Returning - * 0 if perfect match (rare), and return > 1 if we got an imperfect match. - * - * The latter case being the most common, we should generally be returning 1, - * and if we do, there may or may not be a 'next' in the scanner/file. - * - * A bug in the half file scanner was returning -1 at the end of the bottom - * half, and that was causing the infrastructure above to go null causing NPEs - * and other problems. This test reproduces that failure, and also tests - * both the bottom and top of the file while we are at it. - * + * Test the scanner and reseek of a half hfile scanner. The scanner API demands that seekTo and + * reseekTo() only return < 0 if the key lies before the start of the file (with no position on + * the scanner). 
Returning 0 if perfect match (rare), and return > 1 if we got an imperfect match. + * The latter case being the most common, we should generally be returning 1, and if we do, there + * may or may not be a 'next' in the scanner/file. A bug in the half file scanner was returning -1 + * at the end of the bottom half, and that was causing the infrastructure above to go null causing + * NPEs and other problems. This test reproduces that failure, and also tests both the bottom and + * top of the file while we are at it. * @throws IOException */ @Test @@ -88,10 +84,8 @@ public class TestHalfStoreFileReader { FileSystem fs = FileSystem.get(conf); CacheConfig cacheConf = new CacheConfig(conf); HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build(); - HFile.Writer w = HFile.getWriterFactory(conf, cacheConf) - .withPath(fs, p) - .withFileContext(meta) - .create(); + HFile.Writer w = + HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create(); // write some things. List items = genSomeKeys(); @@ -105,7 +99,7 @@ public class TestHalfStoreFileReader { Cell midKV = r.midkey(); byte[] midkey = CellUtil.cloneRow(midKV); - //System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey)); + // System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey)); Reference bottom = new Reference(midkey, Reference.Range.bottom); doTestOfScanAndReseek(p, fs, bottom, cacheConf); @@ -116,11 +110,10 @@ public class TestHalfStoreFileReader { r.close(); } - private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, - CacheConfig cacheConf) + private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf) throws IOException { - final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, - cacheConf, bottom, TEST_UTIL.getConfiguration()); + final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConf, bottom, true, + new AtomicInteger(0), true, TEST_UTIL.getConfiguration()); halfreader.loadFileInfo(); final HFileScanner scanner = halfreader.getScanner(false, false); @@ -128,110 +121,103 @@ public class TestHalfStoreFileReader { Cell curr; do { curr = scanner.getCell(); - KeyValue reseekKv = - getLastOnCol(curr); + KeyValue reseekKv = getLastOnCol(curr); int ret = scanner.reseekTo(reseekKv); assertTrue("reseek to returned: " + ret, ret > 0); - //System.out.println(curr + ": " + ret); + // System.out.println(curr + ": " + ret); } while (scanner.next()); int ret = scanner.reseekTo(getLastOnCol(curr)); - //System.out.println("Last reseek: " + ret); - assertTrue( ret > 0 ); + // System.out.println("Last reseek: " + ret); + assertTrue(ret > 0); halfreader.close(true); } - // Tests the scanner on an HFile that is backed by HalfStoreFiles @Test public void testHalfScanner() throws IOException { - String root_dir = TEST_UTIL.getDataTestDir().toString(); - Path p = new Path(root_dir, "test"); - Configuration conf = TEST_UTIL.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - CacheConfig cacheConf = new CacheConfig(conf); - HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build(); - HFile.Writer w = HFile.getWriterFactory(conf, cacheConf) - .withPath(fs, p) - .withFileContext(meta) - .create(); - - // write some things. 
- List items = genSomeKeys(); - for (KeyValue kv : items) { - w.append(kv); - } - w.close(); + String root_dir = TEST_UTIL.getDataTestDir().toString(); + Path p = new Path(root_dir, "test"); + Configuration conf = TEST_UTIL.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + CacheConfig cacheConf = new CacheConfig(conf); + HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build(); + HFile.Writer w = + HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create(); + // write some things. + List items = genSomeKeys(); + for (KeyValue kv : items) { + w.append(kv); + } + w.close(); - HFile.Reader r = HFile.createReader(fs, p, cacheConf, conf); - r.loadFileInfo(); - Cell midKV = r.midkey(); - byte[] midkey = CellUtil.cloneRow(midKV); + HFile.Reader r = HFile.createReader(fs, p, cacheConf, conf); + r.loadFileInfo(); + Cell midKV = r.midkey(); + byte[] midkey = CellUtil.cloneRow(midKV); - Reference bottom = new Reference(midkey, Reference.Range.bottom); - Reference top = new Reference(midkey, Reference.Range.top); + Reference bottom = new Reference(midkey, Reference.Range.bottom); + Reference top = new Reference(midkey, Reference.Range.top); - // Ugly code to get the item before the midkey - KeyValue beforeMidKey = null; - for (KeyValue item : items) { - if (CellComparator.COMPARATOR.compare(item, midKV) >= 0) { - break; - } - beforeMidKey = item; + // Ugly code to get the item before the midkey + KeyValue beforeMidKey = null; + for (KeyValue item : items) { + if (CellComparator.COMPARATOR.compare(item, midKV) >= 0) { + break; } - System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey)); - System.out.println("beforeMidKey: " + beforeMidKey); - + beforeMidKey = item; + } + System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey)); + System.out.println("beforeMidKey: " + beforeMidKey); - // Seek on the splitKey, should be in top, not in bottom - Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf); - assertEquals(beforeMidKey, foundKeyValue); + // Seek on the splitKey, should be in top, not in bottom + Cell foundKeyValue = doTestOfSeekBefore(p, fs, bottom, midKV, cacheConf); + assertEquals(beforeMidKey, foundKeyValue); - // Seek tot the last thing should be the penultimate on the top, the one before the midkey on the bottom. - foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf); - assertEquals(items.get(items.size() - 2), foundKeyValue); + // Seek tot the last thing should be the penultimate on the top, the one before the midkey on + // the bottom. + foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(items.size() - 1), cacheConf); + assertEquals(items.get(items.size() - 2), foundKeyValue); - foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf); - assertEquals(beforeMidKey, foundKeyValue); + foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(items.size() - 1), cacheConf); + assertEquals(beforeMidKey, foundKeyValue); - // Try and seek before something that is in the bottom. - foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf); - assertNull(foundKeyValue); + // Try and seek before something that is in the bottom. + foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(0), cacheConf); + assertNull(foundKeyValue); - // Try and seek before the first thing. 
-    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
-    assertNull(foundKeyValue);
+      // Try and seek before the first thing.
+      foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(0), cacheConf);
+      assertNull(foundKeyValue);

-    // Try and seek before the second thing in the top and bottom.
-    foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
-    assertNull(foundKeyValue);
+      // Try and seek before the second thing in the top and bottom.
+      foundKeyValue = doTestOfSeekBefore(p, fs, top, items.get(1), cacheConf);
+      assertNull(foundKeyValue);

-    foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
-    assertEquals(items.get(0), foundKeyValue);
+      foundKeyValue = doTestOfSeekBefore(p, fs, bottom, items.get(1), cacheConf);
+      assertEquals(items.get(0), foundKeyValue);

-    // Try to seek before the splitKey in the top file
-    foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
-    assertNull(foundKeyValue);
-  }
+      // Try to seek before the splitKey in the top file
+      foundKeyValue = doTestOfSeekBefore(p, fs, top, midKV, cacheConf);
+      assertNull(foundKeyValue);
+    }

   private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
-      CacheConfig cacheConfig)
-      throws IOException {
-    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p,
-        cacheConfig, bottom, TEST_UTIL.getConfiguration());
-    halfreader.loadFileInfo();
-    final HFileScanner scanner = halfreader.getScanner(false, false);
-    scanner.seekBefore(seekBefore);
-    return scanner.getCell();
+      CacheConfig cacheConfig) throws IOException {
+    final HalfStoreFileReader halfreader = new HalfStoreFileReader(fs, p, cacheConfig, bottom, true,
+      new AtomicInteger(0), true, TEST_UTIL.getConfiguration());
+    halfreader.loadFileInfo();
+    final HFileScanner scanner = halfreader.getScanner(false, false);
+    scanner.seekBefore(seekBefore);
+    return scanner.getCell();
   }

   private KeyValue getLastOnCol(Cell curr) {
-    return KeyValueUtil.createLastOnRow(
-        curr.getRowArray(), curr.getRowOffset(), curr.getRowLength(),
-        curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
-        curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
+    return KeyValueUtil.createLastOnRow(curr.getRowArray(), curr.getRowOffset(),
+      curr.getRowLength(), curr.getFamilyArray(), curr.getFamilyOffset(), curr.getFamilyLength(),
+      curr.getQualifierArray(), curr.getQualifierOffset(), curr.getQualifierLength());
   }

   static final int SIZE = 1000;
@@ -244,18 +230,10 @@ public class TestHalfStoreFileReader {
     List ret = new ArrayList<>(SIZE);
     for (int i = 0; i < SIZE; i++) {
       KeyValue kv =
-          new KeyValue(
-              _b(String.format("row_%04d", i)),
-              _b("family"),
-              _b("qualifier"),
-              1000, // timestamp
+          new KeyValue(_b(String.format("row_%04d", i)), _b("family"), _b("qualifier"), 1000, // timestamp
               _b("value"));
       ret.add(kv);
     }
     return ret;
   }
-
-
-
 }
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index 83936aa..fd176b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -772,9 +772,7 @@ public class TestMobCompactor {
     ResultScanner results = table.getScanner(scan);
     int count = 0;
     for (Result res : results) {
-      for (Cell cell : res.listCells()) {
-        count++;
-      }
+      count += res.size();
     }
     results.close();
     return count;
@@ -818,7 +816,8 @@ public class TestMobCompactor {
     CacheConfig cacheConf = new CacheConfig(conf);
     StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
       BloomType.NONE);
-    HFile.Reader reader = sf.createReader().getHFileReader();
+    sf.initReader(true);
+    HFile.Reader reader = sf.getReader().getHFileReader();
     byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
     Assert.assertTrue(null != encryptionKey);
     Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 290e6f4..3d45421 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -518,7 +518,7 @@ public class TestPartitionedMobCompactor {
         StoreFile sf = new StoreFile(fs, newDelPath, conf, this.cacheConfig, BloomType.NONE);
         // pre-create reader of a del file to avoid race condition when opening the reader in each
         // partition.
-        sf.createReader();
+        sf.initReader(true);
         delPartition.addStoreFile(sf);
       }
     }
@@ -768,7 +768,6 @@ public class TestPartitionedMobCompactor {
    * @param delPartitions all del partitions
    */
   private void compareDelFiles(List delPartitions) {
-    int i = 0;
     Map delMap = new HashMap<>();
     for (CompactionDelPartition delPartition : delPartitions) {
       for (Path f : delPartition.listDelFiles()) {
@@ -854,8 +853,8 @@ public class TestPartitionedMobCompactor {
       StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
       sfs.add(sf);
     }
-    List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true,
-        false, false, HConstants.LATEST_TIMESTAMP);
+    List scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
+      false, true, false, false, HConstants.LATEST_TIMESTAMP));
     Scan scan = new Scan();
     scan.setMaxVersions(hcd.getMaxVersions());
     long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
index a074a9a..323ae11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
@@ -594,8 +594,8 @@ public class DataBlockEncodingTool {
     FileSystem fs = FileSystem.get(conf);
     StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
       BloomType.NONE);
-
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader reader = hsf.getReader();
     reader.loadFileInfo();
     KeyValueScanner scanner = reader.getStoreFileScanner(true, true, false, 0, 0, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
index eb77c28..2eafe33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
@@ -61,8 +61,8 @@ public class EncodedSeekPerformanceTest {
     // read all of the key values
     StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
       path, configuration, cacheConf, BloomType.NONE);
-
-    StoreFileReader reader = storeFile.createReader();
+    storeFile.initReader(true);
+    StoreFileReader reader = storeFile.getReader();
     StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);

     Cell current;
@@ -91,10 +91,10 @@ public class EncodedSeekPerformanceTest {
     // read all of the key values
     StoreFile storeFile = new StoreFile(testingUtility.getTestFileSystem(),
       path, configuration, cacheConf, BloomType.NONE);
-
+    storeFile.initReader(true);
     long totalSize = 0;

-    StoreFileReader reader = storeFile.createReader();
+    StoreFileReader reader = storeFile.getReader();
     StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false);

     long startReadingTime = System.nanoTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index 1169434..d2025e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -126,6 +126,11 @@ public class MockStoreFile extends StoreFile {
   }

   @Override
+  public boolean isCompactedAway() {
+    return compactedAway;
+  }
+
+  @Override
   public long getModificationTimeStamp() {
     return modificationTime;
   }
@@ -136,11 +141,22 @@ public class MockStoreFile extends StoreFile {
   }

   @Override
+  public void initReader(boolean isPrimaryReplicaStoreFile) throws IOException {
+  }
+
+  @Override
+  public StoreFileScanner getStreamScanner(boolean isPrimaryReplicaStoreFile,
+      boolean canUseDropBehind, boolean cacheBlocks, boolean pread, boolean isCompaction,
+      long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) throws IOException {
+    return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder,
+      canOptimizeForNonNullColumn);
+  }
+
+  @Override
   public StoreFileReader getReader() {
     final long len = this.length;
     final TimeRangeTracker timeRangeTracker = this.timeRangeTracker;
     final long entries = this.entryCount;
-    final boolean compactedAway = this.compactedAway;
     return new StoreFileReader() {
       @Override
       public long length() {
@@ -158,11 +174,6 @@ public class MockStoreFile extends StoreFile {
       }

       @Override
-      public boolean isCompactedAway() {
-        return compactedAway;
-      }
-
-      @Override
       public void close(boolean evictOnClose) throws IOException {
         // no-op
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 9fed202..d56ab45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -220,7 +220,8 @@ public class TestCacheOnWriteInSchema {
     BlockCache cache = cacheConf.getBlockCache();
     StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.ROWCOL);
-    HFile.Reader reader = sf.createReader().getHFileReader();
+    sf.initReader(true);
+    HFile.Reader reader = sf.getReader().getHFileReader();
     try {
       // Open a scanner with (on read) caching disabled
       HFileScanner scanner = reader.getScanner(false, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index 7154511..58dbe8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -40,15 +40,12 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.experimental.categories.Category;

-@Category(SmallTests.class)
 public class TestCompactionPolicy {
   private final static Log LOG = LogFactory.getLog(TestCompactionPolicy.class);
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
index dfea761..f02156f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
@@ -201,7 +201,8 @@ public class TestCompoundBloomFilter {
   private void readStoreFile(int t, BloomType bt, List kvs, Path sfPath) throws IOException {
     StoreFile sf = new StoreFile(fs, sfPath, conf, cacheConf, bt);
-    StoreFileReader r = sf.createReader();
+    sf.initReader(true);
+    StoreFileReader r = sf.getReader();
     final boolean pread = true; // does not really matter
     StoreFileScanner scanner = r.getStoreFileScanner(true, pread, false, 0, 0, false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 9f0975d..ecffed6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -96,8 +96,8 @@ public class TestFSErrorsExposed {
     StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
       cacheConf, BloomType.NONE);
-
-    StoreFileReader reader = sf.createReader();
+    sf.initReader(true);
+    StoreFileReader reader = sf.getReader();
     HFileScanner scanner = reader.getScanner(false, true);

     FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
index a50dc42..c8c5646 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
@@ -294,7 +294,8 @@ public class TestMobStoreCompaction {
     FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
     for (FileStatus file : files) {
       StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
-      Map fileInfo = sf.createReader().loadFileInfo();
+      sf.initReader(true);
+      Map fileInfo = sf.getReader().loadFileInfo();
       byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
       assertTrue(count != null);
       mobCellsCount += Bytes.toLong(count);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index 7e4ebd8..a06e19b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -171,7 +172,8 @@ public class TestStoreFile extends HBaseTestCase {
     Path hsfPath = regionFs.commitStoreFile(TEST_FAMILY, writer.getPath());
     StoreFile hsf = new StoreFile(this.fs, hsfPath, conf, cacheConf,
       BloomType.NONE);
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader reader = hsf.getReader();
     // Split on a row, not in middle of row. Midkey returned by reader
     // may be in middle of row. Create new one with empty column and
     // timestamp.
@@ -186,9 +188,10 @@ public class TestStoreFile extends HBaseTestCase {
     Path refPath = splitStoreFile(regionFs, splitHri, TEST_FAMILY, hsf, midRow, true);
     StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf,
       BloomType.NONE);
+    refHsf.initReader(true);
     // Now confirm that I can read from the reference and that it only gets
     // keys from top half of the file.
-    HFileScanner s = refHsf.createReader().getScanner(false, false);
+    HFileScanner s = refHsf.getReader().getScanner(false, false);
     for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
       ByteBuffer bb = ByteBuffer.wrap(((KeyValue) s.getKey()).getKey());
       kv = KeyValueUtil.createKeyValueFromKey(bb);
@@ -245,10 +248,11 @@ public class TestStoreFile extends HBaseTestCase {
     StoreFile hsf = new StoreFile(this.fs, storeFileInfo, testConf, cacheConf,
       BloomType.NONE);
     assertTrue(storeFileInfo.isLink());
+    hsf.initReader(true);

     // Now confirm that I can read from the link
     int count = 1;
-    HFileScanner s = hsf.createReader().getScanner(false, false);
+    HFileScanner s = hsf.getReader().getScanner(false, false);
     s.seekTo();
     while (s.next()) {
       count++;
@@ -296,7 +300,7 @@ public class TestStoreFile extends HBaseTestCase {
     HRegionInfo splitHriA = new HRegionInfo(hri.getTable(), null, SPLITKEY);
     HRegionInfo splitHriB = new HRegionInfo(hri.getTable(), SPLITKEY, null);
     StoreFile f = new StoreFile(fs, linkFilePath, testConf, cacheConf, BloomType.NONE);
-    f.createReader();
+    f.initReader(true);
     Path pathA = splitStoreFile(cloneRegionFs, splitHriA, TEST_FAMILY, f, SPLITKEY, true); // top
     Path pathB = splitStoreFile(cloneRegionFs, splitHriB, TEST_FAMILY, f, SPLITKEY, false);// bottom
     f.closeReader(true);
@@ -309,10 +313,11 @@ public class TestStoreFile extends HBaseTestCase {
     // Try to open store file from link
     StoreFile hsfA = new StoreFile(this.fs, pathA, testConf, cacheConf,
       BloomType.NONE);
+    hsfA.initReader(true);

     // Now confirm that I can read from the ref to link
     int count = 1;
-    HFileScanner s = hsfA.createReader().getScanner(false, false);
+    HFileScanner s = hsfA.getReader().getScanner(false, false);
     s.seekTo();
     while (s.next()) {
       count++;
@@ -322,9 +327,10 @@ public class TestStoreFile extends HBaseTestCase {
     // Try to open store file from link
     StoreFile hsfB = new StoreFile(this.fs, pathB, testConf, cacheConf,
       BloomType.NONE);
+    hsfB.initReader(true);

     // Now confirm that I can read from the ref to link
-    HFileScanner sB = hsfB.createReader().getScanner(false, false);
+    HFileScanner sB = hsfB.getReader().getScanner(false, false);
     sB.seekTo();

     //count++ as seekTo() will advance the scanner
@@ -339,7 +345,8 @@ public class TestStoreFile extends HBaseTestCase {
   private void checkHalfHFile(final HRegionFileSystem regionFs, final StoreFile f)
       throws IOException {
-    Cell midkey = f.createReader().midkey();
+    f.initReader(true);
+    Cell midkey = f.getReader().midkey();
     KeyValue midKV = (KeyValue)midkey;
     byte [] midRow = CellUtil.cloneRow(midKV);
     // Create top split.
@@ -351,10 +358,12 @@ public class TestStoreFile extends HBaseTestCase {
       midRow, null);
     Path bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, midRow, false);
     // Make readers on top and bottom.
-    StoreFileReader top = new StoreFile(
-      this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
-    StoreFileReader bottom = new StoreFile(
-      this.fs, bottomPath, conf, cacheConf, BloomType.NONE).createReader();
+    StoreFile topF = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE);
+    topF.initReader(true);
+    StoreFileReader top = topF.getReader();
+    StoreFile bottomF = new StoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE);
+    bottomF.initReader(true);
+    StoreFileReader bottom = bottomF.getReader();
     ByteBuffer previous = null;
     LOG.info("Midkey: " + midKV.toString());
     ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midKV.getKey());
@@ -412,7 +421,9 @@ public class TestStoreFile extends HBaseTestCase {
       assertNull(bottomPath);

-      top = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE).createReader();
+      topF = new StoreFile(this.fs, topPath, conf, cacheConf, BloomType.NONE);
+      topF.initReader(true);
+      top = topF.getReader();
       // Now read from the top.
      first = true;
      topScanner = top.getScanner(false, false);
@@ -449,8 +460,10 @@ public class TestStoreFile extends HBaseTestCase {
      topPath = splitStoreFile(regionFs,topHri, TEST_FAMILY, f, badmidkey, true);
      bottomPath = splitStoreFile(regionFs, bottomHri, TEST_FAMILY, f, badmidkey, false);
      assertNull(topPath);
-      bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
-        BloomType.NONE).createReader();
+
+      bottomF = new StoreFile(this.fs, bottomPath, conf, cacheConf, BloomType.NONE);
+      bottomF.initReader(true);
+      bottom = bottomF.getReader();
      first = true;
      bottomScanner = bottom.getScanner(false, false);
      while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
@@ -502,7 +515,8 @@ public class TestStoreFile extends HBaseTestCase {
     }
     writer.close();

-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@@ -590,7 +604,8 @@ public class TestStoreFile extends HBaseTestCase {
     }
     writer.close();

-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();

@@ -635,7 +650,8 @@ public class TestStoreFile extends HBaseTestCase {
     writeStoreFile(writer);
     writer.close();

-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);

     // Now do reseek with empty KV to position to the beginning of the file
@@ -695,7 +711,8 @@ public class TestStoreFile extends HBaseTestCase {
     }
     writer.close();

-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     reader.loadFileInfo();
     reader.loadBloomfilter();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
@@ -849,7 +866,8 @@ public class TestStoreFile extends HBaseTestCase {
     HColumnDescriptor hcd = mock(HColumnDescriptor.class);
     when(hcd.getName()).thenReturn(family);
     when(store.getFamily()).thenReturn(hcd);
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader reader = hsf.getReader();
     StoreFileScanner scanner = getStoreFileScanner(reader, false, false);
     TreeSet columns = new TreeSet<>(Bytes.BYTES_COMPARATOR);
     columns.add(qualifier);
@@ -905,7 +923,8 @@ public class TestStoreFile extends HBaseTestCase {
     LOG.debug(hsf.getPath().toString());

     // Read this file, we should see 3 misses
-    StoreFileReader reader = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader reader = hsf.getReader();
     reader.loadFileInfo();
     StoreFileScanner scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
@@ -926,7 +945,8 @@ public class TestStoreFile extends HBaseTestCase {
       BloomType.NONE);

     // Read this file, we should see 3 hits
-    reader = hsf.createReader();
+    hsf.initReader(true);
+    reader = hsf.getReader();
     scanner = getStoreFileScanner(reader, true, true);
     scanner.seek(KeyValue.LOWESTKEY);
     while (scanner.next() != null);
@@ -940,13 +960,15 @@ public class TestStoreFile extends HBaseTestCase {
     // Let's read back the two files to ensure the blocks exactly match
     hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
       BloomType.NONE);
-    StoreFileReader readerOne = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader readerOne = hsf.getReader();
     readerOne.loadFileInfo();
     StoreFileScanner scannerOne = getStoreFileScanner(readerOne, true, true);
     scannerOne.seek(KeyValue.LOWESTKEY);
     hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
       BloomType.NONE);
-    StoreFileReader readerTwo = hsf.createReader();
+    hsf.initReader(true);
+    StoreFileReader readerTwo = hsf.getReader();
     readerTwo.loadFileInfo();
     StoreFileScanner scannerTwo = getStoreFileScanner(readerTwo, true, true);
     scannerTwo.seek(KeyValue.LOWESTKEY);
@@ -979,7 +1001,8 @@ public class TestStoreFile extends HBaseTestCase {
     cacheConf = new CacheConfig(conf);
     hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
       BloomType.NONE);
-    reader = hsf.createReader();
+    hsf.initReader(true);
+    reader = hsf.getReader();
     reader.close(cacheConf.shouldEvictOnClose());

     // We should have 3 new evictions but the evict count stat should not change. Eviction because
@@ -993,7 +1016,8 @@ public class TestStoreFile extends HBaseTestCase {
     cacheConf = new CacheConfig(conf);
     hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
       BloomType.NONE);
-    reader = hsf.createReader();
+    hsf.initReader(true);
+    reader = hsf.getReader();
     reader.close(cacheConf.shouldEvictOnClose());

     // We expect no changes
@@ -1080,7 +1104,8 @@ public class TestStoreFile extends HBaseTestCase {
     StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,
       cacheConf, BloomType.NONE);
-    StoreFileReader reader = storeFile.createReader();
+    storeFile.initReader(true);
+    StoreFileReader reader = storeFile.getReader();

     Map fileInfo = reader.loadFileInfo();
     byte[] value = fileInfo.get(HFileDataBlockEncoder.DATA_BLOCK_ENCODING);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
index d628dc8..3d3c79c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java
@@ -23,23 +23,24 @@ import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,7 +75,8 @@ public class TestStoreFileScannerWithTagCompression {
     writeStoreFile(writer);
     writer.close();

-    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf, conf);
+    StoreFileReader reader =
+        new StoreFileReader(fs, f, cacheConf, true, new AtomicInteger(0), true, conf);
     StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false);
     try {
       // Now do reseek with empty KV to position to the beginning of the file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index dff6919..170fba2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -67,9 +67,6 @@ public class TestCompactor {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
       anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    when(sf.cloneForReader()).thenReturn(sf);
     when(sf.getMaxSequenceId()).thenReturn(maxSequenceId);
     return sf;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index f2d00b3..b839fc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -753,9 +753,6 @@ public class TestStripeCompactionPolicy {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(),
       anyBoolean())).thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
-    when(sf.createReader(anyBoolean())).thenReturn(r);
-    when(sf.createReader()).thenReturn(r);
-    when(sf.cloneForReader()).thenReturn(sf);
     return sf;
   }

-- 
2.7.4
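
Reviewer sketch of the reader lifecycle exercised by the test changes above. This is illustrative only and not part of the patch: the helper name openReader is made up, the fs/path/f/conf/cacheConf variables mirror the test fixtures, and the meanings attached to the new StoreFileReader constructor arguments are inferred from the hunks above (notably the MockStoreFile.initReader(boolean isPrimaryReplicaStoreFile) override), not taken from the patch text.

    // Old pattern, removed by this patch:
    //   StoreFileReader reader = sf.createReader();
    //
    // New pattern used throughout the updated tests: initialize the reader once,
    // then fetch it wherever it is needed.
    static StoreFileReader openReader(FileSystem fs, Path path, Configuration conf,
        CacheConfig cacheConf) throws IOException {
      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
      sf.initReader(true);      // true = isPrimaryReplicaStoreFile, per MockStoreFile.initReader
      return sf.getReader();    // reader was created by initReader and is reused, not cloned
    }

    // Tests that construct a reader directly now pass the extra arguments explicitly.
    // The comments name what each argument appears to be; treat them as assumptions,
    // not the authoritative parameter names.
    StoreFileReader reader = new StoreFileReader(fs, f, cacheConf,
        true,                   // primary replica store file
        new AtomicInteger(0),   // shared reference count for the underlying reader
        true,                   // shared (long-lived) reader rather than a per-call stream reader
        conf);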