diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index b8303b8..d476679 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1402,7 +1402,6 @@ public class HFileBlock implements Cacheable {
   private static class PrefetchedHeader {
     long offset = -1;
     byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
   }
 
   /** Reads version 2 blocks from the filesystem. */
@@ -1416,13 +1415,7 @@ public class HFileBlock implements Cacheable {
     /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
     private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
 
-    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
-        new ThreadLocal<PrefetchedHeader>() {
-          @Override
-          public PrefetchedHeader initialValue() {
-            return new PrefetchedHeader();
-          }
-        };
+    private final PrefetchedHeader prefetchedHeader;
 
     public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs,
         Path path, HFileContext fileContext) throws IOException {
@@ -1432,6 +1425,7 @@ public class HFileBlock implements Cacheable {
       this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
       defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
       encodedBlockDecodingCtx = defaultDecodingCtx;
+      this.prefetchedHeader = new PrefetchedHeader();
     }
 
     /**
@@ -1566,11 +1560,14 @@ public class HFileBlock implements Cacheable {
       // read this block's header as part of the previous read's look-ahead.
       // And we also want to skip reading the header again if it has already
      // been read.
-      // TODO: How often does this optimization fire? Has to be same thread so the thread local
-      // is pertinent and we have to be reading next block as in a big scan.
-      PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
-      ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
-
+      ByteBuffer headerBuf = null;
+      synchronized (prefetchedHeader) {
+        if (prefetchedHeader.offset == offset) {
+          // Copy the cached header out under the lock so no shared mutable state escapes.
+          byte[] buf = new byte[hdrSize];
+          System.arraycopy(prefetchedHeader.header, 0, buf, 0, hdrSize);
+          headerBuf = ByteBuffer.wrap(buf);
+        }
+      }
       // Allocate enough space to fit the next block's header too.
       int nextBlockOnDiskSize = 0;
       byte[] onDiskBlock = null;
@@ -1671,12 +1668,13 @@
       b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
-      // Set prefetched header
-      if (b.hasNextBlockHeader()) {
-        prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
-        System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
+      synchronized (prefetchedHeader) {
+        // Set prefetched header
+        if (b.hasNextBlockHeader()) {
+          prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
+          System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize);
+        }
       }
-
       b.offset = offset;
       b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
       b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 766ddf9..c66efd4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -277,6 +277,96 @@ public class TestHFileBlock {
     testReaderV2Internals();
   }
 
+  @Test
+  public void testPrefetchedReader() throws IOException {
+    testPrefetchedReaderInternals();
+  }
+
+  private Callable<HFileBlock> call(final HFileBlock.FSReaderImpl reader, final long offset,
+      final long onDiskSize, final int uncompressedSize, final boolean pread) {
+    return new Callable<HFileBlock>() {
+      @Override
+      public HFileBlock call() throws Exception {
+        // Sleep a random amount so the concurrent reads interleave unpredictably.
+        Random random = defaultRandom();
+        Thread.sleep(random.nextInt(500));
+        return reader.readBlockData(offset, onDiskSize, uncompressedSize, pread);
+      }
+    };
+  }
+
+  protected void testPrefetchedReaderInternals() throws IOException {
+    if (includesTag) {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    }
+    for (boolean pread : new boolean[] { false, true }) {
+      Path path = new Path(TEST_UTIL.getDataTestDir(), "block_prefetched");
+      FSDataOutputStream os = fs.create(path);
+      HFileContext meta = new HFileContextBuilder()
+          .withCompression(NONE)
+          .withIncludesMvcc(includesMemstoreTS)
+          .withIncludesTags(includesTag)
+          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
+          .build();
+      HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
+      long totalSize = 0;
+      for (int blockId = 0; blockId < 3; ++blockId) {
+        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
+        for (int i = 0; i < 1234; ++i)
+          dos.writeInt(i);
+        hbw.writeHeaderAndData(os);
+        totalSize += hbw.getOnDiskSizeWithHeader();
+      }
+      os.close();
+
+      FSDataInputStream is = fs.open(path);
+      meta = new HFileContextBuilder()
+          .withHBaseCheckSum(true)
+          .withIncludesMvcc(includesMemstoreTS)
+          .withIncludesTags(includesTag)
+          .withCompression(NONE).build();
+
+      HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
+      // Read the three blocks sequentially to establish the expected values.
+      HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
+      HFileBlock b1 = hbr.readBlockData(b.getOnDiskSizeWithHeader(),
+          b.getNextBlockOnDiskSizeWithHeader(), -1, pread);
+      HFileBlock b2 = hbr.readBlockData(b1.getOffset() + b1.getOnDiskSizeWithHeader(),
+          b1.getNextBlockOnDiskSizeWithHeader(), -1, pread);
+      is.close();
+
+      // Re-read the second and third blocks from many threads sharing one
+      // reader and verify each concurrent read still returns the right block.
+      java.util.concurrent.ExecutorService es =
+          java.util.concurrent.Executors.newFixedThreadPool(NUM_READER_THREADS * 2);
+      try {
+        List<Future<HFileBlock>> future1 = new ArrayList<>(NUM_READER_THREADS);
+        List<Future<HFileBlock>> future2 = new ArrayList<>(NUM_READER_THREADS);
+        is = fs.open(path);
+        hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
+        b = hbr.readBlockData(0, -1, -1, pread);
+        for (int i = 0; i < NUM_READER_THREADS; i++) {
+          Callable<HFileBlock> call1 = call(hbr, b.getOnDiskSizeWithHeader(),
+              b.getNextBlockOnDiskSizeWithHeader(), -1, pread);
+          Callable<HFileBlock> call2 = call(hbr, b1.getOffset() + b1.getOnDiskSizeWithHeader(),
+              b1.getNextBlockOnDiskSizeWithHeader(), -1, pread);
+          Future<HFileBlock> f1 = es.submit(call1);
+          Future<HFileBlock> f2 = es.submit(call2);
+          future1.add(f1);
+          future2.add(f2);
+        }
+        for (Future<HFileBlock> f1 : future1) {
+          assertEquals(b1, f1.get());
+        }
+        for (Future<HFileBlock> f2 : future2) {
+          assertEquals(b2, f2.get());
+        }
+        is.close();
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      } finally {
+        es.shutdown();
+      }
+    }
+  }
+
   protected void testReaderV2Internals() throws IOException {
     if(includesTag) {
       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
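Reviewer note: the heart of the change is replacing the per-thread PrefetchedHeader (a ThreadLocal whose cached ByteBuffer was only ever safe within one thread) with a single one-slot cache guarded by the object's monitor, where the header bytes are copied in and out under the lock. The following is a minimal standalone sketch of that pattern, not code from the patch; HeaderCache, store, getIfCached, and the HEADER_SIZE value are illustrative stand-ins for PrefetchedHeader and HConstants.HFILEBLOCK_HEADER_SIZE.

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch of a one-slot, lock-guarded header cache in the style this patch
 * introduces. All names and the size constant are hypothetical.
 */
class HeaderCache {
  static final int HEADER_SIZE = 33; // stand-in for the real header size

  private long offset = -1; // file offset the cached header belongs to
  private final byte[] header = new byte[HEADER_SIZE];

  /** Store side: a read that looked ahead caches the next block's header. */
  synchronized void store(long blockOffset, byte[] src, int srcPos) {
    this.offset = blockOffset;
    System.arraycopy(src, srcPos, header, 0, HEADER_SIZE);
  }

  /**
   * Lookup side: returns a private copy of the cached header if it matches
   * the requested offset, else null. Copying under the lock means callers
   * never share a mutable buffer, which is what lets the ThreadLocal go.
   */
  synchronized ByteBuffer getIfCached(long blockOffset) {
    if (this.offset != blockOffset) {
      return null; // miss: caller reads the header from the filesystem
    }
    return ByteBuffer.wrap(Arrays.copyOf(header, HEADER_SIZE));
  }
}

The trade-off mirrors the patch: each hit now costs a short critical section and one small copy, but the cache can fire across all threads sharing one reader instead of only within a single scanning thread, and no caller ever holds a reference to the shared buffer.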