diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java index 56510f0..4502983 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java @@ -238,7 +238,7 @@ public class FixedFileTrailer { BlockType.TRAILER.readAndCheck(inputStream); if (majorVersion > 2 - || (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) { + || (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) { deserializeFromPB(inputStream); } else { deserializeFromWritable(inputStream); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 2b88f81..a00bb41 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -293,7 +293,7 @@ public class HFile { "filesystem/path or path"); } if (path != null) { - ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes); + ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes); } return createWriter(fs, path, ostream, comparator, fileContext); @@ -439,6 +439,12 @@ public class HFile { * Return the file context of the HFile this reader belongs to */ HFileContext getFileContext(); + + boolean shouldIncludeMemstoreTS(); + + boolean isDecodeMemstoreTS(); + + DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction); } /** @@ -462,9 +468,9 @@ public class HFile { trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size); switch (trailer.getMajorVersion()) { case 2: - return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf); + throw new UnsupportedOperationException("Only v3+ supported; migrate to hbase 1.0.0"); case 3 : - return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf); + return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf); default: throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 6341f2d..4913789 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1179,6 +1179,16 @@ public class HFileBlock implements Cacheable { HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException; } + /** + * We always prefetch the header of the next block, so that we know its + * on-disk size in advance and can read it in one operation. + */ + private static class PrefetchedHeader { + long offset = -1; + byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; + ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); + } + /** A full-fledged reader with iteration ability. */ public interface FSReader { @@ -1215,13 +1225,16 @@ public class HFileBlock implements Cacheable { /** Get the default decoder for blocks from this file. 
*/ HFileBlockDecodingContext getDefaultBlockDecodingContext(); + + void setIncludesMemstoreTS(boolean includesMemstoreTS); + void setDataBlockEncoder(HFileDataBlockEncoder encoder); } /** * A common implementation of some methods of {@link FSReader} and some * tools for implementing HFile format version-specific block readers. */ - private abstract static class AbstractFSReader implements FSReader { + static class FSReaderImpl implements FSReader { /** Compression algorithm used by the {@link HFile} */ /** The size of the file we are reading from, or -1 if unknown. */ @@ -1241,20 +1254,41 @@ public class HFileBlock implements Cacheable { /** The default buffer size for our buffered streams */ public static final int DEFAULT_BUFFER_SIZE = 1 << 20; + /** The file system stream of the underlying {@link HFile} that + * does or doesn't do checksum validations in the filesystem */ + protected FSDataInputStreamWrapper streamWrapper; + protected HFileContext fileContext; - public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext) - throws IOException { + private HFileBlockDecodingContext encodedBlockDecodingCtx; + + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; + + FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, + HFileContext fileContext) throws IOException { this.fileSize = fileSize; this.hfs = hfs; this.path = path; this.fileContext = fileContext; this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); + + this.streamWrapper = stream; + // Older versions of HBase didn't support checksum. + this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } - @Override - public BlockIterator blockRange(final long startOffset, - final long endOffset) { + /** + * A constructor that reads files with the latest minor version. + * This is used by unit tests only. + */ + FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext) throws IOException { + this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext); + } + + public BlockIterator blockRange(final long startOffset, final long endOffset) { final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1351,29 +1385,6 @@ public class HFileBlock implements Cacheable { return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; } - } - - /** - * We always prefetch the header of the next block, so that we know its - * on-disk size in advance and can read it in one operation. - */ - private static class PrefetchedHeader { - long offset = -1; - byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; - ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); - } - - /** Reads version 2 blocks from the filesystem. */ - static class FSReaderV2 extends AbstractFSReader { - /** The file system stream of the underlying {@link HFile} that - * does or doesn't do checksum validations in the filesystem */ - protected FSDataInputStreamWrapper streamWrapper; - - private HFileBlockDecodingContext encodedBlockDecodingCtx; - - /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. 
*/ - private final HFileBlockDefaultDecodingContext defaultDecodingCtx; - private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @Override @@ -1382,24 +1393,6 @@ public class HFileBlock implements Cacheable { } }; - public FSReaderV2(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, - HFileContext fileContext) throws IOException { - super(fileSize, hfs, path, fileContext); - this.streamWrapper = stream; - // Older versions of HBase didn't support checksum. - this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = defaultDecodingCtx; - } - - /** - * A constructor that reads files with the latest minor version. - * This is used by unit tests only. - */ - FSReaderV2(FSDataInputStream istream, long fileSize, HFileContext fileContext) throws IOException { - this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext); - } - /** * Reads a version 2 block. Tries to do as little memory allocation as * possible, using the provided on-disk size. @@ -1633,11 +1626,11 @@ public class HFileBlock implements Cacheable { return b; } - void setIncludesMemstoreTS(boolean includesMemstoreTS) { + public void setIncludesMemstoreTS(boolean includesMemstoreTS) { this.fileContext.setIncludesMvcc(includesMemstoreTS); } - void setDataBlockEncoder(HFileDataBlockEncoder encoder) { + public void setDataBlockEncoder(HFileDataBlockEncoder encoder) { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 8c1e7b9..7e34153 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -18,117 +18,120 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; +import java.security.Key; +import java.security.KeyException; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Cipher; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; +import org.htrace.Trace; +import org.htrace.TraceScope; + +import com.google.common.annotations.VisibleForTesting; /** - * Common functionality needed by all versions of {@link HFile} readers. + * Implementation that can handle all hfile versions of {@link HFile.Reader}. */ @InterfaceAudience.Private @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") -public abstract class AbstractHFileReader - implements HFile.Reader, Configurable { - /** Stream to read from. Does checksum verifications in file system */ - protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD - - /** The file system stream of the underlying {@link HFile} that - * does not do checksum verification in the file system */ - protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD +public class HFileReaderImpl implements HFile.Reader, Configurable { + // This class is HFileReaderV3 + HFileReaderV2 + AbstractHFileReader all squashed together into + // one file. Ditto for all the HFileReader.ScannerV? implementations. I was running up against + // the MaxInlineLevel limit because too many tiers involved reading from an hfile. Was also hard + // to navigate the source code when so many classes participating in read. + private static final Log LOG = LogFactory.getLog(HFileReaderImpl.class); /** Data block index reader keeping the root data index in memory */ - protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader; + private HFileBlockIndex.BlockIndexReader dataBlockIndexReader; /** Meta block index reader -- always single level */ - protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader; + private HFileBlockIndex.BlockIndexReader metaBlockIndexReader; - protected final FixedFileTrailer trailer; + private final FixedFileTrailer trailer; /** Filled when we read in the trailer. */ - protected final Compression.Algorithm compressAlgo; + private final Compression.Algorithm compressAlgo; /** * What kind of data block encoding should be used while reading, writing, * and handling cache. */ - protected HFileDataBlockEncoder dataBlockEncoder = - NoOpDataBlockEncoder.INSTANCE; + private HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE; /** Last key in the file. Filled in when we read in the file info */ - protected byte [] lastKey = null; + private byte [] lastKey = null; /** Average key length read from file info */ - protected int avgKeyLen = -1; + private int avgKeyLen = -1; /** Average value length read from file info */ - protected int avgValueLen = -1; + private int avgValueLen = -1; /** Key comparator */ - protected KVComparator comparator = new KVComparator(); + private KVComparator comparator = new KVComparator(); /** Size of this file. */ - protected final long fileSize; + private final long fileSize; /** Block cache configuration. 
*/ - protected final CacheConfig cacheConf; + private final CacheConfig cacheConf; /** Path of file */ - protected final Path path; + private final Path path; /** File name to be used for block names */ - protected final String name; + private final String name; - protected FileInfo fileInfo; + private FileInfo fileInfo; - /** The filesystem used for accesing data */ - protected HFileSystem hfs; + private Configuration conf; - protected Configuration conf; - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") - protected AbstractHFileReader(Path path, FixedFileTrailer trailer, - final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs, - final Configuration conf) { - this.trailer = trailer; - this.compressAlgo = trailer.getCompressionCodec(); - this.cacheConf = cacheConf; - this.fileSize = fileSize; - this.path = path; - this.name = path.getName(); - this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD - this.conf = conf; - } + private HFileContext hfileContext; @SuppressWarnings("serial") - public static class BlockIndexNotLoadedException - extends IllegalStateException { + public static class BlockIndexNotLoadedException extends IllegalStateException { public BlockIndexNotLoadedException() { // Add a message in case anyone relies on it as opposed to class name. super("Block index not loaded"); } } - protected String toStringFirstKey() { + private String toStringFirstKey() { return KeyValue.keyToString(getFirstKey()); } - protected String toStringLastKey() { + private String toStringLastKey() { return KeyValue.keyToString(getLastKey()); } - public abstract boolean isFileInfoLoaded(); - @Override public String toString() { return "reader=" + path.toString() + @@ -149,23 +152,6 @@ public abstract class AbstractHFileReader } /** - * Create a Scanner on this file. No seeks or reads are done on creation. Call - * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is - * nothing to clean up in a Scanner. Letting go of your references to the - * scanner is sufficient. NOTE: Do not use this overload of getScanner for - * compactions. - * - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is - * better for random reads, seek+read is better scanning). - * @return Scanner on this file. - */ - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { - return getScanner(cacheBlocks, pread, false); - } - - /** * @return the first key in the file. May be null if file has no entries. Note * that this is not the first row key, but rather the byte form of the * first KeyValue. @@ -188,9 +174,7 @@ public abstract class AbstractHFileReader @Override public byte[] getFirstRowKey() { byte[] firstKey = getFirstKey(); - if (firstKey == null) - return null; - return KeyValue.createKeyValueFromKey(firstKey).getRow(); + return firstKey == null? null: KeyValue.createKeyValueFromKey(firstKey).getRow(); } /** @@ -202,9 +186,7 @@ public abstract class AbstractHFileReader @Override public byte[] getLastRowKey() { byte[] lastKey = getLastKey(); - if (lastKey == null) - return null; - return KeyValue.createKeyValueFromKey(lastKey).getRow(); + return lastKey == null? 
null: KeyValue.createKeyValueFromKey(lastKey).getRow(); } /** @return number of KV entries in this HFile */ @@ -266,23 +248,21 @@ public abstract class AbstractHFileReader } } - protected static abstract class Scanner implements HFileScanner { - protected ByteBuffer blockBuffer; - - protected boolean cacheBlocks; + protected static class HFileScannerImpl implements HFileScanner { + private ByteBuffer blockBuffer; + protected final boolean cacheBlocks; protected final boolean pread; protected final boolean isCompaction; - - protected int currKeyLen; - protected int currValueLen; - protected int currMemstoreTSLen; - protected long currMemstoreTS; - - protected int blockFetches; - + private int currKeyLen; + private int currValueLen; + private int currMemstoreTSLen; + private long currMemstoreTS; + // Updated but never read? + protected volatile int blockFetches; protected final HFile.Reader reader; + private int currTagsLen; - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, + public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { this.reader = reader; this.cacheBlocks = cacheBlocks; @@ -324,10 +304,540 @@ public abstract class AbstractHFileReader public HFile.Reader getReader() { return reader; } - } - /** For testing */ - abstract HFileBlock.FSReader getUncachedBlockReader(); + protected int getCellBufSize() { + int kvBufSize = KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; + if (this.reader.getFileContext().isIncludesTags()) { + kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen; + } + return kvBufSize; + } + + protected int getNextCellStartPosition() { + int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen + + currMemstoreTSLen; + if (this.reader.getFileContext().isIncludesTags()) { + nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen; + } + return nextKvPos; + } + + protected void readKeyValueLen() { + blockBuffer.mark(); + currKeyLen = blockBuffer.getInt(); + currValueLen = blockBuffer.getInt(); + if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() + || currValueLen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen " + + currValueLen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen); + if (this.reader.getFileContext().isIncludesTags()) { + // Read short as unsigned, high byte first + currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); + if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, currTagsLen); + } + readMvccVersion(); + blockBuffer.reset(); + } + + /** + * Within a loaded block, seek looking for the last key that is smaller than + * (or equal to?) the key we are interested in. + * A note on the seekBefore: if you have seekBefore = true, AND the first + * key in the block = key, then you'll get thrown exceptions. The caller has + * to check for that case and load the previous block as appropriate. 
+ * @param key + * the key to find + * @param seekBefore + * find the key before the given key in case of exact match. + * @return 0 in case of an exact key match, 1 in case of an inexact match, + * -2 in case of an inexact match and furthermore, the input key + * less than the first key of current block(e.g. using a faked index + * key) + */ + protected int blockSeek(Cell key, boolean seekBefore) { + int klen, vlen, tlen = 0; + long memstoreTS = 0; + int memstoreTSLen = 0; + int lastKeyValueSize = -1; + KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue(); + do { + blockBuffer.mark(); + klen = blockBuffer.getInt(); + vlen = blockBuffer.getInt(); + if (klen < 0 || vlen < 0 || klen > blockBuffer.limit() + || vlen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid klen " + klen + " or vlen " + + vlen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, klen + vlen); + if (this.reader.getFileContext().isIncludesTags()) { + // Read short as unsigned, high byte first + tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); + if (tlen < 0 || tlen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, tlen); + } + if (this.reader.shouldIncludeMemstoreTS()) { + if (this.reader.isDecodeMemstoreTS()) { + try { + memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position()); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstore timestamp", e); + } + } else { + memstoreTS = 0; + memstoreTSLen = 1; + } + } + blockBuffer.reset(); + int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2); + keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen); + int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv); + + if (comp == 0) { + if (seekBefore) { + if (lastKeyValueSize < 0) { + throw new IllegalStateException("blockSeek with seekBefore " + + "at the first key of the block: key=" + + CellUtil.getCellKey(key) + + ", blockOffset=" + block.getOffset() + ", onDiskSize=" + + block.getOnDiskSizeWithHeader()); + } + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + return 1; // non exact match. + } + currKeyLen = klen; + currValueLen = vlen; + currTagsLen = tlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } + return 0; // indicate exact match + } else if (comp < 0) { + if (lastKeyValueSize > 0) + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { + return HConstants.INDEX_KEY_MAGIC; + } + return 1; + } + + // The size of this key/value tuple, including key/value length fields. 
+ lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; + // include tag length also if tags included with KV + if (this.reader.getFileContext().isIncludesTags()) { + lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; + } + blockBuffer.position(blockBuffer.position() + lastKeyValueSize); + } while (blockBuffer.remaining() > 0); + + // Seek to the last key we successfully read. This will happen if this is + // the last key/value pair in the file, in which case the following call + // to next() has to return false. + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + return 1; // didn't exactly find it. + } + + protected HFileBlock block; + + /** + * The next indexed key is to keep track of the indexed key of the next data block. + * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the + * current data block is the last data block. + * + * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet. + */ + protected byte[] nextIndexedKey; + + @Override + public int seekTo(byte[] key, int offset, int length) throws IOException { + // Always rewind to the first key of the block, because the given key + // might be before or after the current key. + return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public int reseekTo(byte[] key, int offset, int length) throws IOException { + return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public int seekTo(Cell key) throws IOException { + return seekTo(key, true); + } + + @Override + public int reseekTo(Cell key) throws IOException { + int compared; + if (isSeeked()) { + compared = compareKey(reader.getComparator(), key); + if (compared < 1) { + // If the required key is less than or equal to current key, then + // don't do anything. + return compared; + } else { + // The comparison with no_next_index_key has to be checked + if (this.nextIndexedKey != null && + (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader + .getComparator() + .compareOnlyKeyPortion(key, + new KeyValue.KeyOnlyKeyValue(nextIndexedKey, 0, + nextIndexedKey.length)) < 0)) { + // The reader shall continue to scan the current data block instead + // of querying the + // block index as long as it knows the target key is strictly + // smaller than + // the next indexed key or the current data block is the last data + // block. + return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); + } + } + } + // Don't rewind on a reseek operation, because reseek implies that we are + // always going forward in the file. + return seekTo(key, false); + } + + + /** + * An internal API function. Seek to the given key, optionally rewinding to + * the first key of the block before doing the seek. + * + * @param key - a cell representing the key that we need to fetch + * @param rewind whether to rewind to the first key of the block before + * doing the seek. If this is false, we are assuming we never go + * back, otherwise the result is undefined. 
+ * @return -1 if the key is earlier than the first key of the file, + * 0 if we are at the given key, 1 if we are past the given key + * -2 if the key is earlier than the first key of the file while + * using a faked index key + * @throws IOException + */ + public int seekTo(Cell key, boolean rewind) throws IOException { + HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); + BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, + cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); + if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { + // This happens if the key e.g. falls before the beginning of the file. + return -1; + } + return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), + blockWithScanInfo.getNextIndexedKey(), rewind, key, false); + } + + @Override + public boolean seekBefore(byte[] key, int offset, int length) throws IOException { + return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public boolean seekBefore(Cell key) throws IOException { + HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, + cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction)); + if (seekToBlock == null) { + return false; + } + ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); + + if (reader.getComparator() + .compareOnlyKeyPortion( + new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(), + firstKey.limit()), key) >= 0) { + long previousBlockOffset = seekToBlock.getPrevBlockOffset(); + // The key we are interested in + if (previousBlockOffset == -1) { + // we have a 'problem', the key we want is the first of the file. + return false; + } + + // It is important that we compute and pass onDiskSize to the block + // reader so that it does not have to read the header separately to + // figure out the size. + seekToBlock = reader.readBlock(previousBlockOffset, + seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, + pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + // TODO shortcut: seek forward in this block to the last key of the + // block. + } + byte[] firstKeyInCurrentBlock = Bytes.getBytes(firstKey); + loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true); + return true; + } + + /** + * Scans blocks in the "scanned" section of the {@link HFile} until the next + * data block is found. + * + * @return the next block, or null if there are no more data blocks + * @throws IOException + */ + protected HFileBlock readNextDataBlock() throws IOException { + long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); + if (block == null) + return null; + + HFileBlock curBlock = block; + + do { + if (curBlock.getOffset() >= lastDataBlockOffset) + return null; + + if (curBlock.getOffset() < 0) { + throw new IOException("Invalid block file offset: " + block); + } + + // We are reading the next block without block type validation, because + // it might turn out to be a non-data block. 
+ curBlock = reader.readBlock(curBlock.getOffset() + + curBlock.getOnDiskSizeWithHeader(), + curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, + isCompaction, true, null, getEffectiveDataBlockEncoding()); + } while (!curBlock.getBlockType().isData()); + + return curBlock; + } + + public DataBlockEncoding getEffectiveDataBlockEncoding() { + return this.reader.getEffectiveEncodingInCache(isCompaction); + } + + @Override + public Cell getKeyValue() { + if (!isSeeked()) + return null; + + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setSequenceId(currMemstoreTS); + } + return ret; + } + + @Override + public ByteBuffer getKey() { + assertSeeked(); + return ByteBuffer.wrap( + blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE, currKeyLen).slice(); + } + + public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { + return comparator.compareFlatKey(key, offset, length, blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen); + } + + @Override + public ByteBuffer getValue() { + assertSeeked(); + return ByteBuffer.wrap( + blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice(); + } + + protected void setNonSeekedState() { + block = null; + blockBuffer = null; + currKeyLen = 0; + currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; + currTagsLen = 0; + } + + /** + * Go to the next key/value in the block section. Loads the next block if + * necessary. If successful, {@link #getKey()} and {@link #getValue()} can + * be called. + * + * @return true if successfully navigated to the next key/value + */ + @Override + public boolean next() throws IOException { + assertSeeked(); + + try { + blockBuffer.position(getNextCellStartPosition()); + } catch (IllegalArgumentException e) { + LOG.error("Current pos = " + blockBuffer.position() + + "; currKeyLen = " + currKeyLen + "; currValLen = " + + currValueLen + "; block limit = " + blockBuffer.limit() + + "; HFile name = " + reader.getName() + + "; currBlock currBlockOffset = " + block.getOffset()); + throw e; + } + + if (blockBuffer.remaining() <= 0) { + long lastDataBlockOffset = + reader.getTrailer().getLastDataBlockOffset(); + + if (block.getOffset() >= lastDataBlockOffset) { + setNonSeekedState(); + return false; + } + + // read the next block + HFileBlock nextBlock = readNextDataBlock(); + if (nextBlock == null) { + setNonSeekedState(); + return false; + } + + updateCurrBlock(nextBlock); + return true; + } + + // We are still in the same block. + readKeyValueLen(); + return true; + } + + /** + * Positions this scanner at the start of the file. + * + * @return false if empty file; i.e. a call to next would return false and + * the current key and value are undefined. + * @throws IOException + */ + @Override + public boolean seekTo() throws IOException { + if (reader == null) { + return false; + } + + if (reader.getTrailer().getEntryCount() == 0) { + // No data blocks. 
+ return false; + } + + long firstDataBlockOffset = + reader.getTrailer().getFirstDataBlockOffset(); + if (block != null && block.getOffset() == firstDataBlockOffset) { + blockBuffer.rewind(); + readKeyValueLen(); + return true; + } + + block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + if (block.getOffset() < 0) { + throw new IOException("Invalid block offset: " + block.getOffset()); + } + updateCurrBlock(block); + return true; + } + + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey, + boolean rewind, Cell key, boolean seekBefore) throws IOException { + if (block == null || block.getOffset() != seekToBlock.getOffset()) { + updateCurrBlock(seekToBlock); + } else if (rewind) { + blockBuffer.rewind(); + } + + // Update the nextIndexedKey + this.nextIndexedKey = nextIndexedKey; + return blockSeek(key, seekBefore); + } + + /** + * Updates the current block to be the given {@link HFileBlock}. Seeks to + * the the first key/value pair. + * + * @param newBlock the block to make current + */ + protected void updateCurrBlock(HFileBlock newBlock) { + block = newBlock; + + // sanity check + if (block.getBlockType() != BlockType.DATA) { + throw new IllegalStateException("ScannerV2 works only on data " + + "blocks, got " + block.getBlockType() + "; " + + "fileName=" + reader.getName() + ", " + + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + + "isCompaction=" + isCompaction); + } + + blockBuffer = block.getBufferWithoutHeader(); + readKeyValueLen(); + blockFetches++; + + // Reset the next indexed key + this.nextIndexedKey = null; + } + + protected void readMvccVersion() { + if (this.reader.shouldIncludeMemstoreTS()) { + if (this.reader.isDecodeMemstoreTS()) { + try { + currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position()); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstore timestamp", e); + } + } else { + currMemstoreTS = 0; + currMemstoreTSLen = 1; + } + } + } + + protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { + ByteBuffer buffer = curBlock.getBufferWithoutHeader(); + // It is safe to manipulate this buffer because we own the buffer object. 
+ buffer.rewind(); + int klen = buffer.getInt(); + buffer.getInt(); + ByteBuffer keyBuff = buffer.slice(); + keyBuff.limit(klen); + keyBuff.rewind(); + return keyBuff; + } + + @Override + public String getKeyString() { + return Bytes.toStringBinary(blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE, currKeyLen); + } + + @Override + public String getValueString() { + return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen, + currValueLen); + } + + public int compareKey(KVComparator comparator, Cell key) { + return comparator.compareOnlyKeyPortion( + key, + new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); + } + } public Path getPath() { return path; @@ -338,8 +848,6 @@ public abstract class AbstractHFileReader return dataBlockEncoder.getDataBlockEncoding(); } - public abstract int getMajorVersion(); - @Override public Configuration getConf() { return conf; @@ -349,4 +857,836 @@ public abstract class AbstractHFileReader public void setConf(Configuration conf) { this.conf = conf; } -} + + /** Minor versions in HFile V2 starting with this number have hbase checksums */ + public static final int MINOR_VERSION_WITH_CHECKSUM = 1; + /** In HFile V2 minor version that does not support checksums */ + public static final int MINOR_VERSION_NO_CHECKSUM = 0; + + /** HFile minor version that introduced pbuf filetrailer */ + public static final int PBUF_TRAILER_MINOR_VERSION = 2; + + /** + * The size of a (key length, value length) tuple that prefixes each entry in + * a data block. + */ + public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + protected boolean includesMemstoreTS = false; + protected boolean decodeMemstoreTS = false; + + + public boolean isDecodeMemstoreTS() { + return this.decodeMemstoreTS; + } + + public boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } + + /** Filesystem-level block reader. */ + protected HFileBlock.FSReader fsBlockReader; + + /** + * A "sparse lock" implementation allowing to lock on a particular block + * identified by offset. The purpose of this is to avoid two clients loading + * the same block, and have all but one client wait to get the block from the + * cache. + */ + private IdLock offsetLock = new IdLock(); + + /** + * Blocks read from the load-on-open section, excluding data root index, meta + * index, and file info. + */ + private List loadOnOpenBlocks = new ArrayList(); + + /** Minimum minor version supported by this HFile format */ + static final int MIN_MINOR_VERSION = 0; + + /** Maximum minor version supported by this HFile format */ + // We went to version 2 when we moved to pb'ing fileinfo and the trailer on + // the file. This version can read Writables version 1. + static final int MAX_MINOR_VERSION = 3; + + /** Minor versions starting with this number have faked index key */ + static final int MINOR_VERSION_WITH_FAKED_KEY = 3; + + /** + * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} + * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. + */ + private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, + boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) throws IOException { + // Check cache for block. If found return. 
+ if (cacheConf.isBlockCacheEnabled()) { + BlockCache cache = cacheConf.getBlockCache(); + HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, + updateCacheMetrics); + if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } + validateBlockType(cachedBlock, expectedBlockType); + + if (expectedDataBlockEncoding == null) { + return cachedBlock; + } + DataBlockEncoding actualDataBlockEncoding = + cachedBlock.getDataBlockEncoding(); + // Block types other than data blocks always have + // DataBlockEncoding.NONE. To avoid false negative cache misses, only + // perform this check if cached block is a data block. + if (cachedBlock.getBlockType().isData() && + !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) { + // This mismatch may happen if a ScannerV2, which is used for say a + // compaction, tries to read an encoded block from the block cache. + // The reverse might happen when an EncodedScannerV2 tries to read + // un-encoded blocks which were cached earlier. + // + // Because returning a data block with an implicit BlockType mismatch + // would cause the requesting scanner to throw an exception, a disk + // read should be forced here. This will potentially cause a + // significant number of cache misses, so we should keep track of this, + // as it might justify the work on a CompoundScannerV2. + if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) && + !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) { + // If the block is encoded but the encoding does not match the + // expected encoding it is likely the encoding was changed but the + // block was not yet evicted. Evictions on file close happen async + // so blocks with the old encoding still linger in cache for some + // period of time. This event should be rare as it only happens on + // schema definition change. + LOG.info("Evicting cached block with key " + cacheKey + + " because of a data block encoding mismatch" + + "; expected: " + expectedDataBlockEncoding + + ", actual: " + actualDataBlockEncoding); + cache.evictBlock(cacheKey); + } + return null; + } + return cachedBlock; + } + } + return null; + } + /** + * @param metaBlockName + * @param cacheBlock Add block to cache, if found + * @return block wrapped in a ByteBuffer, with header skipped + * @throws IOException + */ + @Override + public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + throws IOException { + if (trailer.getMetaIndexCount() == 0) { + return null; // there are no meta blocks + } + if (metaBlockIndexReader == null) { + throw new IOException("Meta index not loaded"); + } + + byte[] mbname = Bytes.toBytes(metaBlockName); + int block = metaBlockIndexReader.rootBlockContainingKey(mbname, + 0, mbname.length); + if (block == -1) + return null; + long blockSize = metaBlockIndexReader.getRootBlockDataSize(block); + + // Per meta key from any given file, synchronize reads for said block. This + // is OK to do for meta blocks because the meta block index is always + // single-level. + synchronized (metaBlockIndexReader.getRootBlockKey(block)) { + // Check cache for block. If found return. 
+ long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block); + BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset); + + cacheBlock &= cacheConf.shouldCacheDataOnRead(); + if (cacheConf.isBlockCacheEnabled()) { + HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, + BlockType.META, null); + if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; + // Return a distinct 'shallow copy' of the block, + // so pos does not get messed by the scanner + return cachedBlock.getBufferWithoutHeader(); + } + // Cache Miss, please load. + } + + HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, + blockSize, -1, true).unpack(hfileContext, fsBlockReader); + + // Cache the block + if (cacheBlock) { + cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); + } + + return metaBlock.getBufferWithoutHeader(); + } + } + + /** + * Read in a file block of the given {@link BlockType} and + * {@link DataBlockEncoding}. Unpacks the block as necessary. + * @param dataBlockOffset offset to read. + * @param onDiskBlockSize size of the block + * @param cacheBlock + * @param pread Use positional read instead of seek+read (positional is + * better doing random reads whereas seek+read is better scanning). + * @param isCompaction is this block being read as part of a compaction + * @param expectedBlockType the block type we are expecting to read with this + * read operation, or null to read whatever block type is available + * and avoid checking (that might reduce caching efficiency of + * encoded data blocks) + * @param expectedDataBlockEncoding the data block encoding the caller is + * expecting data blocks to be in, or null to not perform this + * check and return the block irrespective of the encoding. This + * check only applies to data blocks and can be set to null when + * the caller is expecting to read a non-data block and has set + * expectedBlockType accordingly. + * @return Block wrapped in a ByteBuffer. + * @throws IOException + */ + @Override + public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, + final boolean cacheBlock, boolean pread, final boolean isCompaction, + boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) + throws IOException { + if (dataBlockIndexReader == null) { + throw new IOException("Block index not loaded"); + } + if (dataBlockOffset < 0 + || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) { + throw new IOException("Requested block is out of range: " + + dataBlockOffset + ", lastDataBlockOffset: " + + trailer.getLastDataBlockOffset()); + } + // For any given block from any given file, synchronize reads for said + // block. + // Without a cache, this synchronizing is needless overhead, but really + // the other choice is to duplicate work (which the cache would prevent you + // from doing). + + BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); + + boolean useLock = false; + IdLock.Entry lockEntry = null; + TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock"); + try { + while (true) { + if (useLock) { + lockEntry = offsetLock.getLockEntry(dataBlockOffset); + } + + // Check cache for block. If found return. + if (cacheConf.isBlockCacheEnabled()) { + // Try and get the block from the block cache. If the useLock variable is true then this + // is the second time through the loop and it should not be counted as a block cache miss. 
+ HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, + updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); + if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; + if (cachedBlock.getBlockType().isData()) { + if (updateCacheMetrics) { + HFile.dataBlockReadCnt.incrementAndGet(); + } + // Validate encoding type for data blocks. We include encoding + // type in the cache key, and we expect it to match on a cache hit. + if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { + throw new IOException("Cached block under key " + cacheKey + " " + + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " + + dataBlockEncoder.getDataBlockEncoding() + ")"); + } + } + return cachedBlock; + } + // Carry on, please load. + } + if (!useLock) { + // check cache again with lock + useLock = true; + continue; + } + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); + } + // Load block from filesystem. + HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, + pread); + validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); + + // Cache the block if necessary + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); + } + + if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { + HFile.dataBlockReadCnt.incrementAndGet(); + } + + return unpacked; + } + } finally { + traceScope.close(); + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + } + + @Override + public boolean hasMVCCInfo() { + return includesMemstoreTS && decodeMemstoreTS; + } + + /** + * Compares the actual type of a block retrieved from cache or disk with its + * expected type and throws an exception in case of a mismatch. Expected + * block type of {@link BlockType#DATA} is considered to match the actual + * block type {@link BlockType#ENCODED_DATA} as well. + * @param block a block retrieved from cache or disk + * @param expectedBlockType the expected block type, or null to skip the + * check + */ + private void validateBlockType(HFileBlock block, + BlockType expectedBlockType) throws IOException { + if (expectedBlockType == null) { + return; + } + BlockType actualBlockType = block.getBlockType(); + if (expectedBlockType.isData() && actualBlockType.isData()) { + // We consider DATA to match ENCODED_DATA for the purpose of this + // verification. + return; + } + if (actualBlockType != expectedBlockType) { + throw new IOException("Expected block type " + expectedBlockType + ", " + + "but got " + actualBlockType + ": " + block); + } + } + + /** + * @return Last key in the file. May be null if file has no entries. Note that + * this is not the last row key, but rather the byte form of the last + * KeyValue. + */ + @Override + public byte[] getLastKey() { + return dataBlockIndexReader.isEmpty() ? null : lastKey; + } + + /** + * @return Midkey for this file. We work with block boundaries only so + * returned midkey is an approximation only. 
+ * @throws IOException + */ + @Override + public byte[] midkey() throws IOException { + return dataBlockIndexReader.midkey(); + } + + @Override + public void close() throws IOException { + close(cacheConf.shouldEvictOnClose()); + } + + public void close(boolean evictOnClose) throws IOException { + PrefetchExecutor.cancel(path); + if (evictOnClose && cacheConf.isBlockCacheEnabled()) { + int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); + if (LOG.isTraceEnabled()) { + LOG.trace("On close, file=" + name + " evicted=" + numEvicted + + " block(s)"); + } + } + fsBlockReader.closeStreams(); + } + + public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) { + return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction); + } + + /** For testing */ + HFileBlock.FSReader getUncachedBlockReader() { + return fsBlockReader; + } + + /** + * Scanner that operates on encoded data blocks. + */ + protected static class EncodedScanner extends HFileScannerImpl { + private final HFileBlockDecodingContext decodingCtx; + private final DataBlockEncoder.EncodedSeeker seeker; + private final DataBlockEncoder dataBlockEncoder; + protected final HFileContext meta; + + public EncodedScanner(HFile.Reader reader, boolean cacheBlocks, + boolean pread, boolean isCompaction, HFileContext meta) { + super(reader, cacheBlocks, pread, isCompaction); + DataBlockEncoding encoding = reader.getDataBlockEncoding(); + dataBlockEncoder = encoding.getEncoder(); + decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta); + seeker = dataBlockEncoder.createSeeker( + reader.getComparator(), decodingCtx); + this.meta = meta; + } + + @Override + public boolean isSeeked(){ + return this.block != null; + } + + /** + * Updates the current block to be the given {@link HFileBlock}. Seeks to + * the the first key/value pair. + * + * @param newBlock the block to make current + * @throws CorruptHFileException + */ + private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { + block = newBlock; + + // sanity checks + if (block.getBlockType() != BlockType.ENCODED_DATA) { + throw new IllegalStateException( + "EncodedScanner works only on encoded data blocks"); + } + short dataBlockEncoderId = block.getDataBlockEncodingId(); + if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { + String encoderCls = dataBlockEncoder.getClass().getName(); + throw new CorruptHFileException("Encoder " + encoderCls + + " doesn't support data block encoding " + + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); + } + + seeker.setCurrentBuffer(getEncodedBuffer(newBlock)); + blockFetches++; + + // Reset the next indexed key + this.nextIndexedKey = null; + } + + private ByteBuffer getEncodedBuffer(HFileBlock newBlock) { + ByteBuffer origBlock = newBlock.getBufferReadOnly(); + ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(), + origBlock.arrayOffset() + newBlock.headerSize() + + DataBlockEncoding.ID_SIZE, + newBlock.getUncompressedSizeWithoutHeader() - + DataBlockEncoding.ID_SIZE).slice(); + return encodedBlock; + } + + @Override + public boolean seekTo() throws IOException { + if (reader == null) { + return false; + } + + if (reader.getTrailer().getEntryCount() == 0) { + // No data blocks. 
+ return false; + } + + long firstDataBlockOffset = + reader.getTrailer().getFirstDataBlockOffset(); + if (block != null && block.getOffset() == firstDataBlockOffset) { + seeker.rewind(); + return true; + } + + block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + if (block.getOffset() < 0) { + throw new IOException("Invalid block offset: " + block.getOffset()); + } + updateCurrentBlock(block); + return true; + } + + @Override + public boolean next() throws IOException { + boolean isValid = seeker.next(); + if (!isValid) { + block = readNextDataBlock(); + isValid = block != null; + if (isValid) { + updateCurrentBlock(block); + } + } + return isValid; + } + + @Override + public ByteBuffer getKey() { + assertValidSeek(); + return seeker.getKeyDeepCopy(); + } + + public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { + return seeker.compareKey(comparator, key, offset, length); + } + + @Override + public ByteBuffer getValue() { + assertValidSeek(); + return seeker.getValueShallowCopy(); + } + + @Override + public Cell getKeyValue() { + if (block == null) { + return null; + } + return seeker.getKeyValue(); + } + + @Override + public String getKeyString() { + ByteBuffer keyBuffer = getKey(); + return Bytes.toStringBinary(keyBuffer.array(), + keyBuffer.arrayOffset(), keyBuffer.limit()); + } + + @Override + public String getValueString() { + ByteBuffer valueBuffer = getValue(); + return Bytes.toStringBinary(valueBuffer.array(), + valueBuffer.arrayOffset(), valueBuffer.limit()); + } + + private void assertValidSeek() { + if (block == null) { + throw new NotSeekedException(); + } + } + + protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { + return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock)); + } + + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, byte[] nextIndexedKey, + boolean rewind, Cell key, boolean seekBefore) throws IOException { + if (block == null || block.getOffset() != seekToBlock.getOffset()) { + updateCurrentBlock(seekToBlock); + } else if (rewind) { + seeker.rewind(); + } + this.nextIndexedKey = nextIndexedKey; + return seeker.seekToKeyInBlock(key, seekBefore); + } + + public int compareKey(KVComparator comparator, Cell key) { + return seeker.compareKey(comparator, key); + } + } + + /** + * Returns a buffer with the Bloom filter metadata. The caller takes + * ownership of the buffer. + */ + @Override + public DataInput getGeneralBloomFilterMetadata() throws IOException { + return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META); + } + + @Override + public DataInput getDeleteBloomFilterMetadata() throws IOException { + return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META); + } + + private DataInput getBloomFilterMetadata(BlockType blockType) + throws IOException { + if (blockType != BlockType.GENERAL_BLOOM_META && + blockType != BlockType.DELETE_FAMILY_BLOOM_META) { + throw new RuntimeException("Block Type: " + blockType.toString() + + " is not supported") ; + } + + for (HFileBlock b : loadOnOpenBlocks) + if (b.getBlockType() == blockType) + return b.getByteStream(); + return null; + } + + public boolean isFileInfoLoaded() { + return true; // We load file info in constructor in version 2. + } + + /** + * Validates that the minor version is within acceptable limits. 
+ * Otherwise it throws a RuntimeException. + */ + private void validateMinorVersion(Path path, int minorVersion) { + if (minorVersion < MIN_MINOR_VERSION || + minorVersion > MAX_MINOR_VERSION) { + String msg = "Minor version for path " + path + + " is expected to be between " + + MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION + + " but is found to be " + minorVersion; + LOG.error(msg); + throw new RuntimeException(msg); + } + } + + @Override + public HFileContext getFileContext() { + return hfileContext; + } + + /** + * Returns false if block prefetching was requested for this file and has + * not completed, true otherwise + */ + @VisibleForTesting + boolean prefetchComplete() { + return PrefetchExecutor.isCompleted(path); + } + + /** + * Opens an HFile. You must load the index before you can use it by calling + * {@link #loadFileInfo()}. + * @param path + * Path to HFile. + * @param trailer + * File trailer. + * @param fsdis + * input stream. + * @param fileSize + * Length of the stream. + * @param cacheConf + * Cache configuration. + * @param hfs + * The file system. + * @param conf + * Configuration + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public HFileReaderImpl(final Path path, FixedFileTrailer trailer, + final FSDataInputStreamWrapper fsdis, + final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs, + final Configuration conf) + throws IOException { + + this.trailer = trailer; + this.compressAlgo = trailer.getCompressionCodec(); + this.cacheConf = cacheConf; + this.fileSize = fileSize; + this.path = path; + this.name = path.getName(); + this.conf = conf; + + trailer.expectMajorVersion(getMajorVersion()); + validateMinorVersion(path, trailer.getMinorVersion()); + this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer); + this.fsBlockReader = + new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext); + + // Comparator class name is stored in the trailer in version 2. + comparator = trailer.createComparator(); + dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, + trailer.getNumDataIndexLevels(), this); + metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( + KeyValue.RAW_COMPARATOR, 1); + + // Parse load-on-open data. + + HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange( + trailer.getLoadOnOpenDataOffset(), + fileSize - trailer.getTrailerSize()); + + // Data index. We also read statistics about the block index written after + // the root level. + dataBlockIndexReader.readMultiLevelIndexRoot( + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), + trailer.getDataIndexCount()); + + // Meta index. 
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+
+    // File info
+    fileInfo = new FileInfo();
+    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
+    lastKey = fileInfo.get(FileInfo.LASTKEY);
+    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+    byte [] keyValueFormatVersion =
+        fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) ==
+            HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE;
+    fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS);
+    if (includesMemstoreTS) {
+      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0;
+    }
+
+    // Read data block encoding algorithm name from file info.
+    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
+    fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
+
+    // Store all other load-on-open blocks for further consumption.
+    HFileBlock b;
+    while ((b = blockIter.nextBlock()) != null) {
+      loadOnOpenBlocks.add(b);
+    }
+
+    // Prefetch file blocks upon open if requested
+    if (cacheConf.shouldPrefetchOnOpen()) {
+      PrefetchExecutor.request(path, new Runnable() {
+        public void run() {
+          try {
+            long offset = 0;
+            long end = fileSize - getTrailer().getTrailerSize();
+            HFileBlock prevBlock = null;
+            while (offset < end) {
+              if (Thread.interrupted()) {
+                break;
+              }
+              long onDiskSize = -1;
+              if (prevBlock != null) {
+                onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+              }
+              HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
+                  null, null);
+              prevBlock = block;
+              offset += block.getOnDiskSizeWithHeader();
+            }
+          } catch (IOException e) {
+            // IOExceptions are probably due to region closes (relocation, etc.)
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Exception encountered while prefetching " + path + ":", e);
+            }
+          } catch (Exception e) {
+            // Other exceptions are interesting
+            LOG.warn("Exception encountered while prefetching " + path + ":", e);
+          } finally {
+            PrefetchExecutor.complete(path);
+          }
+        }
+      });
+    }
+
+    byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
+    // If MAX_TAGS_LEN is absent from the file info, no tags were written to this file at all.
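+    // TAGS_COMPRESSED is only consulted when MAX_TAGS_LEN is present.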
+    if (tmp != null) {
+      hfileContext.setIncludesTags(true);
+      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+      if (tmp != null && Bytes.toBoolean(tmp)) {
+        hfileContext.setCompressTags(true);
+      }
+    }
+  }
+
+  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
+      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
+    trailer.expectMajorVersion(3);
+    HFileContextBuilder builder = new HFileContextBuilder()
+        .withIncludesMvcc(this.includesMemstoreTS)
+        .withHBaseCheckSum(true)
+        .withCompression(this.compressAlgo);
+
+    // Check for any key material available
+    byte[] keyBytes = trailer.getEncryptionKey();
+    if (keyBytes != null) {
+      Encryption.Context cryptoContext = Encryption.newContext(conf);
+      Key key;
+      String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+          User.getCurrent().getShortName());
+      try {
+        // First try the master key
+        key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
+      } catch (KeyException e) {
+        // If the current master key fails to unwrap, try the alternate, if
+        // one is configured
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
+        }
+        String alternateKeyName =
+            conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+        if (alternateKeyName != null) {
+          try {
+            key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
+          } catch (KeyException ex) {
+            throw new IOException(ex);
+          }
+        } else {
+          throw new IOException(e);
+        }
+      }
+      // Use the algorithm the key wants
+      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+      if (cipher == null) {
+        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
+      }
+      cryptoContext.setCipher(cipher);
+      cryptoContext.setKey(key);
+      builder.withEncryptionContext(cryptoContext);
+    }
+
+    HFileContext context = builder.build();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reader" + (path != null ? " for " + path : "") +
+          " initialized with cacheConf: " + cacheConf +
+          " comparator: " + comparator.getClass().getSimpleName() +
+          " fileContext: " + context);
+    }
+
+    return context;
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient. NOTE: Do not use this overload of getScanner for
+   * compactions. See {@link #getScanner(boolean, boolean, boolean)}
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better for scanning).
+   * @return Scanner on this file.
+   */
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
+    return getScanner(cacheBlocks, pread, false);
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient.
+   * @param cacheBlocks
+   *          True if we should cache blocks read in by this scanner.
+   * @param pread
+   *          Use positional read rather than seek+read if true (pread is better
+   *          for random reads, seek+read is better for scanning).
+   * @param isCompaction
+   *          is scanner being used for a compaction?
+   * @return Scanner on this file.
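+   *          <p>A minimal usage sketch (assumes the reader was opened via
+   *          {@code HFile.createReader}):
+   *          <pre>
+   *          HFileScanner scanner = reader.getScanner(true, true, false);
+   *          if (scanner.seekTo()) {
+   *            do {
+   *              Cell cell = scanner.getKeyValue();
+   *              // process cell
+   *            } while (scanner.next());
+   *          }
+   *          </pre>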
+ */ + @Override + public HFileScanner getScanner(boolean cacheBlocks, final boolean pread, + final boolean isCompaction) { + if (dataBlockEncoder.useEncodedScanner()) { + return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext); + } + return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction); + } + + public int getMajorVersion() { + return 3; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 1efcd47..35f77f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -36,8 +39,14 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.io.hfile.HFile.Writer; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.Writable; @@ -46,7 +55,7 @@ import org.apache.hadoop.io.Writable; * Common functionality needed by all versions of {@link HFile} writers. */ @InterfaceAudience.Private -public abstract class AbstractHFileWriter implements HFile.Writer { +public class HFileWriterImpl implements HFile.Writer { /** The Cell previously appended. Becomes the last cell in the file.*/ protected Cell lastCell = null; @@ -107,7 +116,7 @@ public abstract class AbstractHFileWriter implements HFile.Writer { protected final HFileContext hFileContext; - public AbstractHFileWriter(CacheConfig cacheConf, + public HFileWriterImpl(CacheConfig cacheConf, FSDataOutputStream outputStream, Path path, KVComparator comparator, HFileContext fileContext) { this.outputStream = outputStream; @@ -128,29 +137,6 @@ public abstract class AbstractHFileWriter implements HFile.Writer { } /** - * Add last bits of metadata to file info before it is written out. - */ - protected void finishFileInfo() throws IOException { - if (lastCell != null) { - // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean - // byte buffer. Won't take a tuple. - byte[] lastKey = new byte[lastKeyLength]; - KeyValueUtil.appendKeyTo(lastCell, lastKey, 0); - fileInfo.append(FileInfo.LASTKEY, lastKey, false); - } - - // Average key length. - int avgKeyLen = - entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount); - fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); - - // Average value length. 
-    int avgValueLen =
-        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
-    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
-  }
-
-  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#loadFileInfo()}.
   *
@@ -229,11 +215,493 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
         + name + ", compression=" + hFileContext.getCompression().getName();
   }
 
+  public static Compression.Algorithm compressionByName(String algoName) {
+    if (algoName == null)
+      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    return Compression.getCompressionAlgorithmByName(algoName);
+  }
+
+  /** A helper method to create HFile output streams in constructors */
+  protected static FSDataOutputStream createOutputStream(Configuration conf,
+      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
+    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
+        HConstants.DATA_FILE_UMASK_KEY);
+    return FSUtils.create(fs, path, perms, favoredNodes);
+  }
+
+  static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
+
+  /** Max memstore (mvcc) timestamp in FileInfo */
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
+      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+
+  /** KeyValue version in FileInfo */
+  public static final byte [] KEY_VALUE_VERSION =
+      Bytes.toBytes("KEY_VALUE_VERSION");
+
+  /** Version for KeyValue which includes memstore timestamp */
+  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
+
+  /** Inline block writers for multi-level block index and compound Blooms. */
+  private List<InlineBlockWriter> inlineBlockWriters =
+      new ArrayList<InlineBlockWriter>();
+
+  /** Unified version 2 block writer */
+  protected HFileBlock.Writer fsBlockWriter;
+
+  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
+  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
+
+  /** The offset of the first data block or -1 if the file is empty. */
+  private long firstDataBlockOffset = -1;
+
+  /** The offset of the last data block or 0 if the file is empty. */
+  protected long lastDataBlockOffset;
+
+  /** The last (stop) key of the previous data block. */
+  private byte[] lastKeyOfPreviousBlock = null;
+
+  /** Additional data items to be written to the "load-on-open" section. */
+  private List<BlockWritable> additionalLoadOnOpenData =
+      new ArrayList<BlockWritable>();
+
+  protected long maxMemstoreTS = 0;
+
+  static class WriterFactoryV2 extends HFile.WriterFactory {
+    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
+      super(conf, cacheConf);
+    }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path,
+        FSDataOutputStream ostream,
+        KVComparator comparator, HFileContext context) throws IOException {
+      context.setIncludesTags(false); // HFile V2 does not deal with tags at all!
+      return new HFileWriterV2(conf, cacheConf, fs, path, ostream,
+          comparator, context);
+    }
+  }
+
+  /** Constructor that takes a path, creates and closes the output stream. */
+  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
+      FileSystem fs, Path path, FSDataOutputStream ostream,
+      final KVComparator comparator, final HFileContext context) throws IOException {
+    super(cacheConf,
+        ostream == null ?
createOutputStream(conf, fs, path, null) : ostream, + path, comparator, context); + finishInit(conf); + } + + /** Additional initialization steps */ + protected void finishInit(final Configuration conf) { + if (fsBlockWriter != null) + throw new IllegalStateException("finishInit called twice"); + + fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); + + // Data block index writer + boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); + dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, + cacheIndexesOnWrite ? cacheConf : null, + cacheIndexesOnWrite ? name : null); + dataBlockIndexWriter.setMaxChunkSize( + HFileBlockIndex.getMaxChunkSize(conf)); + inlineBlockWriters.add(dataBlockIndexWriter); + + // Meta data block index writer + metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); + if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf); + } + + /** + * At a block boundary, write all the inline blocks and opens new block. + * + * @throws IOException + */ + protected void checkBlockBoundary() throws IOException { + if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) + return; + + finishBlock(); + writeInlineBlocks(false); + newBlock(); + } + + /** Clean up the current data block */ + private void finishBlock() throws IOException { + if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) + return; + + // Update the first data block offset for scanning. + if (firstDataBlockOffset == -1) { + firstDataBlockOffset = outputStream.getPos(); + } + // Update the last data block offset + lastDataBlockOffset = outputStream.getPos(); + fsBlockWriter.writeHeaderAndData(outputStream); + int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); + + byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock); + dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + if (cacheConf.shouldCacheDataOnWrite()) { + doCacheOnWrite(lastDataBlockOffset); + } + } + + /** Gives inline block writers an opportunity to contribute blocks. */ + private void writeInlineBlocks(boolean closing) throws IOException { + for (InlineBlockWriter ibw : inlineBlockWriters) { + while (ibw.shouldWriteBlock(closing)) { + long offset = outputStream.getPos(); + boolean cacheThisBlock = ibw.getCacheOnWrite(); + ibw.writeInlineBlock(fsBlockWriter.startWriting( + ibw.getInlineBlockType())); + fsBlockWriter.writeHeaderAndData(outputStream); + ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), + fsBlockWriter.getUncompressedSizeWithoutHeader()); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + if (cacheThisBlock) { + doCacheOnWrite(offset); + } + } + } + } + + /** + * Caches the last written HFile block. + * @param offset the offset of the block we want to cache. Used to determine + * the cache key. + */ + private void doCacheOnWrite(long offset) { + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock( + new BlockCacheKey(name, offset), cacheFormatBlock); + } + + /** + * Ready a new block for writing. + * + * @throws IOException + */ + protected void newBlock() throws IOException { + // This is where the next block begins. 
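+    // Reset per-block state: reopen the block writer for DATA and remember the
+    // last key of the block just finished, for use in index-key calculation.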
+ fsBlockWriter.startWriting(BlockType.DATA); + firstKeyInBlock = null; + if (lastKeyLength > 0) { + lastKeyOfPreviousBlock = new byte[lastKeyLength]; + KeyValueUtil.appendKeyTo(lastCell, lastKeyOfPreviousBlock, 0); + } + } + + /** + * Add a meta block to the end of the file. Call before close(). Metadata + * blocks are expensive. Fill one with a bunch of serialized data rather than + * do a metadata block per metadata instance. If metadata is small, consider + * adding to file info using {@link #appendFileInfo(byte[], byte[])} + * + * @param metaBlockName + * name of the block + * @param content + * will call readFields to get data later (DO NOT REUSE) + */ + @Override + public void appendMetaBlock(String metaBlockName, Writable content) { + byte[] key = Bytes.toBytes(metaBlockName); + int i; + for (i = 0; i < metaNames.size(); ++i) { + // stop when the current key is greater than our own + byte[] cur = metaNames.get(i); + if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, + key.length) > 0) { + break; + } + } + metaNames.add(i, key); + metaData.add(i, content); + } + + /** + * Add key/value to file. Keys must be added in an order that agrees with the + * Comparator passed on construction. + * + * @param cell Cell to add. Cannot be empty nor null. + * @throws IOException + */ + @Override + public void append(final Cell cell) throws IOException { + int klength = KeyValueUtil.keyLength(cell); + byte[] value = cell.getValueArray(); + int voffset = cell.getValueOffset(); + int vlength = cell.getValueLength(); + // checkKey uses comparator to check we are writing in order. + boolean dupKey = checkKey(cell); + checkValue(value, voffset, vlength); + if (!dupKey) { + checkBlockBoundary(); + } + + if (!fsBlockWriter.isWriting()) + newBlock(); + + fsBlockWriter.write(cell); + + totalKeyLength += klength; + totalValueLength += vlength; + + // Are we the first key in this block? + if (firstKeyInBlock == null) { + // Copy the key for use as first key in block. It is put into file index. + firstKeyInBlock = new byte[klength]; + KeyValueUtil.appendKeyTo(cell, firstKeyInBlock, 0); + } + + lastCell = cell; + lastKeyLength = klength; + entryCount++; + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); + } + + @Override + public void close() throws IOException { + if (outputStream == null) { + return; + } + // Save data block encoder metadata in the file info. + blockEncoder.saveMetadata(this); + // Write out the end of the data blocks, then write meta data blocks. + // followed by fileinfo, data block index and meta block index. + + finishBlock(); + writeInlineBlocks(true); + + FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion()); + + // Write out the metadata blocks if any. + if (!metaNames.isEmpty()) { + for (int i = 0; i < metaNames.size(); ++i) { + // store the beginning offset + long offset = outputStream.getPos(); + // write the metadata content + DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META); + metaData.get(i).write(dos); + + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + // Add the new meta block to the meta index. + metaBlockIndexWriter.addEntry(metaNames.get(i), offset, + fsBlockWriter.getOnDiskSizeWithHeader()); + } + } + + // Load-on-open section. + + // Data block index. + // + // In version 2, this section of the file starts with the root level data + // block index. 
We call a function that writes intermediate-level blocks + // first, then root level, and returns the offset of the root level block + // index. + + long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); + trailer.setLoadOnOpenOffset(rootIndexOffset); + + // Meta block index. + metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting( + BlockType.ROOT_INDEX), "meta"); + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + if (this.hFileContext.isIncludesMvcc()) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + + // File info + writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO)); + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + // Load-on-open data supplied by higher levels, e.g. Bloom filters. + for (BlockWritable w : additionalLoadOnOpenData){ + fsBlockWriter.writeBlock(w, outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + } + + // Now finish off the trailer. + trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); + trailer.setUncompressedDataIndexSize( + dataBlockIndexWriter.getTotalUncompressedSize()); + trailer.setFirstDataBlockOffset(firstDataBlockOffset); + trailer.setLastDataBlockOffset(lastDataBlockOffset); + trailer.setComparatorClass(comparator.getClass()); + trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + + + finishClose(trailer); + + fsBlockWriter.release(); + } + + @Override + public void addInlineBlockWriter(InlineBlockWriter ibw) { + inlineBlockWriters.add(ibw); + } + + @Override + public void addGeneralBloomFilter(final BloomFilterWriter bfw) { + this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META); + } + + @Override + public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) { + this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META); + } + + private void addBloomFilter(final BloomFilterWriter bfw, + final BlockType blockType) { + if (bfw.getKeyCount() <= 0) + return; + + if (blockType != BlockType.GENERAL_BLOOM_META && + blockType != BlockType.DELETE_FAMILY_BLOOM_META) { + throw new RuntimeException("Block Type: " + blockType.toString() + + "is not supported"); + } + additionalLoadOnOpenData.add(new BlockWritable() { + @Override + public BlockType getBlockType() { + return blockType; + } + + @Override + public void writeToBlock(DataOutput out) throws IOException { + bfw.getMetaWriter().write(out); + Writable dataWriter = bfw.getDataWriter(); + if (dataWriter != null) + dataWriter.write(out); + } + }); + } + + protected int getMajorVersion() { + return 2; + } + + protected int getMinorVersion() { + return HFileReaderImpl.MAX_MINOR_VERSION; + } + + @Override + public HFileContext getFileContext() { + return hFileContext; + } + + private static final Log LOG = LogFactory.getLog(HFileWriterV3.class); + + private int maxTagsLength = 0; + + static class WriterFactoryV3 extends HFile.WriterFactory { + WriterFactoryV3(Configuration conf, CacheConfig cacheConf) { + super(conf, cacheConf); + } + + @Override + public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream, + final KVComparator comparator, HFileContext fileContext) + throws IOException { + return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext); + } + } + + /** 
Constructor that takes a path, creates and closes the output stream. */ + public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path, + FSDataOutputStream ostream, final KVComparator comparator, + final HFileContext fileContext) throws IOException { + super(conf, cacheConf, fs, path, ostream, comparator, fileContext); + if (LOG.isTraceEnabled()) { + LOG.trace("Writer" + (path != null ? " for " + path : "") + + " initialized with cacheConf: " + cacheConf + + " comparator: " + comparator.getClass().getSimpleName() + + " fileContext: " + fileContext); + } + } + /** - * Sets remaining trailer fields, writes the trailer to disk, and optionally - * closes the output stream. + * Add key/value to file. Keys must be added in an order that agrees with the + * Comparator passed on construction. + * + * @param cell + * Cell to add. Cannot be empty nor null. + * @throws IOException */ + @Override + public void append(final Cell cell) throws IOException { + // Currently get the complete arrays + super.append(cell); + int tagsLength = cell.getTagsLength(); + if (tagsLength > this.maxTagsLength) { + this.maxTagsLength = tagsLength; + } + } + + protected void finishFileInfo() throws IOException { + if (lastCell != null) { + // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean + // byte buffer. Won't take a tuple. + byte[] lastKey = new byte[lastKeyLength]; + KeyValueUtil.appendKeyTo(lastCell, lastKey, 0); + fileInfo.append(FileInfo.LASTKEY, lastKey, false); + } + + // Average key length. + int avgKeyLen = + entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount); + fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); + + // Average value length. + int avgValueLen = + entryCount == 0 ? 0 : (int) (totalValueLength / entryCount); + fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); + if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) { + // In case of Prefix Tree encoding, we always write tags information into HFiles even if all + // KVs are having no tags. 
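+      // Hence MAX_TAGS_LEN is recorded unconditionally for PREFIX_TREE-encoded files.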
+ fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); + } else if (hFileContext.isIncludesTags()) { + // When tags are not being written in this file, MAX_TAGS_LEN is excluded + // from the FileInfo + fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); + boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) + && hFileContext.isCompressTags(); + fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); + } + } + + @Override + protected int getMajorVersion() { + return 3; + } + + @Override + protected int getMinorVersion() { + return HFileReaderImpl.MAX_MINOR_VERSION; + } + + @Override protected void finishClose(FixedFileTrailer trailer) throws IOException { + // Write out encryption metadata before finalizing if we have a valid crypto context + Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); + if (cryptoContext != Encryption.Context.NONE) { + // Wrap the context's key and write it as the encryption metadata, the wrapper includes + // all information needed for decryption + trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(), + cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, + User.getCurrent().getShortName()), + cryptoContext.getKey())); + } + // Now we can finish the close trailer.setMetaIndexCount(metaNames.size()); trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize()); trailer.setEntryCount(entryCount); @@ -246,18 +714,4 @@ public abstract class AbstractHFileWriter implements HFile.Writer { outputStream = null; } } - - public static Compression.Algorithm compressionByName(String algoName) { - if (algoName == null) - return HFile.DEFAULT_COMPRESSION_ALGORITHM; - return Compression.getCompressionAlgorithmByName(algoName); - } - - /** A helper method to create HFile output streams in constructors */ - protected static FSDataOutputStream createOutputStream(Configuration conf, - FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException { - FsPermission perms = FSUtils.getFilePermissions(fs, conf, - HConstants.DATA_FILE_UMASK_KEY); - return FSUtils.create(fs, path, perms, favoredNodes); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index f8f9b4d..01df341 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; @@ -127,7 +127,7 @@ public class HFileOutputFormat2 // Invented config. Add to hbase-*.xml if other than default compression. 
final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = AbstractHFileWriter + final Algorithm defaultCompression = HFileWriterImpl .compressionByName(defaultCompressionStr); final boolean compactionExclude = conf.getBoolean( "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); @@ -420,7 +420,7 @@ public class HFileOutputFormat2 Map compressionMap = new TreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry e : stringMap.entrySet()) { - Algorithm algorithm = AbstractHFileWriter.compressionByName + Algorithm algorithm = HFileWriterImpl.compressionByName (e.getValue()); compressionMap.put(e.getKey(), algorithm); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java index 850dc02..09b3ce4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -119,7 +119,7 @@ public class CompressionTest { throws Exception { Configuration conf = HBaseConfiguration.create(); HFileContext context = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(codec)).build(); + .withCompression(HFileWriterImpl.compressionByName(codec)).build(); HFile.Writer writer = HFile.getWriterFactoryNoCache(conf) .withPath(fs, path) .withFileContext(context) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 4523b2a..1d6ebab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -240,7 +240,7 @@ public class TestHFile extends HBaseTestCase { FSDataOutputStream fout = createFSOutput(ncTFile); HFileContext meta = new HFileContextBuilder() .withBlockSize(minBlockSize) - .withCompression(AbstractHFileWriter.compressionByName(codec)) + .withCompression(HFileWriterImpl.compressionByName(codec)) .build(); Writer writer = HFile.getWriterFactory(conf, cacheConf) .withOutputStream(fout) @@ -331,7 +331,7 @@ public class TestHFile extends HBaseTestCase { Path mFile = new Path(ROOT_DIR, "meta.hfile"); FSDataOutputStream fout = createFSOutput(mFile); HFileContext meta = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(compress)) + .withCompression(HFileWriterImpl.compressionByName(compress)) .withBlockSize(minBlockSize).build(); Writer writer = HFile.getWriterFactory(conf, cacheConf) .withOutputStream(fout) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java index ff12234..acd1a30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFilePerformance.java @@ -175,7 +175,7 @@ public class TestHFilePerformance extends AbstractHBaseTool { if ("HFile".equals(fileType)){ HFileContextBuilder builder = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(codecName)) + .withCompression(HFileWriterImpl.compressionByName(codecName)) .withBlockSize(minBlockSize); if (cipherName != "none") { byte[] cipherKey = new byte[AES.KEY_LENGTH]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java index 76a8200..26adb49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java @@ -130,7 +130,7 @@ public class TestHFileSeek extends TestCase { try { HFileContext context = new HFileContextBuilder() .withBlockSize(options.minBlockSize) - .withCompression(AbstractHFileWriter.compressionByName(options.compress)) + .withCompression(HFileWriterImpl.compressionByName(options.compress)) .build(); Writer writer = HFile.getWriterFactoryNoCache(conf) .withOutputStream(fout)