diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 2d24a27..3b4abb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -309,6 +309,12 @@ public class HalfStoreFileReader extends StoreFile.Reader { } return this.delegate.seekBefore(key); } + + @Override + public void close() throws IOException { + // Close the wrapped scanner so its read-ahead context (if any) releases its buffer. + this.delegate.close(); + } }; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FileReaderContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FileReaderContext.java new file mode 100644 index 0000000..d3c66cb --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FileReaderContext.java @@ -0,0 +1,347 @@ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.MovingAverageWithoutOutliersPredictor; + +/** + * This class represents an HFileScanner's private context, which is passed + * in all calls to the underlying HFile.Reader (HFileReaderV2 or V3). + * The context holds a local data buffer (4MB by default) for prefetched + * data and the current offset of this prefetched buffer in the underlying HFile. + * The implementation manages resources conservatively: to avoid a potential OOME, + * not every context object will be able to allocate a data buffer. + * The default settings are conservative: 50 active buffers of 4MB each (200MB total). + * If you can afford it, allocate more active buffers. + * + * To avoid OOME (or frequent GCs), close scanners when you are done. + * + * TODO: Make the data buffer prefetch size dynamically adjustable. + * + * TODO: If a scanner makes many skips, decrease the prefetch size (down to the size of a single HFile block). + * + */ +@InterfaceAudience.Private +public class FileReaderContext implements Closeable { + public static final Log LOG = LogFactory.getLog(FileReaderContext.class); + // TODO Move these constants to HConstants ?
+ public final static String HFILE_READAHEAD_ACTIVE_BUFFERS = "hfile.readahead.active.buffers"; + public final static String HFILE_READAHEAD_BUFFER_SIZE = "hfile.readahead.buffer.size"; + public final static String HFILE_READAHEAD_BUFFER_EXPIRE_TIMEOUT = "hfile.readahead.buffer.expire.timeout"; + public final static String HFILE_READAHEAD_CLEANER_INTERVAL = "hfile.readahead.cleaner.interval"; + public final static String HFILE_READAHEAD_ENABLED = "hfile.readahead.enabled"; + public final static String HFILE_READAHEAD_MIN_BLOCKS = "hfile.readahead.min.blocks"; + + /** + * Default values + */ + + private static final int DEFAULT_HFILE_PSCAN_ACTIVE_BUFFERS = 50; + private static final int DEFAULT_HFILE_PSCAN_BUFFER_SIZE = 4 * 1024 * 1024; + private static final long DEFAULT_HFILE_PSCAN_BUFFER_EXPIRE_TIMEOUT = + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; // 60 seconds + + /** + * Use the read-ahead buffer only if we predict that + * no less than this # of blocks will be read from the buffer. + * + **/ + private static final int DEFAULT_HFILE_READAHEAD_MIN_BLOCKS = 8; + + private static int maxActiveBuffers; + private static int bufferSize; + private static long bufferExpirationTimeout; + private static int minBlocks; + + private static AtomicInteger totalBuffersAllocated = new AtomicInteger(0); + + /** + * Use these counters to calculate the average size of requested HFile blocks. + */ + // Total bytes actually read from the read-ahead buffer + private long totalBytesRead = 0; + // Total # of times the read-ahead buffer was accessed + private long totalReadAccesses = 0; + /** + * Keeps the last offset in a file to calculate + * skip sizes. + */ + private long lastOffsetAccessed = -1; + + /** + * Byte buffer list (bounded to avoid OOME) + */ + + private static LinkedBlockingQueue<ByteBuffer> bufferList; + + /** + * Buffer to hold the pre-fetched data + */ + private ByteBuffer data; + + /** + * Current offset of a reader in the underlying HFile + */ + private long offset; + private long lastAccessTime; + private ReentrantLock lock = new ReentrantLock(); + + /** + * Smart next-skip predictor, based on a moving-average predictor with additional + * processing logic for outliers. + */ + private MovingAverageWithoutOutliersPredictor skipPredictor; + /** + * The current recommended read-ahead size (either the full buffer or -1) + */ + private int readAhead; + + public synchronized static void initIfNotInited(Configuration config) + { + if ( bufferList != null) { + // Initialized already + return; + } + maxActiveBuffers = config.getInt(HFILE_READAHEAD_ACTIVE_BUFFERS, DEFAULT_HFILE_PSCAN_ACTIVE_BUFFERS); + bufferSize = config.getInt(HFILE_READAHEAD_BUFFER_SIZE, DEFAULT_HFILE_PSCAN_BUFFER_SIZE); + bufferExpirationTimeout = config.getLong(HFILE_READAHEAD_BUFFER_EXPIRE_TIMEOUT, DEFAULT_HFILE_PSCAN_BUFFER_EXPIRE_TIMEOUT); + minBlocks = config.getInt(HFILE_READAHEAD_MIN_BLOCKS, DEFAULT_HFILE_READAHEAD_MIN_BLOCKS); + bufferList = new LinkedBlockingQueue<ByteBuffer>(maxActiveBuffers); + } + + + public FileReaderContext() + { + // Do not initialize the buffer data yet; + // set the initial read-ahead equal to the buffer size. + readAhead = bufferSize; + //refToThis = new SoftReference(this); + //TODO custom configuration for predictor + skipPredictor = new MovingAverageWithoutOutliersPredictor(); + } + + /** + * Returns the suggested read-ahead size. + * If < 0, read-ahead is not recommended (disabled).
+ * @return recommended r-a size (length) + */ + public int getReadAhead() + { + adjustReadAhead(); + return readAhead; + } + + private void adjustReadAhead() { + double nextSkip = skipPredictor.getMovingAverageNoOutliers(true); + if (totalBytesRead == 0 || totalReadAccesses == 0) return; + if (skipPredictor.isReady() == false) { + readAhead = bufferSize; + return; + } + + double avgBlockSize = (double)(totalBytesRead)/ totalReadAccesses; + + if( (minBlocks - 1) * nextSkip + avgBlockSize < bufferSize){ + readAhead = bufferSize; + } else{ + readAhead = -1; + } + } + + + private final void updatePredictor(long offset) + { + if( offset == lastOffsetAccessed){ + // do nothing + return; + } + if(lastOffsetAccessed >= 0){ + skipPredictor.add(offset - lastOffsetAccessed); + } + lastOffsetAccessed = offset; + } + + public boolean isEmpty() + { + return data == null; + } + // TODO Check this method + // it is not MT-safe + // Q: should we enforce MT-safeness? + // + public boolean containsData(long offset, int len) + { + if(isEmpty()) return false; + return offset >= this.offset && (offset + len <= data.limit() + this.offset); + } + + public int getBufferSize() + { + return bufferSize; + } + + + public void updateContext(long offset, int len) + { + if(len > 0 && data != null){ + data.limit(len); + data.position(0); + } + this.offset = offset; + + } + + /** + * Positional read from internal buffer + * @param offset + * @param buffer + * @param bufOffset + * @param len + * @return true on success, false otherwise + */ + public boolean read(long offset, byte[] buffer, int bufOffset, int len) + { + totalBytesRead += len; + totalReadAccesses ++; + updatePredictor(offset); + ByteBuffer buf = lockBuffer(); + try{ + if(buf == null){ + return false; + } + if(containsData(offset, len) == false){ + return false; + } + buf.position((int)(offset - this.offset)); + buf.get(buffer, bufOffset, len); + }finally{ + if(buf != null) { + buf.position(0); + } + releaseBuffer(buf); + } + return true; + } + /** + * Call this method to get access to data buffer + * @return data buffer, or null if buffer can + * not be allocated due to resource limitation. + */ + public ByteBuffer lockBuffer(){ + lock.lock(); + // If data is null - try to allocate buffer + if(data == null){ + data = bufferList.poll(); + // Otherwise, allocate new buffer + if( data == null && totalBuffersAllocated.get() < maxActiveBuffers){ + totalBuffersAllocated.incrementAndGet(); + data = ByteBuffer.allocate(bufferSize); + // set limit to 0 - it means that there is no data yet in + // this buffer + data.limit(0); + } else if ( data != null){ + data.clear(); + data.limit(0); + } + } + // We update access time on lockBuffer + updateAccessTime(); + return data; + } + + private void updateAccessTime() { + lastAccessTime = System.currentTimeMillis(); + } + + + /** + * This is used by cleanQueue method. We do not want to block until + * lock is available because if lock is not available it means + * that context is currently in use and is not eligible for + * cleaning. 
+ * @return data buffer or null if this object locked + */ + public ByteBuffer tryLockBuffer() + { + if( lock.tryLock() == false) return null; + return data; + + } + /** + * Call this method when you are done with the data buffer + * @param buffer + */ + public void releaseBuffer(ByteBuffer buffer) + { + if(buffer == null){ + // probably buffer was deallocated + // make sure we nullify it + data = null; + } + if(lock.isHeldByCurrentThread()){ + // Do unlock if we hold the lock only + lock.unlock(); + } + } + + public long getLastAccessTime(){ + return lastAccessTime; + } + + + public long getOffset() + { + return offset; + } + + public void setOffset(long offset) + { + this.offset = offset; + } + + + public boolean isExpired(){ + return System.currentTimeMillis() - lastAccessTime > bufferExpirationTimeout; + } + + @Override + /** + * Make sure we close context + * Modify StoreFileScanner close() to call + * HFileScanner close() + * The best place is to add this functionality is in + * AbstractScannerV2 + */ + public synchronized void close() throws IOException { + if( isEmpty()) return; + ByteBuffer data = lockBuffer(); + try{ + if(data != null){ + // Return data back to queue + bufferList.offer(data); + } + } finally{ + releaseBuffer(data); + } + } + + public boolean ensureDataBuffer() { + ByteBuffer buffer = null; + try{ + buffer = lockBuffer(); + } finally{ + releaseBuffer(buffer); + } + return data != null; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index c6733b9..8132f88 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -363,13 +363,14 @@ public class HFile { * check only applies to data blocks and can be set to null when * the caller is expecting to read a non-data block and has set * expectedBlockType accordingly. + * @param context file reader's context * @return Block wrapped in a ByteBuffer. * @throws IOException */ HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread, final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType, - DataBlockEncoding expectedDataBlockEncoding) + DataBlockEncoding expectedDataBlockEncoding, FileReaderContext context) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index a29103e..92623c1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1195,7 +1195,7 @@ public class HFileBlock implements Cacheable { * @return the newly read block */ HFileBlock readBlockData(long offset, long onDiskSize, - int uncompressedSize, boolean pread) throws IOException; + int uncompressedSize, boolean pread, FileReaderContext context) throws IOException; /** * Creates a block iterator over the given portion of the {@link HFile}. 
@@ -1264,7 +1264,7 @@ public class HFileBlock implements Cacheable { public HFileBlock nextBlock() throws IOException { if (offset >= endOffset) return null; - HFileBlock b = readBlockData(offset, -1, -1, false); + HFileBlock b = readBlockData(offset, -1, -1, false, null); offset += b.getOnDiskSizeWithHeader(); return b.unpack(fileContext, owner); } @@ -1297,60 +1297,179 @@ public class HFileBlock implements Cacheable { * -1 if it could not be determined * @throws IOException */ - protected int readAtOffset(FSDataInputStream istream, - byte[] dest, int destOffset, int size, - boolean peekIntoNextBlock, long fileOffset, boolean pread) - throws IOException { - if (peekIntoNextBlock && - destOffset + size + hdrSize > dest.length) { - // We are asked to read the next block's header as well, but there is - // not enough room in the array. - throw new IOException("Attempted to read " + size + " bytes and " + - hdrSize + " bytes of next header into a " + dest.length + - "-byte array at offset " + destOffset); - } - - if (!pread && streamLock.tryLock()) { - // Seek + read. Better for scanning. - try { - istream.seek(fileOffset); - - long realOffset = istream.getPos(); - if (realOffset != fileOffset) { - throw new IOException("Tried to seek to " + fileOffset + " to " - + "read " + size + " bytes, but pos=" + realOffset - + " after seek"); - } - - if (!peekIntoNextBlock) { - IOUtils.readFully(istream, dest, destOffset, size); - return -1; - } - - // Try to read the next block header. - if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) - return -1; - } finally { - streamLock.unlock(); - } - } else { - // Positional read. Better for random reads; or when the streamLock is already locked. - int extraSize = peekIntoNextBlock ? hdrSize : 0; - int ret = istream.read(fileOffset, dest, destOffset, size + extraSize); - if (ret < size) { - throw new IOException("Positional read of " + size + " bytes " + - "failed at offset " + fileOffset + " (returned " + ret + ")"); - } - - if (ret == size || ret < size + extraSize) { - // Could not read the next block's header, or did not try. - return -1; - } - } - - assert peekIntoNextBlock; - return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; - } + + + protected int readAtOffset(FSDataInputStream istream, byte[] dest, + int destOffset, int size, boolean peekIntoNextBlock, + long fileOffset, boolean pread, FileReaderContext context) + throws IOException { + + if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) { + // We are asked to read the next block's header as well, but + // there is + // not enough room in the array. + throw new IOException("Attempted to read " + size + + " bytes and " + hdrSize + + " bytes of next header into a " + dest.length + + "-byte array at offset " + destOffset); + } + + if (!pread && (context == null || context.isEmpty()) + && streamLock.tryLock()) { + // Seek + read. Better for scanning. + // Read-ahead disabled or read context is empty and lock + // successful? + // Run old stream mode. + try { + istream.seek(fileOffset); + + long realOffset = istream.getPos(); + if (realOffset != fileOffset) { + throw new IOException("Tried to seek to " + fileOffset + + " to " + "read " + size + " bytes, but pos=" + + realOffset + " after seek"); + } + + if (!peekIntoNextBlock) { + IOUtils.readFully(istream, dest, destOffset, size); + return -1; + } + + // Try to read the next block header. 
+ if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) + return -1; + } finally { + // No need to releaseLock + streamLock.unlock(); + } + } else if (pread || context == null /* read-ahead is disabled */) { + // Positional read. Better for random reads; or when the + // streamLock is already locked. + // Or read-ahead disabled and stream lock is not accessible + int extraSize = peekIntoNextBlock ? hdrSize : 0; + + int ret = istream.read(fileOffset, dest, destOffset, size + + extraSize); + if (ret < size) { + throw new IOException("Positional read of " + size + + " bytes " + "failed at offset " + fileOffset + + " (returned " + ret + ")"); + } + + if (ret == size || ret < size + extraSize) { + // Could not read the next block's header, or did not try. + return -1; + } + } else { + // Streaming read && read-ahead is enabled + // Positional read with context. + int extraSize = peekIntoNextBlock ? hdrSize : 0; + + boolean fromContextBuffer = context.read(fileOffset, dest, + destOffset, size + extraSize); + + if (fromContextBuffer == false) { + + // Read data into temporary buffer if buffer is large enough + int readAhead = context.getReadAhead(); + int toRead = Math.max(readAhead, size + extraSize); + + if (toRead > context.getBufferSize() || context.isEmpty() + || readAhead < 0) { + // No read-ahead if requested size > buffer size or + // there is no data buffer (no resources) - just load + // only single block + // or read-ahead is not advised (read-ahead size < 0) + int ret = istream.read(fileOffset, dest, destOffset, + size + extraSize); + if (ret < size) { + throw new IOException("Positional read of " + size + + " bytes " + "failed at offset " + + fileOffset + " (returned " + ret + ")"); + } + + if (ret == size || ret < size + extraSize) { + // Could not read the next block's header, or did + // not try. + return -1; + } + + } else { + // Read - ahead and do pre-fetch + ByteBuffer buffer = context.lockBuffer(); + boolean success = false; + if (buffer != null) { + + byte[] tmpBuffer = buffer.array(); + int ret = 0; + try { + + ret = istream.read(fileOffset, tmpBuffer, 0, + toRead); + + if (ret < size) { + throw new IOException("Positional read of " + + size + " bytes " + + "failed at offset " + fileOffset + + " (returned " + ret + ")"); + } + success = true; + } finally { + if (buffer != null) { + // I do not think we will ever + // have have null + // release operation may fail if there is no + // read-ahead buffer (was deallocated + // by a cleaner thread) + // success = + // context.releaseTmpBuffer(fileOffset, + // tmpBuffer, ret); + context.updateContext(fileOffset, ret); + context.releaseBuffer(buffer); + } + } + } + + if (success == true) { + // Repeat read from context buffer + fromContextBuffer = context.read(fileOffset, dest, + destOffset, size + extraSize); + if (!fromContextBuffer || !peekIntoNextBlock) { + // Could not read the next block's header, or + // did not try. + return -1; + } + } else { + // if buffer is null - do clean + // positional read + int ret = istream.read(fileOffset, dest, destOffset, + size + extraSize); + if (ret < size) { + throw new IOException("Positional read of " + + size + " bytes " + + "failed at offset " + fileOffset + + " (returned " + ret + ")"); + } + + if (ret == size || ret < size + extraSize) { + // Could not read the next block's header, or + // did not try. 
+ return -1; + } + } + } + } else { + // Return -1 if we did not read next block header + if (peekIntoNextBlock == false) + return -1; + } + } + + assert peekIntoNextBlock; + return Bytes + .toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + + hdrSize; + } } @@ -1414,7 +1533,7 @@ public class HFileBlock implements Cacheable { */ @Override public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, - int uncompressedSize, boolean pread) throws IOException { + int uncompressedSize, boolean pread, FileReaderContext context) throws IOException { // get a copy of the current state of whether to validate // hbase checksums or not for this read call. This is not @@ -1427,7 +1546,7 @@ public class HFileBlock implements Cacheable { HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize, pread, - doVerificationThruHBaseChecksum); + doVerificationThruHBaseChecksum, context); if (blk == null) { HFile.LOG.warn("HBase checksum verification failed for file " + path + " at offset " + @@ -1455,7 +1574,7 @@ public class HFileBlock implements Cacheable { doVerificationThruHBaseChecksum = false; blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, uncompressedSize, pread, - doVerificationThruHBaseChecksum); + doVerificationThruHBaseChecksum, context); if (blk != null) { HFile.LOG.warn("HDFS checksum verification suceeded for file " + path + " at offset " + @@ -1496,7 +1615,7 @@ public class HFileBlock implements Cacheable { */ private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread, - boolean verifyChecksum) throws IOException { + boolean verifyChecksum, FileReaderContext context) throws IOException { if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL @@ -1544,7 +1663,7 @@ public class HFileBlock implements Cacheable { // next block's header nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, - true, offset + preReadHeaderSize, pread); + true, offset + preReadHeaderSize, pread, context); if (headerBuf != null) { // the header has been read when reading the previous block, copy // to this block's header @@ -1593,14 +1712,14 @@ public class HFileBlock implements Cacheable { // to the block index. This is costly and should happen very rarely. 
headerBuf = ByteBuffer.allocate(hdrSize); readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), - hdrSize, false, offset, pread); + hdrSize, false, offset, pread, context); } b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - - hdrSize, true, offset + hdrSize, pread); + - hdrSize, true, offset + hdrSize, pread, context); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 1073e93..0a0dbed 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -178,11 +178,12 @@ public class HFileBlockIndex { * @throws IOException */ public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks, - boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding) + boolean pread, boolean isCompaction, + DataBlockEncoding expectedDataBlockEncoding, FileReaderContext context) throws IOException { BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock, cacheBlocks, - pread, isCompaction, expectedDataBlockEncoding); + pread, isCompaction, expectedDataBlockEncoding, context); if (blockWithScanInfo == null) { return null; } else { @@ -210,8 +211,8 @@ public class HFileBlockIndex { * @throws IOException */ public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock, - boolean cacheBlocks, - boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding) + boolean cacheBlocks, boolean pread, boolean isCompaction, + DataBlockEncoding expectedDataBlockEncoding, FileReaderContext context) throws IOException { int rootLevelIndex = rootBlockContainingKey(key); if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) { @@ -259,7 +260,7 @@ public class HFileBlockIndex { } block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread, isCompaction, true, - expectedBlockType, expectedDataBlockEncoding); + expectedBlockType, expectedDataBlockEncoding, context); } if (block == null) { @@ -338,7 +339,7 @@ public class HFileBlockIndex { // Caching, using pread, assuming this is not a compaction. 
HFileBlock midLeafBlock = cachingBlockReader.readBlock( midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true, - BlockType.LEAF_INDEX, null); + BlockType.LEAF_INDEX, null, null); ByteBuffer b = midLeafBlock.getBufferWithoutHeader(); int numDataBlocks = b.getInt(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index ee5fbb6..7f9dd74 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@ -291,7 +291,7 @@ public class HFilePrettyPrinter extends Configured implements Tool { HFileBlock block; while (offset <= max) { block = reader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false, - /* isCompaction */ false, /* updateCacheMetrics */ false, null, null); + /* isCompaction */ false, /* updateCacheMetrics */ false, null, null, null); offset += block.getOnDiskSizeWithHeader(); System.out.println(block); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 6f67df3..3b36db6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -197,7 +197,7 @@ public class HFileReaderV2 extends AbstractHFileReader { onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); } HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, - null, null); + null, null, null); prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } @@ -353,7 +353,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true).unpack(hfileContext, fsBlockReader); + blockSize, -1, true, null).unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -391,7 +391,7 @@ public class HFileReaderV2 extends AbstractHFileReader { public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, boolean pread, final boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, - DataBlockEncoding expectedDataBlockEncoding) + DataBlockEncoding expectedDataBlockEncoding, FileReaderContext context) throws IOException { if (dataBlockIndexReader == null) { throw new IOException("Block index not loaded"); @@ -453,7 +453,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } // Load block from filesystem. HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, - pread); + pread, context); validateBlockType(hfileBlock, expectedBlockType); HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); @@ -479,6 +479,8 @@ public class HFileReaderV2 extends AbstractHFileReader { } } + + @Override public boolean hasMVCCInfo() { return includesMemstoreTS && decodeMemstoreTS; @@ -561,6 +563,7 @@ public class HFileReaderV2 extends AbstractHFileReader { protected abstract static class AbstractScannerV2 extends AbstractHFileReader.Scanner { protected HFileBlock block; + protected FileReaderContext context; /** * The next indexed key is to keep track of the indexed key of the next data block. 
@@ -574,8 +577,33 @@ public class HFileReaderV2 extends AbstractHFileReader { public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks, final boolean pread, final boolean isCompaction) { super(r, cacheBlocks, pread, isCompaction); + if( pread == false && isScannerReadAheadEnabled(r.getConf())){ + // Initialize FileReaderContext if not done yet + FileReaderContext.initIfNotInited(r.getConf()); + // Create context only if we open scanner in a streaming mode + context = new FileReaderContext(); + } } - + + /** + * Check if scanner's read-ahead enabled + * @param cfg - HBase configuration object + * @return true if enabled, false - otherwise + * + */ + private final boolean isScannerReadAheadEnabled(Configuration cfg) + { + return cfg.getBoolean(FileReaderContext.HFILE_READAHEAD_ENABLED, false) + || System.getProperty(FileReaderContext.HFILE_READAHEAD_ENABLED) != null; + } + + @Override + public void close() throws IOException { + if(context != null){ + context.close(); + } + } + protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock); protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey, @@ -648,7 +676,7 @@ public class HFileReaderV2 extends AbstractHFileReader { public int seekTo(Cell key, boolean rewind) throws IOException { HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, - cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); + cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding(), context); if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { // This happens if the key e.g. falls before the beginning of the file. return -1; @@ -666,7 +694,7 @@ public class HFileReaderV2 extends AbstractHFileReader { public boolean seekBefore(Cell key) throws IOException { HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, cacheBlocks, pread, isCompaction, - ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction)); + ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction), context); if (seekToBlock == null) { return false; } @@ -688,7 +716,8 @@ public class HFileReaderV2 extends AbstractHFileReader { // figure out the size. seekToBlock = reader.readBlock(previousBlockOffset, seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, - pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + pread, isCompaction, true, BlockType.DATA, + getEffectiveDataBlockEncoding(), context); // TODO shortcut: seek forward in this block to the last key of the // block. 
} @@ -724,7 +753,7 @@ public class HFileReaderV2 extends AbstractHFileReader { curBlock = reader.readBlock(curBlock.getOffset() + curBlock.getOnDiskSizeWithHeader(), curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, - isCompaction, true, null, getEffectiveDataBlockEncoding()); + isCompaction, true, null, getEffectiveDataBlockEncoding(), context); } while (!curBlock.getBlockType().isData()); return curBlock; @@ -888,7 +917,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding(), context); if (block.getOffset() < 0) { throw new IOException("Invalid block offset: " + block.getOffset()); } @@ -1183,7 +1212,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding(), context); if (block.getOffset() < 0) { throw new IOException("Invalid block offset: " + block.getOffset()); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index b951fab..083904d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.KeyValue; * getValue. */ @InterfaceAudience.Private -public interface HFileScanner { +public interface HFileScanner extends Closeable{ /** * SeekTo or just before the passed key. Examine the return * code to figure whether we found the key or not. @@ -157,4 +158,6 @@ public interface HFileScanner { * Otherwise returns false. */ boolean isSeeked(); + + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 2784845..0783c5e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -233,6 +233,13 @@ public class StoreFileScanner implements KeyValueScanner { public void close() { // Nothing to close on HFileScanner? cur = null; + if(hfs != null){ + try { + hfs.close(); + } catch (IOException e) { + LOG.error( hfs.getClass().getName() + " instance failed to close", e); + } + } } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java index 8e87132..ce92a1e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java @@ -99,7 +99,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase // We cache the block and use a positional read. 
bloomBlock = reader.readBlock(index.getRootBlockOffset(block), index.getRootBlockDataSize(block), true, true, false, true, - BlockType.BLOOM_CHUNK, null); + BlockType.BLOOM_CHUNK, null, null); } catch (IOException ex) { // The Bloom filter is broken, turn it off. throw new IllegalArgumentException( diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/MovingAverageWithoutOutliersPredictor.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/MovingAverageWithoutOutliersPredictor.java new file mode 100644 index 0000000..0823d55 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/MovingAverageWithoutOutliersPredictor.java @@ -0,0 +1,203 @@ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; + +/** + * + * This class is the enhanced version of + * Moving Average Predictor, which filters out + * all outliers based on IQR rule (interquartile range): + * The interquartile range is often used to find outliers in data. + * Outliers are observations that fall below Q1 - 1.5(IQR) or + * above Q3 + 1.5(IQR) + * + * and then performs MA calculation. This allows us to filter out + * long range occasional skips (both: forward ones and backward ones). + * + * + */ +public class MovingAverageWithoutOutliersPredictor { + + private static double DEFAULT_RANGE = 1.5d; + private static int DEFAULT_PREDICTOR_SIZE = 20; + private final double[] values; + private final double[] tmp; + + int index = 0; + // Total count of readings + long count = 0; + double range = DEFAULT_RANGE; + + double q1 = 0; // First quartile + double q3 = 0; // Third quartile + + public MovingAverageWithoutOutliersPredictor(int predictorSize, double outlierRange) + { + values = new double[predictorSize]; + tmp = new double[predictorSize]; + range = outlierRange; + } + + public MovingAverageWithoutOutliersPredictor(int predictorSize) + { + this(predictorSize, DEFAULT_RANGE); + } + + public MovingAverageWithoutOutliersPredictor() + { + this(DEFAULT_PREDICTOR_SIZE, DEFAULT_RANGE); + } + + public void add(double v) + { + values[index] = v; + count++; + index = (index +1) % values.length; + } + + public long getCount() + { + return count; + } + /** + * + * @return true, if predictor is ready to work + */ + public boolean isReady() + { + return count >= getPredictorSize(); + } + + public double getMovingAverage() + { + int limit = 0; + if(count >= values.length){ + limit = values.length; + } else{ + limit = index; + } + if(limit == 0) return 0; + double sum = 0; + for(int i=0; i < limit; i ++){ + sum += values[i]; + } + return sum / limit; + } + + + public double getMovingAverageNoOutliers(boolean calculate) + { + int size = (count < values.length)? (int) count: values.length; + + if(calculate == true){ + // Identify q1 and q3 first + System.arraycopy(values, 0, tmp, 0, tmp.length); + Arrays.sort(tmp, 0, size); + int off25 = size / 4; + int off75 = (3 * size)/ 4; + q1 = tmp[off25]; + q3 = tmp[off75]; + } + + double sum = 0; + int notOutliers = 0; + for(int i = 0; i < size; i++ ) + { + if (isOutlier(tmp[i])) continue; + sum += tmp[i]; + notOutliers ++; + } + if(notOutliers > 0) return sum / notOutliers; + return Double.NaN; + } + /** + * Gets predictor size. + * @return p[redictor's size + */ + public int getPredictorSize() + { + return values.length; + } + + public int getOutliersNumber() + { + // Identify q1 and q3 first + System.arraycopy(values, 0, tmp, 0, tmp.length); + int size = (count < values.length)? 
(int) count: values.length; + Arrays.sort(tmp, 0, size); + int off25 = size / 4; + int off75 = (3 * size)/ 4; + q1 = tmp[off25]; + q3 = tmp[off75]; + int noOutliers = 0; + for(int i = 0; i < size; i++ ) + { + if (isOutlier(tmp[i])){ + noOutliers ++; + continue; + } + } + + return noOutliers; + } + + private final boolean isOutlier(double v) + { + double iqr = q3 - q1; + return (v < q1 - range * iqr) || (v > q3 + range * iqr); + } + + + public static void main(String[] args) + { + System.out.println("Start "); + MovingAverageWithoutOutliersPredictor predictor = new MovingAverageWithoutOutliersPredictor(16); + + predictor.add(2000); + predictor.add(2100); + predictor.add(2000); + predictor.add(2050); + predictor.add(2000); + predictor.add(2200); + predictor.add(2500); + predictor.add(-500); + predictor.add(2000); + predictor.add(2100); + predictor.add(2000); + predictor.add(2050); + predictor.add(2000); + predictor.add(2200); + predictor.add(2500); + predictor.add(-500); + + System.out.println("avg ="+predictor.getMovingAverage()+ + " avg.no.out="+predictor.getMovingAverageNoOutliers(true)); + + + System.out.println("Second predictor starts ..."); + predictor = new MovingAverageWithoutOutliersPredictor(); + + for (int i=0; i < 10; i ++){ + predictor.add(2000); + predictor.add(2100); + predictor.add(2000); + predictor.add(2050); + predictor.add(2000); + predictor.add(2200); + predictor.add(3000); + predictor.add(-1000); + predictor.add(2000); + predictor.add(2100); + predictor.add(2000); + predictor.add(2050); + predictor.add(2000); + predictor.add(2200); + predictor.add(2700); + predictor.add(-600); + } + + System.out.println("Size="+predictor.getPredictorSize()+" outliers="+predictor.getOutliersNumber()+ " avg ="+predictor.getMovingAverage()+ + " avg.no.out.no.calc="+predictor.getMovingAverageNoOutliers(false) + + " avg.no.out="+predictor.getMovingAverageNoOutliers(true)); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index fbbccf2..a2a5f12 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -256,7 +256,7 @@ public class TestCacheOnWrite { // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, - false, true, null, encodingInCache); + false, true, null, encodingInCache, null); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 00c6aa5..e258e22 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -119,7 +119,7 @@ public class TestChecksum { .withHBaseCheckSum(true) .build(); HFileBlock.FSReader hbr = new FSReaderV2Test(is, totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, null); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 
2173 : 4936, @@ -140,17 +140,17 @@ public class TestChecksum { // requests. Verify that this is correct. for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) { - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, null); assertEquals(0, HFile.getChecksumFailuresCount()); } // The next read should have hbase checksum verification reanabled, // we verify this by assertng that there was a hbase-checksum failure. - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, null); assertEquals(1, HFile.getChecksumFailuresCount()); // Since the above encountered a checksum failure, we switch // back to not checking hbase checksums. - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, null); assertEquals(0, HFile.getChecksumFailuresCount()); is.close(); @@ -161,7 +161,7 @@ public class TestChecksum { assertEquals(false, newfs.useHBaseChecksum()); is = new FSDataInputStreamWrapper(newfs, path); hbr = new FSReaderV2Test(is, totalSize, newfs, path, meta); - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, -1, pread, null); is.close(); b.sanityCheck(); b = b.unpack(meta, hbr); @@ -245,7 +245,7 @@ public class TestChecksum { .build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper( is, nochecksum), totalSize, hfs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, null); is.close(); b.sanityCheck(); assertEquals(dataSize, b.getUncompressedSizeWithoutHeader()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 254af72..af5cb23 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -314,7 +314,7 @@ public class TestHFileBlock { .withIncludesTags(includesTag) .withCompression(algo).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, null); is.close(); assertEquals(0, HFile.getChecksumFailuresCount()); @@ -328,12 +328,12 @@ public class TestHFileBlock { is = fs.open(path); hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + - b.totalChecksumBytes(), -1, pread); + b.totalChecksumBytes(), -1, pread, null); assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread); + + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread, null); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "On-disk size without header provided is " @@ -418,7 +418,7 @@ public class TestHFileBlock { HFileBlock blockFromHFile, blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - blockFromHFile = hbr.readBlockData(pos, -1, -1, pread); + blockFromHFile = hbr.readBlockData(pos, -1, -1, pread, null); assertEquals(0, HFile.getChecksumFailuresCount()); blockFromHFile.sanityCheck(); pos += blockFromHFile.getOnDiskSizeWithHeader(); @@ -552,7 +552,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, 
-1, -1, pread); + HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread, null); if (detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -567,7 +567,7 @@ public class TestHFileBlock { // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. HFileBlock b2 = hbr.readBlockData(curOffset, - b.getOnDiskSizeWithHeader(), -1, pread); + b.getOnDiskSizeWithHeader(), -1, pread, null); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -676,7 +676,7 @@ public class TestHFileBlock { HFileBlock b; try { long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; - b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread); + b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread, null); } catch (IOException ex) { LOG.error("Error in client " + clientId + " trying to read block at " + offset + ", pread=" + pread + ", withOnDiskSize=" + diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index da6761e..105393e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -199,7 +199,7 @@ public class TestHFileBlockCompatibility { .build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, -1, pread, null); is.close(); b.sanityCheck(); @@ -213,12 +213,12 @@ public class TestHFileBlockCompatibility { hbr = new HFileBlock.FSReaderV2(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + - b.totalChecksumBytes(), -1, pread); + b.totalChecksumBytes(), -1, pread, null); assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread); + + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread, null); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "On-disk size without header provided is " @@ -300,7 +300,7 @@ public class TestHFileBlockCompatibility { HFileBlock b; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); + b = hbr.readBlockData(pos, -1, -1, pread, null); b.sanityCheck(); if (meta.isCompressedOrEncrypted()) { assertFalse(b.isUnpacked()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index d30b2cf..821b4eb 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -168,7 +168,7 @@ public class TestHFileBlockIndex { public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, - DataBlockEncoding expectedDataBlockEncoding) + DataBlockEncoding expectedDataBlockEncoding, FileReaderContext context) throws IOException { if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) { @@ -178,13 +178,15 @@ public class 
TestHFileBlockIndex { missCount += 1; prevBlock = realReader.readBlockData(offset, onDiskSize, - -1, pread); + -1, pread, context); prevOffset = offset; prevOnDiskSize = onDiskSize; prevPread = pread; return prevBlock; } + + } public void readIndex(boolean useTags) throws IOException { @@ -219,7 +221,7 @@ public class TestHFileBlockIndex { assertTrue(indexReader != null); HFileBlock b = indexReader.seekToDataBlock(new KeyValue.KeyOnlyKeyValue(key, 0, key.length), null, - true, true, false, null); + true, true, false, null, null); if (Bytes.BYTES_RAWCOMPARATOR.compare(key, firstKeyInFile) < 0) { assertTrue(b == null); ++i; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index b1a4d57..206f9bb 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -95,7 +95,7 @@ public class TestHFileEncryption { private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size) throws IOException { - HFileBlock b = hbr.readBlockData(pos, -1, -1, false); + HFileBlock b = hbr.readBlockData(pos, -1, -1, false, null); assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); assertFalse(b.isUnpacked()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 12b0639..1472320 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -191,7 +191,7 @@ public class TestHFileWriterV2 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, null); assertEquals(BlockType.DATA, block.getBlockType()); if (meta.isCompressedOrEncrypted()) { assertFalse(block.isUnpacked()); @@ -238,7 +238,7 @@ public class TestHFileWriterV2 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, null) .unpack(meta, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index 471a44d..a66cb38 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -220,7 +220,7 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, null) .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuffer buf = block.getBufferWithoutHeader(); @@ -279,7 +279,7 @@ public class 
TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false) + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false, null) .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java index 2fd3684..6a512d9 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -127,7 +127,7 @@ public class TestLazyDataBlockDecompression { HFileBlock block; while (offset <= max) { block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false, - /* isCompaction */ false, /* updateCacheMetrics */ true, null, null); + /* isCompaction */ false, /* updateCacheMetrics */ true, null, null, null); offset += block.getOnDiskSizeWithHeader(); blocks.add(block); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 4ceafb4..4c6d015 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -88,7 +88,7 @@ public class TestPrefetch { onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); } HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null, - null); + null, null); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; if (block.getBlockType() == BlockType.DATA || diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 1f63a48..3753bde 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -235,7 +235,7 @@ public class TestCacheOnWriteInSchema { // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, - false, true, null, DataBlockEncoding.NONE); + false, true, null, DataBlockEncoding.NONE, null); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); boolean isCached = cache.getBlock(blockCacheKey, true, false, true) != null;