From 000c1ff24bc9900657def826813ed78af3aace37 Mon Sep 17 00:00:00 2001 From: stack Date: Wed, 1 Apr 2015 14:39:49 -0700 Subject: [PATCH] Author: stack Date: Wed Apr 1 12:10:40 2015 -0700 HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2... Address second lot of Sean Busbey comments, test failures, javadoc warnings and long lines. Also added making check of hfile version bounded rather than open-ended. --- .../hbase/IntegrationTestIngestWithEncryption.java | 8 +- .../hadoop/hbase/io/hfile/AbstractHFileReader.java | 352 ---- .../hadoop/hbase/io/hfile/AbstractHFileWriter.java | 266 --- .../hadoop/hbase/io/hfile/FixedFileTrailer.java | 6 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 42 +- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 110 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 4 +- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 1688 ++++++++++++++++++++ .../hadoop/hbase/io/hfile/HFileReaderV2.java | 1323 --------------- .../hadoop/hbase/io/hfile/HFileReaderV3.java | 358 ----- .../hadoop/hbase/io/hfile/HFileWriterFactory.java | 40 + .../hadoop/hbase/io/hfile/HFileWriterImpl.java | 641 ++++++++ .../hadoop/hbase/io/hfile/HFileWriterV2.java | 424 ----- .../hadoop/hbase/io/hfile/HFileWriterV3.java | 136 -- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 6 +- .../hadoop/hbase/regionserver/HRegionServer.java | 2 + .../apache/hadoop/hbase/regionserver/Region.java | 22 +- .../hadoop/hbase/regionserver/StoreFile.java | 3 +- .../hbase/regionserver/compactions/Compactor.java | 4 +- .../apache/hadoop/hbase/util/CompressionTest.java | 4 +- .../hadoop/hbase/HFilePerformanceEvaluation.java | 4 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 18 +- .../hbase/io/hfile/TestFixedFileTrailer.java | 8 +- .../io/hfile/TestForceCacheImportantBlocks.java | 2 - .../apache/hadoop/hbase/io/hfile/TestHFile.java | 4 +- .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 2 +- .../TestHFileInlineToRootChunkConversion.java | 7 +- .../hadoop/hbase/io/hfile/TestHFileSeek.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 9 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 9 +- .../io/hfile/TestLazyDataBlockDecompression.java | 5 +- .../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 6 +- .../apache/hadoop/hbase/io/hfile/TestReseekTo.java | 2 +- .../apache/hadoop/hbase/io/hfile/TestSeekTo.java | 11 +- .../hbase/regionserver/DataBlockEncodingTool.java | 7 +- .../regionserver/TestCacheOnWriteInSchema.java | 3 +- 36 files changed, 2520 insertions(+), 3018 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java index ff8ed19..cd1b0b6 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileReaderV3; -import org.apache.hadoop.hbase.io.hfile.HFileWriterV3; +import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader; @@ -46,8 +46,8 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest { static { // These log level changes are only useful when running on a localhost // cluster. - Logger.getLogger(HFileReaderV3.class).setLevel(Level.TRACE); - Logger.getLogger(HFileWriterV3.class).setLevel(Level.TRACE); + Logger.getLogger(HFileReaderImpl.class).setLevel(Level.TRACE); + Logger.getLogger(HFileWriterImpl.class).setLevel(Level.TRACE); Logger.getLogger(SecureProtobufLogReader.class).setLevel(Level.TRACE); Logger.getLogger(SecureProtobufLogWriter.class).setLevel(Level.TRACE); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java deleted file mode 100644 index 8c1e7b9..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java +++ /dev/null @@ -1,352 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; - -/** - * Common functionality needed by all versions of {@link HFile} readers. 
- */ -@InterfaceAudience.Private -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") -public abstract class AbstractHFileReader - implements HFile.Reader, Configurable { - /** Stream to read from. Does checksum verifications in file system */ - protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD - - /** The file system stream of the underlying {@link HFile} that - * does not do checksum verification in the file system */ - protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD - - /** Data block index reader keeping the root data index in memory */ - protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader; - - /** Meta block index reader -- always single level */ - protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader; - - protected final FixedFileTrailer trailer; - - /** Filled when we read in the trailer. */ - protected final Compression.Algorithm compressAlgo; - - /** - * What kind of data block encoding should be used while reading, writing, - * and handling cache. - */ - protected HFileDataBlockEncoder dataBlockEncoder = - NoOpDataBlockEncoder.INSTANCE; - - /** Last key in the file. Filled in when we read in the file info */ - protected byte [] lastKey = null; - - /** Average key length read from file info */ - protected int avgKeyLen = -1; - - /** Average value length read from file info */ - protected int avgValueLen = -1; - - /** Key comparator */ - protected KVComparator comparator = new KVComparator(); - - /** Size of this file. */ - protected final long fileSize; - - /** Block cache configuration. */ - protected final CacheConfig cacheConf; - - /** Path of file */ - protected final Path path; - - /** File name to be used for block names */ - protected final String name; - - protected FileInfo fileInfo; - - /** The filesystem used for accesing data */ - protected HFileSystem hfs; - - protected Configuration conf; - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") - protected AbstractHFileReader(Path path, FixedFileTrailer trailer, - final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs, - final Configuration conf) { - this.trailer = trailer; - this.compressAlgo = trailer.getCompressionCodec(); - this.cacheConf = cacheConf; - this.fileSize = fileSize; - this.path = path; - this.name = path.getName(); - this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD - this.conf = conf; - } - - @SuppressWarnings("serial") - public static class BlockIndexNotLoadedException - extends IllegalStateException { - public BlockIndexNotLoadedException() { - // Add a message in case anyone relies on it as opposed to class name. - super("Block index not loaded"); - } - } - - protected String toStringFirstKey() { - return KeyValue.keyToString(getFirstKey()); - } - - protected String toStringLastKey() { - return KeyValue.keyToString(getLastKey()); - } - - public abstract boolean isFileInfoLoaded(); - - @Override - public String toString() { - return "reader=" + path.toString() + - (!isFileInfoLoaded()? "": - ", compression=" + compressAlgo.getName() + - ", cacheConf=" + cacheConf + - ", firstKey=" + toStringFirstKey() + - ", lastKey=" + toStringLastKey()) + - ", avgKeyLen=" + avgKeyLen + - ", avgValueLen=" + avgValueLen + - ", entries=" + trailer.getEntryCount() + - ", length=" + fileSize; - } - - @Override - public long length() { - return fileSize; - } - - /** - * Create a Scanner on this file. 
No seeks or reads are done on creation. Call - * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is - * nothing to clean up in a Scanner. Letting go of your references to the - * scanner is sufficient. NOTE: Do not use this overload of getScanner for - * compactions. - * - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is - * better for random reads, seek+read is better scanning). - * @return Scanner on this file. - */ - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { - return getScanner(cacheBlocks, pread, false); - } - - /** - * @return the first key in the file. May be null if file has no entries. Note - * that this is not the first row key, but rather the byte form of the - * first KeyValue. - */ - @Override - public byte [] getFirstKey() { - if (dataBlockIndexReader == null) { - throw new BlockIndexNotLoadedException(); - } - return dataBlockIndexReader.isEmpty() ? null - : dataBlockIndexReader.getRootBlockKey(0); - } - - /** - * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's - * patch goes in to eliminate {@link KeyValue} here. - * - * @return the first row key, or null if the file is empty. - */ - @Override - public byte[] getFirstRowKey() { - byte[] firstKey = getFirstKey(); - if (firstKey == null) - return null; - return KeyValue.createKeyValueFromKey(firstKey).getRow(); - } - - /** - * TODO left from {@link HFile} version 1: move this to StoreFile after - * Ryan's patch goes in to eliminate {@link KeyValue} here. - * - * @return the last row key, or null if the file is empty. - */ - @Override - public byte[] getLastRowKey() { - byte[] lastKey = getLastKey(); - if (lastKey == null) - return null; - return KeyValue.createKeyValueFromKey(lastKey).getRow(); - } - - /** @return number of KV entries in this HFile */ - @Override - public long getEntries() { - return trailer.getEntryCount(); - } - - /** @return comparator */ - @Override - public KVComparator getComparator() { - return comparator; - } - - /** @return compression algorithm */ - @Override - public Compression.Algorithm getCompressionAlgorithm() { - return compressAlgo; - } - - /** - * @return the total heap size of data and meta block indexes in bytes. Does - * not take into account non-root blocks of a multilevel data index. - */ - public long indexSize() { - return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0) - + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize() - : 0); - } - - @Override - public String getName() { - return name; - } - - @Override - public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() { - return dataBlockIndexReader; - } - - @Override - public FixedFileTrailer getTrailer() { - return trailer; - } - - @Override - public FileInfo loadFileInfo() throws IOException { - return fileInfo; - } - - /** - * An exception thrown when an operation requiring a scanner to be seeked - * is invoked on a scanner that is not seeked. 
- */ - @SuppressWarnings("serial") - public static class NotSeekedException extends IllegalStateException { - public NotSeekedException() { - super("Not seeked to a key/value"); - } - } - - protected static abstract class Scanner implements HFileScanner { - protected ByteBuffer blockBuffer; - - protected boolean cacheBlocks; - protected final boolean pread; - protected final boolean isCompaction; - - protected int currKeyLen; - protected int currValueLen; - protected int currMemstoreTSLen; - protected long currMemstoreTS; - - protected int blockFetches; - - protected final HFile.Reader reader; - - public Scanner(final HFile.Reader reader, final boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { - this.reader = reader; - this.cacheBlocks = cacheBlocks; - this.pread = pread; - this.isCompaction = isCompaction; - } - - @Override - public boolean isSeeked(){ - return blockBuffer != null; - } - - @Override - public String toString() { - return "HFileScanner for reader " + String.valueOf(getReader()); - } - - protected void assertSeeked() { - if (!isSeeked()) - throw new NotSeekedException(); - } - - @Override - public int seekTo(byte[] key) throws IOException { - return seekTo(key, 0, key.length); - } - - @Override - public boolean seekBefore(byte[] key) throws IOException { - return seekBefore(key, 0, key.length); - } - - @Override - public int reseekTo(byte[] key) throws IOException { - return reseekTo(key, 0, key.length); - } - - @Override - public HFile.Reader getReader() { - return reader; - } - } - - /** For testing */ - abstract HFileBlock.FSReader getUncachedBlockReader(); - - public Path getPath() { - return path; - } - - @Override - public DataBlockEncoding getDataBlockEncoding() { - return dataBlockEncoder.getDataBlockEncoding(); - } - - public abstract int getMajorVersion(); - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java deleted file mode 100644 index 52491e6..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.io.hfile; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.io.Writable; - -/** - * Common functionality needed by all versions of {@link HFile} writers. - */ -@InterfaceAudience.Private -public abstract class AbstractHFileWriter implements HFile.Writer { - - /** The Cell previously appended. Becomes the last cell in the file.*/ - protected Cell lastCell = null; - - /** FileSystem stream to write into. */ - protected FSDataOutputStream outputStream; - - /** True if we opened the outputStream (and so will close it). */ - protected final boolean closeOutputStream; - - /** A "file info" block: a key-value map of file-wide metadata. */ - protected FileInfo fileInfo = new HFile.FileInfo(); - - /** Total # of key/value entries, i.e. how many times add() was called. */ - protected long entryCount = 0; - - /** Used for calculating the average key length. */ - protected long totalKeyLength = 0; - - /** Used for calculating the average value length. */ - protected long totalValueLength = 0; - - /** Total uncompressed bytes, maybe calculate a compression ratio later. */ - protected long totalUncompressedBytes = 0; - - /** Key comparator. Used to ensure we write in order. */ - protected final KVComparator comparator; - - /** Meta block names. */ - protected List metaNames = new ArrayList(); - - /** {@link Writable}s representing meta block data. */ - protected List metaData = new ArrayList(); - - /** - * First cell in a block. - * This reference should be short-lived since we write hfiles in a burst. - */ - protected Cell firstCellInBlock = null; - - /** May be null if we were passed a stream. */ - protected final Path path; - - - /** Cache configuration for caching data on write. */ - protected final CacheConfig cacheConf; - - /** - * Name for this object used when logging or in toString. Is either - * the result of a toString on stream or else name of passed file Path. - */ - protected final String name; - - /** - * The data block encoding which will be used. - * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding. - */ - protected final HFileDataBlockEncoder blockEncoder; - - protected final HFileContext hFileContext; - - public AbstractHFileWriter(CacheConfig cacheConf, - FSDataOutputStream outputStream, Path path, - KVComparator comparator, HFileContext fileContext) { - this.outputStream = outputStream; - this.path = path; - this.name = path != null ? 
path.getName() : outputStream.toString(); - this.hFileContext = fileContext; - DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); - if (encoding != DataBlockEncoding.NONE) { - this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); - } else { - this.blockEncoder = NoOpDataBlockEncoder.INSTANCE; - } - this.comparator = comparator != null ? comparator - : KeyValue.COMPARATOR; - - closeOutputStream = path != null; - this.cacheConf = cacheConf; - } - - /** - * Add last bits of metadata to file info before it is written out. - */ - protected void finishFileInfo() throws IOException { - if (lastCell != null) { - // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean - // byte buffer. Won't take a tuple. - byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell); - fileInfo.append(FileInfo.LASTKEY, lastKey, false); - } - - // Average key length. - int avgKeyLen = - entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount); - fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); - - // Average value length. - int avgValueLen = - entryCount == 0 ? 0 : (int) (totalValueLength / entryCount); - fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); - - fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()), - false); - } - - /** - * Add to the file info. All added key/value pairs can be obtained using - * {@link HFile.Reader#loadFileInfo()}. - * - * @param k Key - * @param v Value - * @throws IOException in case the key or the value are invalid - */ - @Override - public void appendFileInfo(final byte[] k, final byte[] v) - throws IOException { - fileInfo.append(k, v, true); - } - - /** - * Sets the file info offset in the trailer, finishes up populating fields in - * the file info, and writes the file info into the given data output. The - * reason the data output is not always {@link #outputStream} is that we store - * file info as a block in version 2. - * - * @param trailer fixed file trailer - * @param out the data output to write the file info to - * @throws IOException - */ - protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out) - throws IOException { - trailer.setFileInfoOffset(outputStream.getPos()); - finishFileInfo(); - fileInfo.write(out); - } - - /** - * Checks that the given Cell's key does not violate the key order. - * - * @param cell Cell whose key to check. - * @return true if the key is duplicate - * @throws IOException if the key or the key order is wrong - */ - protected boolean checkKey(final Cell cell) throws IOException { - boolean isDuplicateKey = false; - - if (cell == null) { - throw new IOException("Key cannot be null or empty"); - } - if (lastCell != null) { - int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell); - - if (keyComp > 0) { - throw new IOException("Added a key not lexically larger than" - + " previous. Current cell = " + cell + ", lastCell = " + lastCell); - } else if (keyComp == 0) { - isDuplicateKey = true; - } - } - return isDuplicateKey; - } - - /** Checks the given value for validity. */ - protected void checkValue(final byte[] value, final int offset, - final int length) throws IOException { - if (value == null) { - throw new IOException("Value cannot be null"); - } - } - - /** - * @return Path or null if we were passed a stream rather than a Path. - */ - @Override - public Path getPath() { - return path; - } - - @Override - public String toString() { - return "writer=" + (path != null ? 
path.toString() : null) + ", name=" - + name + ", compression=" + hFileContext.getCompression().getName(); - } - - /** - * Sets remaining trailer fields, writes the trailer to disk, and optionally - * closes the output stream. - */ - protected void finishClose(FixedFileTrailer trailer) throws IOException { - trailer.setMetaIndexCount(metaNames.size()); - trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize()); - trailer.setEntryCount(entryCount); - trailer.setCompressionCodec(hFileContext.getCompression()); - - trailer.serialize(outputStream); - - if (closeOutputStream) { - outputStream.close(); - outputStream = null; - } - } - - public static Compression.Algorithm compressionByName(String algoName) { - if (algoName == null) - return HFile.DEFAULT_COMPRESSION_ALGORITHM; - return Compression.getCompressionAlgorithmByName(algoName); - } - - /** A helper method to create HFile output streams in constructors */ - protected static FSDataOutputStream createOutputStream(Configuration conf, - FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException { - FsPermission perms = FSUtils.getFilePermissions(fs, conf, - HConstants.DATA_FILE_UMASK_KEY); - return FSUtils.create(fs, path, perms, favoredNodes); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java index 56510f0..3dcfc9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java @@ -238,7 +238,7 @@ public class FixedFileTrailer { BlockType.TRAILER.readAndCheck(inputStream); if (majorVersion > 2 - || (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) { + || (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) { deserializeFromPB(inputStream); } else { deserializeFromWritable(inputStream); @@ -611,7 +611,9 @@ public class FixedFileTrailer { } public byte[] getEncryptionKey() { - expectAtLeastMajorVersion(3); + // This is a v3 feature but if reading a v2 file the encryptionKey will just be null which + // if fine for this feature. + expectAtLeastMajorVersion(2); return encryptionKey; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 610fe7f..09233a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.Writable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -197,6 +198,8 @@ public class HFile { /** API required to write an {@link HFile} */ public interface Writer extends Closeable { + /** Max memstore (mvcc) timestamp in FileInfo */ + public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); /** Add an element to the file info map. 
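
A usage note on the Writer interface above: metadata added through appendFileInfo travels with the HFile and comes back on the read side via loadFileInfo(). A minimal sketch, assuming an HFile.Writer and HFile.Reader obtained from the factories in this file; EXAMPLE_META_KEY is a hypothetical name used only for illustration.

  byte[] key = Bytes.toBytes("EXAMPLE_META_KEY");   // hypothetical key, for illustration
  writer.appendFileInfo(key, Bytes.toBytes(12345L));
  writer.close();
  // ... later, on the read side:
  HFile.FileInfo fileInfo = reader.loadFileInfo();
  byte[] value = fileInfo.get(key);                 // null if the key was never written
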
*/ void appendFileInfo(byte[] key, byte[] value) throws IOException; @@ -294,7 +297,7 @@ public class HFile { "filesystem/path or path"); } if (path != null) { - ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes); + ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes); } return createWriter(fs, path, ostream, comparator, fileContext); @@ -333,9 +336,12 @@ public class HFile { int version = getFormatVersion(conf); switch (version) { case 2: - return new HFileWriterV2.WriterFactoryV2(conf, cacheConf); + throw new IllegalArgumentException("This should never happen. " + + "Did you change hfile.format.version to read v2? This version of the software writes v3" + + " hfiles only (but it can read v2 files without having to update hfile.format.version " + + "in hbase-site.xml)"); case 3: - return new HFileWriterV3.WriterFactoryV3(conf, cacheConf); + return new HFileWriterFactory(conf, cacheConf); default: throw new IllegalArgumentException("Cannot create writer for HFile " + "format version " + version); @@ -440,6 +446,18 @@ public class HFile { * Return the file context of the HFile this reader belongs to */ HFileContext getFileContext(); + + boolean shouldIncludeMemstoreTS(); + + boolean isDecodeMemstoreTS(); + + DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction); + + @VisibleForTesting + HFileBlock.FSReader getUncachedBlockReader(); + + @VisibleForTesting + boolean prefetchComplete(); } /** @@ -463,9 +481,10 @@ public class HFile { trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size); switch (trailer.getMajorVersion()) { case 2: - return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf); + LOG.debug("Opening HFile v2 with v3 reader"); + // Fall through. 
case 3 : - return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf); + return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf); default: throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion()); } @@ -489,6 +508,7 @@ public class HFile { * @return A version specific Hfile Reader * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException */ + @SuppressWarnings("resource") public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf) throws IOException { @@ -854,6 +874,18 @@ public class HFile { } } + + public static void checkHFileVersion(final Configuration c) { + int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION); + if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) { + throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY + + " (in your hbase-*.xml files) is " + version + " which does not match " + + MAX_FORMAT_VERSION + + "; are you running with a configuration from an older or newer hbase install (an " + + "incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?"); + } + } + public static void main(String[] args) throws Exception { // delegate to preserve old behavior HFilePrettyPrinter.main(args); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 4115941..a64bb94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1256,13 +1256,40 @@ public class HFileBlock implements Cacheable { /** Get the default decoder for blocks from this file. */ HFileBlockDecodingContext getDefaultBlockDecodingContext(); + + void setIncludesMemstoreTS(boolean includesMemstoreTS); + void setDataBlockEncoder(HFileDataBlockEncoder encoder); } /** - * A common implementation of some methods of {@link FSReader} and some - * tools for implementing HFile format version-specific block readers. + * We always prefetch the header of the next block, so that we know its + * on-disk size in advance and can read it in one operation. */ - private abstract static class AbstractFSReader implements FSReader { + private static class PrefetchedHeader { + long offset = -1; + byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; + final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); + } + + /** Reads version 2 blocks from the filesystem. */ + static class FSReaderImpl implements FSReader { + /** The file system stream of the underlying {@link HFile} that + * does or doesn't do checksum validations in the filesystem */ + protected FSDataInputStreamWrapper streamWrapper; + + private HFileBlockDecodingContext encodedBlockDecodingCtx; + + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; + + private ThreadLocal prefetchedHeaderForThread = + new ThreadLocal() { + @Override + public PrefetchedHeader initialValue() { + return new PrefetchedHeader(); + } + }; + /** Compression algorithm used by the {@link HFile} */ /** The size of the file we are reading from, or -1 if unknown. 
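
The new checkHFileVersion above bounds the configured version on both sides; since the lower and upper bound are the same constant, the condition reduces to a single inequality. A condensed sketch of the equivalent form, using the same keys and constants as HFile.java (message text shortened here):

  public static void checkHFileVersion(final Configuration c) {
    int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
    if (version != MAX_FORMAT_VERSION) {   // same as: version < MAX || version > MAX
      throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY
          + " is " + version + "; this software only writes version " + MAX_FORMAT_VERSION);
    }
  }
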
*/ @@ -1284,18 +1311,31 @@ public class HFileBlock implements Cacheable { protected HFileContext fileContext; - public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext) - throws IOException { + public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, + HFileContext fileContext) throws IOException { this.fileSize = fileSize; this.hfs = hfs; this.path = path; this.fileContext = fileContext; this.hdrSize = headerSize(fileContext.isUseHBaseChecksum()); + + this.streamWrapper = stream; + // Older versions of HBase didn't support checksum. + this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } - @Override - public BlockIterator blockRange(final long startOffset, - final long endOffset) { + /** + * A constructor that reads files with the latest minor version. + * This is used by unit tests only. + */ + FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext) + throws IOException { + this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext); + } + + public BlockIterator blockRange(final long startOffset, final long endOffset) { final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1392,56 +1432,6 @@ public class HFileBlock implements Cacheable { return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; } - } - - /** - * We always prefetch the header of the next block, so that we know its - * on-disk size in advance and can read it in one operation. - */ - private static class PrefetchedHeader { - long offset = -1; - byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; - final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); - } - - /** Reads version 2 blocks from the filesystem. */ - static class FSReaderImpl extends AbstractFSReader { - /** The file system stream of the underlying {@link HFile} that - * does or doesn't do checksum validations in the filesystem */ - protected FSDataInputStreamWrapper streamWrapper; - - private HFileBlockDecodingContext encodedBlockDecodingCtx; - - /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ - private final HFileBlockDefaultDecodingContext defaultDecodingCtx; - - private ThreadLocal prefetchedHeaderForThread = - new ThreadLocal() { - @Override - public PrefetchedHeader initialValue() { - return new PrefetchedHeader(); - } - }; - - public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, - HFileContext fileContext) throws IOException { - super(fileSize, hfs, path, fileContext); - this.streamWrapper = stream; - // Older versions of HBase didn't support checksum. - this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = defaultDecodingCtx; - } - - /** - * A constructor that reads files with the latest minor version. - * This is used by unit tests only. - */ - FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext) - throws IOException { - this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext); - } - /** * Reads a version 2 block (version 1 blocks not supported and not expected). 
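
For orientation, blockRange() is how callers walk a contiguous byte range of blocks with the merged FSReaderImpl; the reader constructor later in this patch uses exactly this shape to consume the load-on-open section. A condensed sketch with the names used in this patch:

  HFileBlock.FSReader fsBlockReader =
      new HFileBlock.FSReaderImpl(stream, fileSize, hfs, path, fileContext);
  HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
      trailer.getLoadOnOpenDataOffset(), fileSize - trailer.getTrailerSize());
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    // root data index, meta index, file info, and any remaining load-on-open blocks
  }
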
Tries to do as * little memory allocation as possible, using the provided on-disk size. @@ -1682,11 +1672,11 @@ public class HFileBlock implements Cacheable { return b; } - void setIncludesMemstoreTS(boolean includesMemstoreTS) { + public void setIncludesMemstoreTS(boolean includesMemstoreTS) { this.fileContext.setIncludesMvcc(includesMemstoreTS); } - void setDataBlockEncoder(HFileDataBlockEncoder encoder) { + public void setDataBlockEncoder(HFileDataBlockEncoder encoder) { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 77266df..5b54807 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -55,8 +55,8 @@ import org.apache.hadoop.util.StringUtils; * * Examples of how to use the block index writer can be found in * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and - * {@link HFileWriterV2}. Examples of how to use the reader can be - * found in {@link HFileReaderV2} and TestHFileBlockIndex. + * {@link HFileWriterImpl}. Examples of how to use the reader can be + * found in {@link HFileWriterImpl} and TestHFileBlockIndex. */ @InterfaceAudience.Private public class HFileBlockIndex { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java new file mode 100644 index 0000000..a007f37 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -0,0 +1,1688 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.Key; +import java.security.KeyException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Cipher; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.io.WritableUtils; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implementation that can handle all hfile versions of {@link HFile.Reader}. + */ +@InterfaceAudience.Private +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") +public class HFileReaderImpl implements HFile.Reader, Configurable { + // This class is HFileReaderV3 + HFileReaderV2 + AbstractHFileReader all squashed together into + // one file. Ditto for all the HFileReader.ScannerV? implementations. I was running up against + // the MaxInlineLevel limit because too many tiers involved reading from an hfile. Was also hard + // to navigate the source code when so many classes participating in read. + private static final Log LOG = LogFactory.getLog(HFileReaderImpl.class); + + /** Data block index reader keeping the root data index in memory */ + private HFileBlockIndex.BlockIndexReader dataBlockIndexReader; + + /** Meta block index reader -- always single level */ + private HFileBlockIndex.BlockIndexReader metaBlockIndexReader; + + private final FixedFileTrailer trailer; + + /** Filled when we read in the trailer. */ + private final Compression.Algorithm compressAlgo; + + /** + * What kind of data block encoding should be used while reading, writing, + * and handling cache. + */ + private HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE; + + /** Last key in the file. Filled in when we read in the file info */ + private byte [] lastKey = null; + + /** Average key length read from file info */ + private int avgKeyLen = -1; + + /** Average value length read from file info */ + private int avgValueLen = -1; + + /** Key comparator */ + private KVComparator comparator = new KVComparator(); + + /** Size of this file. */ + private final long fileSize; + + /** Block cache configuration. 
*/ + private final CacheConfig cacheConf; + + /** Path of file */ + private final Path path; + + /** File name to be used for block names */ + private final String name; + + private FileInfo fileInfo; + + private Configuration conf; + + private HFileContext hfileContext; + + /** Filesystem-level block reader. */ + protected HFileBlock.FSReader fsBlockReader; + + /** + * A "sparse lock" implementation allowing to lock on a particular block + * identified by offset. The purpose of this is to avoid two clients loading + * the same block, and have all but one client wait to get the block from the + * cache. + */ + private IdLock offsetLock = new IdLock(); + + /** + * Blocks read from the load-on-open section, excluding data root index, meta + * index, and file info. + */ + private List loadOnOpenBlocks = new ArrayList(); + + /** Minimum minor version supported by this HFile format */ + static final int MIN_MINOR_VERSION = 0; + + /** Maximum minor version supported by this HFile format */ + // We went to version 2 when we moved to pb'ing fileinfo and the trailer on + // the file. This version can read Writables version 1. + static final int MAX_MINOR_VERSION = 3; + + /** + * We can read files whose major version is v2 IFF their minor version is at least 3. + */ + private static final int MIN_V2_MINOR_VERSION_WITH_PB = 3; + + /** Minor versions starting with this number have faked index key */ + static final int MINOR_VERSION_WITH_FAKED_KEY = 3; + + /** + * Opens a HFile. You must load the index before you can use it by calling + * {@link #loadFileInfo()}. + * @param path + * Path to HFile. + * @param trailer + * File trailer. + * @param fsdis + * input stream. + * @param fileSize + * Length of the stream. + * @param cacheConf + * Cache configuration. + * @param hfs + * The file system. + * @param conf + * Configuration + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") + public HFileReaderImpl(final Path path, FixedFileTrailer trailer, + final FSDataInputStreamWrapper fsdis, + final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs, + final Configuration conf) + throws IOException { + this.trailer = trailer; + this.compressAlgo = trailer.getCompressionCodec(); + this.cacheConf = cacheConf; + this.fileSize = fileSize; + this.path = path; + this.name = path.getName(); + this.conf = conf; + checkFileVersion(); + this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer); + this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext); + + // Comparator class name is stored in the trailer in version 2. + comparator = trailer.createComparator(); + dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, + trailer.getNumDataIndexLevels(), this); + metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( + KeyValue.RAW_COMPARATOR, 1); + + // Parse load-on-open data. + + HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange( + trailer.getLoadOnOpenDataOffset(), + fileSize - trailer.getTrailerSize()); + + // Data index. We also read statistics about the block index written after + // the root level. + dataBlockIndexReader.readMultiLevelIndexRoot( + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), + trailer.getDataIndexCount()); + + // Meta index. 
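
The offsetLock field above is the "sparse lock" the class comment refers to: readBlock() takes a per-offset lock entry so that only one thread loads a given block while the others wait and then hit the cache. A hedged sketch of the pattern (IdLock's getLockEntry/releaseLockEntry API, with dataBlockOffset standing in for the block's file offset):

  IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
  try {
    // Re-check the block cache while holding the per-offset lock; if the block is
    // still absent, read it from the filesystem and insert it into the cache.
  } finally {
    offsetLock.releaseLockEntry(lockEntry);
  }
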
+ metaBlockIndexReader.readRootIndex( + blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), + trailer.getMetaIndexCount()); + + // File info + fileInfo = new FileInfo(); + fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); + byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS); + this.hfileContext.setFileCreateTime(creationTimeBytes == null ? + 0 : Bytes.toLong(creationTimeBytes)); + lastKey = fileInfo.get(FileInfo.LASTKEY); + avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); + avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); + includesMemstoreTS = keyValueFormatVersion != null && + Bytes.toInt(keyValueFormatVersion) == HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE; + fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS); + if (includesMemstoreTS) { + decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0; + } + + // Read data block encoding algorithm name from file info. + dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo); + fsBlockReader.setDataBlockEncoder(dataBlockEncoder); + + // Store all other load-on-open blocks for further consumption. + HFileBlock b; + while ((b = blockIter.nextBlock()) != null) { + loadOnOpenBlocks.add(b); + } + + // Prefetch file blocks upon open if requested + if (cacheConf.shouldPrefetchOnOpen()) { + PrefetchExecutor.request(path, new Runnable() { + public void run() { + try { + long offset = 0; + long end = fileSize - getTrailer().getTrailerSize(); + HFileBlock prevBlock = null; + while (offset < end) { + if (Thread.interrupted()) { + break; + } + long onDiskSize = -1; + if (prevBlock != null) { + onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); + } + HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, + null, null); + prevBlock = block; + offset += block.getOnDiskSizeWithHeader(); + } + } catch (IOException e) { + // IOExceptions are probably due to region closes (relocation, etc.) + if (LOG.isTraceEnabled()) { + LOG.trace("Exception encountered while prefetching " + path + ":", e); + } + } catch (Exception e) { + // Other exceptions are interesting + LOG.warn("Exception encountered while prefetching " + path + ":", e); + } finally { + PrefetchExecutor.complete(path); + } + } + }); + } + + byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN); + // max tag length is not present in the HFile means tags were not at all written to file. + if (tmp != null) { + hfileContext.setIncludesTags(true); + tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED); + if (tmp != null && Bytes.toBoolean(tmp)) { + hfileContext.setCompressTags(true); + } + } + } + + /** + * File version check is a little sloppy. We read v3 files but can also read v2 files if their + * content has been pb'd; files written with 0.98. + */ + private void checkFileVersion() { + int majorVersion = trailer.getMajorVersion(); + if (majorVersion == getMajorVersion()) return; + int minorVersion = trailer.getMinorVersion(); + if (majorVersion == 2 && minorVersion >= MIN_V2_MINOR_VERSION_WITH_PB) return; + // We can read v3 or v2 versions of hfile. 
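
The prefetch Runnable in the constructor above only fires when CacheConfig.shouldPrefetchOnOpen() is true. A hedged sketch of how that is typically switched on; the configuration key constant and the per-family setter are assumed to match this code base, so verify them against your version:

  // Globally, via configuration:
  conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
  // Or per column family:
  HColumnDescriptor hcd = new HColumnDescriptor("f");
  hcd.setPrefetchBlocksOnOpen(true);
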
+ throw new IllegalArgumentException("Invalid HFile version: major=" + + trailer.getMajorVersion() + ", minor=" + trailer.getMinorVersion() + ": expected at least " + + "major=2 and minor=" + MAX_MINOR_VERSION); + } + + @SuppressWarnings("serial") + public static class BlockIndexNotLoadedException extends IllegalStateException { + public BlockIndexNotLoadedException() { + // Add a message in case anyone relies on it as opposed to class name. + super("Block index not loaded"); + } + } + + private String toStringFirstKey() { + return KeyValue.keyToString(getFirstKey()); + } + + private String toStringLastKey() { + return KeyValue.keyToString(getLastKey()); + } + + @Override + public String toString() { + return "reader=" + path.toString() + + (!isFileInfoLoaded()? "": + ", compression=" + compressAlgo.getName() + + ", cacheConf=" + cacheConf + + ", firstKey=" + toStringFirstKey() + + ", lastKey=" + toStringLastKey()) + + ", avgKeyLen=" + avgKeyLen + + ", avgValueLen=" + avgValueLen + + ", entries=" + trailer.getEntryCount() + + ", length=" + fileSize; + } + + @Override + public long length() { + return fileSize; + } + + /** + * @return the first key in the file. May be null if file has no entries. Note + * that this is not the first row key, but rather the byte form of the + * first KeyValue. + */ + @Override + public byte [] getFirstKey() { + if (dataBlockIndexReader == null) { + throw new BlockIndexNotLoadedException(); + } + return dataBlockIndexReader.isEmpty() ? null + : dataBlockIndexReader.getRootBlockKey(0); + } + + /** + * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's + * patch goes in to eliminate {@link KeyValue} here. + * + * @return the first row key, or null if the file is empty. + */ + @Override + public byte[] getFirstRowKey() { + byte[] firstKey = getFirstKey(); + return firstKey == null? null: KeyValue.createKeyValueFromKey(firstKey).getRow(); + } + + /** + * TODO left from {@link HFile} version 1: move this to StoreFile after + * Ryan's patch goes in to eliminate {@link KeyValue} here. + * + * @return the last row key, or null if the file is empty. + */ + @Override + public byte[] getLastRowKey() { + byte[] lastKey = getLastKey(); + return lastKey == null? null: KeyValue.createKeyValueFromKey(lastKey).getRow(); + } + + /** @return number of KV entries in this HFile */ + @Override + public long getEntries() { + return trailer.getEntryCount(); + } + + /** @return comparator */ + @Override + public KVComparator getComparator() { + return comparator; + } + + /** @return compression algorithm */ + @Override + public Compression.Algorithm getCompressionAlgorithm() { + return compressAlgo; + } + + /** + * @return the total heap size of data and meta block indexes in bytes. Does + * not take into account non-root blocks of a multilevel data index. + */ + public long indexSize() { + return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0) + + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize() + : 0); + } + + @Override + public String getName() { + return name; + } + + @Override + public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() { + return dataBlockIndexReader; + } + + @Override + public FixedFileTrailer getTrailer() { + return trailer; + } + + @Override + public FileInfo loadFileInfo() throws IOException { + return fileInfo; + } + + /** + * An exception thrown when an operation requiring a scanner to be seeked + * is invoked on a scanner that is not seeked. 
+ */ + @SuppressWarnings("serial") + public static class NotSeekedException extends IllegalStateException { + public NotSeekedException() { + super("Not seeked to a key/value"); + } + } + + protected static class HFileScannerImpl implements HFileScanner { + private ByteBuffer blockBuffer; + protected final boolean cacheBlocks; + protected final boolean pread; + protected final boolean isCompaction; + private int currKeyLen; + private int currValueLen; + private int currMemstoreTSLen; + private long currMemstoreTS; + // Updated but never read? + protected volatile int blockFetches; + protected final HFile.Reader reader; + private int currTagsLen; + + protected HFileBlock block; + + /** + * The next indexed key is to keep track of the indexed key of the next data block. + * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the + * current data block is the last data block. + * + * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet. + */ + protected Cell nextIndexedKey; + + public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks, + final boolean pread, final boolean isCompaction) { + this.reader = reader; + this.cacheBlocks = cacheBlocks; + this.pread = pread; + this.isCompaction = isCompaction; + } + + @Override + public boolean isSeeked(){ + return blockBuffer != null; + } + + @Override + public String toString() { + return "HFileScanner for reader " + String.valueOf(getReader()); + } + + protected void assertSeeked() { + if (!isSeeked()) + throw new NotSeekedException(); + } + + @Override + public int seekTo(byte[] key) throws IOException { + return seekTo(key, 0, key.length); + } + + @Override + public boolean seekBefore(byte[] key) throws IOException { + return seekBefore(key, 0, key.length); + } + + @Override + public int reseekTo(byte[] key) throws IOException { + return reseekTo(key, 0, key.length); + } + + @Override + public HFile.Reader getReader() { + return reader; + } + + protected int getCellBufSize() { + int kvBufSize = KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; + if (this.reader.getFileContext().isIncludesTags()) { + kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen; + } + return kvBufSize; + } + + protected int getNextCellStartPosition() { + int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen + + currMemstoreTSLen; + if (this.reader.getFileContext().isIncludesTags()) { + nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen; + } + return nextKvPos; + } + + protected void readKeyValueLen() { + blockBuffer.mark(); + currKeyLen = blockBuffer.getInt(); + currValueLen = blockBuffer.getInt(); + if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() + || currValueLen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen " + + currValueLen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen); + if (this.reader.getFileContext().isIncludesTags()) { + // Read short as unsigned, high byte first + currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); + if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". 
Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, currTagsLen); + } + readMvccVersion(); + blockBuffer.reset(); + } + + /** + * Within a loaded block, seek looking for the last key that is smaller than + * (or equal to?) the key we are interested in. + * A note on the seekBefore: if you have seekBefore = true, AND the first + * key in the block = key, then you'll get thrown exceptions. The caller has + * to check for that case and load the previous block as appropriate. + * @param key + * the key to find + * @param seekBefore + * find the key before the given key in case of exact match. + * @return 0 in case of an exact key match, 1 in case of an inexact match, + * -2 in case of an inexact match and furthermore, the input key + * less than the first key of current block(e.g. using a faked index + * key) + */ + protected int blockSeek(Cell key, boolean seekBefore) { + int klen, vlen, tlen = 0; + long memstoreTS = 0; + int memstoreTSLen = 0; + int lastKeyValueSize = -1; + KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue(); + do { + blockBuffer.mark(); + klen = blockBuffer.getInt(); + vlen = blockBuffer.getInt(); + if (klen < 0 || vlen < 0 || klen > blockBuffer.limit() + || vlen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid klen " + klen + " or vlen " + + vlen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, klen + vlen); + if (this.reader.getFileContext().isIncludesTags()) { + // Read short as unsigned, high byte first + tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); + if (tlen < 0 || tlen > blockBuffer.limit()) { + throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " + + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)."); + } + ByteBufferUtils.skip(blockBuffer, tlen); + } + if (this.reader.shouldIncludeMemstoreTS()) { + if (this.reader.isDecodeMemstoreTS()) { + try { + memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position()); + memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstore timestamp", e); + } + } else { + memstoreTS = 0; + memstoreTSLen = 1; + } + } + blockBuffer.reset(); + int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2); + keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen); + int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv); + + if (comp == 0) { + if (seekBefore) { + if (lastKeyValueSize < 0) { + throw new IllegalStateException("blockSeek with seekBefore " + + "at the first key of the block: key=" + + CellUtil.getCellKeyAsString(key) + + ", blockOffset=" + block.getOffset() + ", onDiskSize=" + + block.getOnDiskSizeWithHeader()); + } + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + return 1; // non exact match. 
+ } + currKeyLen = klen; + currValueLen = vlen; + currTagsLen = tlen; + if (this.reader.shouldIncludeMemstoreTS()) { + currMemstoreTS = memstoreTS; + currMemstoreTSLen = memstoreTSLen; + } + return 0; // indicate exact match + } else if (comp < 0) { + if (lastKeyValueSize > 0) + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { + return HConstants.INDEX_KEY_MAGIC; + } + return 1; + } + + // The size of this key/value tuple, including key/value length fields. + lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; + // include tag length also if tags included with KV + if (this.reader.getFileContext().isIncludesTags()) { + lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; + } + blockBuffer.position(blockBuffer.position() + lastKeyValueSize); + } while (blockBuffer.remaining() > 0); + + // Seek to the last key we successfully read. This will happen if this is + // the last key/value pair in the file, in which case the following call + // to next() has to return false. + blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + readKeyValueLen(); + return 1; // didn't exactly find it. + } + + @Override + public Cell getNextIndexedKey() { + return nextIndexedKey; + } + + @Override + public int seekTo(byte[] key, int offset, int length) throws IOException { + // Always rewind to the first key of the block, because the given key + // might be before or after the current key. + return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public int reseekTo(byte[] key, int offset, int length) throws IOException { + return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public int seekTo(Cell key) throws IOException { + return seekTo(key, true); + } + + @Override + public int reseekTo(Cell key) throws IOException { + int compared; + if (isSeeked()) { + compared = compareKey(reader.getComparator(), key); + if (compared < 1) { + // If the required key is less than or equal to current key, then + // don't do anything. + return compared; + } else { + // The comparison with no_next_index_key has to be checked + if (this.nextIndexedKey != null && + (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader + .getComparator().compareOnlyKeyPortion(key, nextIndexedKey) < 0)) { + // The reader shall continue to scan the current data block instead + // of querying the + // block index as long as it knows the target key is strictly + // smaller than + // the next indexed key or the current data block is the last data + // block. + return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); + } + } + } + // Don't rewind on a reseek operation, because reseek implies that we are + // always going forward in the file. + return seekTo(key, false); + } + + /** + * An internal API function. Seek to the given key, optionally rewinding to + * the first key of the block before doing the seek. + * + * @param key - a cell representing the key that we need to fetch + * @param rewind whether to rewind to the first key of the block before + * doing the seek. If this is false, we are assuming we never go + * back, otherwise the result is undefined. 
+ * @return -1 if the key is earlier than the first key of the file, + * 0 if we are at the given key, 1 if we are past the given key + * -2 if the key is earlier than the first key of the file while + * using a faked index key + * @throws IOException + */ + public int seekTo(Cell key, boolean rewind) throws IOException { + HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); + BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, + cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); + if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { + // This happens if the key e.g. falls before the beginning of the file. + return -1; + } + return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), + blockWithScanInfo.getNextIndexedKey(), rewind, key, false); + } + + @Override + public boolean seekBefore(byte[] key, int offset, int length) throws IOException { + return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length)); + } + + @Override + public boolean seekBefore(Cell key) throws IOException { + HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, + cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction)); + if (seekToBlock == null) { + return false; + } + ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); + + if (reader.getComparator() + .compareOnlyKeyPortion( + new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(), + firstKey.limit()), key) >= 0) { + long previousBlockOffset = seekToBlock.getPrevBlockOffset(); + // The key we are interested in + if (previousBlockOffset == -1) { + // we have a 'problem', the key we want is the first of the file. + return false; + } + + // It is important that we compute and pass onDiskSize to the block + // reader so that it does not have to read the header separately to + // figure out the size. + seekToBlock = reader.readBlock(previousBlockOffset, + seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, + pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + // TODO shortcut: seek forward in this block to the last key of the + // block. + } + Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey)); + loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true); + return true; + } + + /** + * Scans blocks in the "scanned" section of the {@link HFile} until the next + * data block is found. + * + * @return the next block, or null if there are no more data blocks + * @throws IOException + */ + protected HFileBlock readNextDataBlock() throws IOException { + long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); + if (block == null) + return null; + + HFileBlock curBlock = block; + + do { + if (curBlock.getOffset() >= lastDataBlockOffset) + return null; + + if (curBlock.getOffset() < 0) { + throw new IOException("Invalid block file offset: " + block); + } + + // We are reading the next block without block type validation, because + // it might turn out to be a non-data block. 
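The @return contract documented above (-1 when the key sorts before the file, 0 on an exact match, 1 on an inexact match, and HConstants.INDEX_KEY_MAGIC, i.e. -2, when a faked index key was used) is what callers build "seek at or after" logic on. A minimal caller-side sketch in that spirit, assuming an already-opened scanner; the class and helper names are invented for illustration:

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public final class SeekSketch {
      /** Position scanner on the first cell at or after key; false if no such cell exists. */
      static boolean seekAtOrAfter(HFileScanner scanner, Cell key) throws IOException {
        int result = scanner.seekTo(key);
        if (result == 0) {
          return true; // exact match; scanner is on the requested key
        }
        if (result == HConstants.INDEX_KEY_MAGIC) {
          // Seek used a faked index key; the current cell already sorts at or after the key.
          return true;
        }
        if (result < 0) {
          // Requested key sorts before the first key of the file; start from the beginning.
          return scanner.seekTo();
        }
        // Inexact match: the scanner sits on the cell preceding the requested key; advance once.
        return scanner.next();
      }
    }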
+ curBlock = reader.readBlock(curBlock.getOffset() + + curBlock.getOnDiskSizeWithHeader(), + curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, + isCompaction, true, null, getEffectiveDataBlockEncoding()); + } while (!curBlock.getBlockType().isData()); + + return curBlock; + } + + public DataBlockEncoding getEffectiveDataBlockEncoding() { + return this.reader.getEffectiveEncodingInCache(isCompaction); + } + + @Override + public Cell getKeyValue() { + if (!isSeeked()) + return null; + + KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + if (this.reader.shouldIncludeMemstoreTS()) { + ret.setSequenceId(currMemstoreTS); + } + return ret; + } + + @Override + public ByteBuffer getKey() { + assertSeeked(); + return ByteBuffer.wrap( + blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE, currKeyLen).slice(); + } + + public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { + return comparator.compareFlatKey(key, offset, length, blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen); + } + + @Override + public ByteBuffer getValue() { + assertSeeked(); + return ByteBuffer.wrap( + blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice(); + } + + protected void setNonSeekedState() { + block = null; + blockBuffer = null; + currKeyLen = 0; + currValueLen = 0; + currMemstoreTS = 0; + currMemstoreTSLen = 0; + currTagsLen = 0; + } + + /** + * Go to the next key/value in the block section. Loads the next block if + * necessary. If successful, {@link #getKey()} and {@link #getValue()} can + * be called. + * + * @return true if successfully navigated to the next key/value + */ + @Override + public boolean next() throws IOException { + assertSeeked(); + + try { + blockBuffer.position(getNextCellStartPosition()); + } catch (IllegalArgumentException e) { + LOG.error("Current pos = " + blockBuffer.position() + + "; currKeyLen = " + currKeyLen + "; currValLen = " + + currValueLen + "; block limit = " + blockBuffer.limit() + + "; HFile name = " + reader.getName() + + "; currBlock currBlockOffset = " + block.getOffset()); + throw e; + } + + if (blockBuffer.remaining() <= 0) { + long lastDataBlockOffset = + reader.getTrailer().getLastDataBlockOffset(); + + if (block.getOffset() >= lastDataBlockOffset) { + setNonSeekedState(); + return false; + } + + // read the next block + HFileBlock nextBlock = readNextDataBlock(); + if (nextBlock == null) { + setNonSeekedState(); + return false; + } + + updateCurrBlock(nextBlock); + return true; + } + + // We are still in the same block. + readKeyValueLen(); + return true; + } + + /** + * Positions this scanner at the start of the file. + * + * @return false if empty file; i.e. a call to next would return false and + * the current key and value are undefined. + * @throws IOException + */ + @Override + public boolean seekTo() throws IOException { + if (reader == null) { + return false; + } + + if (reader.getTrailer().getEntryCount() == 0) { + // No data blocks. 
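Putting the scanner methods above together, a minimal sketch of a whole-file scan through the public API; the dump class below is invented for illustration and assumes the reader was opened elsewhere. cacheBlocks=false and pread=false fit a one-off sequential pass, and the three-argument getScanner overload later in this file adds an isCompaction flag for compaction scans.

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public final class ScanSketch {
      /** Print every key/value in the file; returns the number of cells seen. */
      static long dump(HFile.Reader reader) throws IOException {
        HFileScanner scanner = reader.getScanner(false, false); // no block caching, seek+read
        if (!scanner.seekTo()) {
          return 0L; // empty file: the trailer reports no entries, so seekTo() returns false
        }
        long count = 0;
        do {
          Cell cell = scanner.getKeyValue(); // current cell; sequence id is set when MVCC info is stored
          System.out.println(scanner.getKeyString() + " => " + scanner.getValueString());
          count++;
        } while (scanner.next());
        return count;
      }
    }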
+ return false; + } + + long firstDataBlockOffset = + reader.getTrailer().getFirstDataBlockOffset(); + if (block != null && block.getOffset() == firstDataBlockOffset) { + blockBuffer.rewind(); + readKeyValueLen(); + return true; + } + + block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + if (block.getOffset() < 0) { + throw new IOException("Invalid block offset: " + block.getOffset()); + } + updateCurrBlock(block); + return true; + } + + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, + boolean rewind, Cell key, boolean seekBefore) throws IOException { + if (block == null || block.getOffset() != seekToBlock.getOffset()) { + updateCurrBlock(seekToBlock); + } else if (rewind) { + blockBuffer.rewind(); + } + + // Update the nextIndexedKey + this.nextIndexedKey = nextIndexedKey; + return blockSeek(key, seekBefore); + } + + /** + * Updates the current block to be the given {@link HFileBlock}. Seeks to + * the the first key/value pair. + * + * @param newBlock the block to make current + */ + protected void updateCurrBlock(HFileBlock newBlock) { + block = newBlock; + + // sanity check + if (block.getBlockType() != BlockType.DATA) { + throw new IllegalStateException("Scanner works only on data " + + "blocks, got " + block.getBlockType() + "; " + + "fileName=" + reader.getName() + ", " + + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + + "isCompaction=" + isCompaction); + } + + blockBuffer = block.getBufferWithoutHeader(); + readKeyValueLen(); + blockFetches++; + + // Reset the next indexed key + this.nextIndexedKey = null; + } + + protected void readMvccVersion() { + if (this.reader.shouldIncludeMemstoreTS()) { + if (this.reader.isDecodeMemstoreTS()) { + try { + currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position()); + currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); + } catch (Exception e) { + throw new RuntimeException("Error reading memstore timestamp", e); + } + } else { + currMemstoreTS = 0; + currMemstoreTSLen = 1; + } + } + } + + protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { + ByteBuffer buffer = curBlock.getBufferWithoutHeader(); + // It is safe to manipulate this buffer because we own the buffer object. 
+ buffer.rewind(); + int klen = buffer.getInt(); + buffer.getInt(); + ByteBuffer keyBuff = buffer.slice(); + keyBuff.limit(klen); + keyBuff.rewind(); + return keyBuff; + } + + @Override + public String getKeyString() { + return Bytes.toStringBinary(blockBuffer.array(), + blockBuffer.arrayOffset() + blockBuffer.position() + + KEY_VALUE_LEN_SIZE, currKeyLen); + } + + @Override + public String getValueString() { + return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen, + currValueLen); + } + + public int compareKey(KVComparator comparator, Cell key) { + return comparator.compareOnlyKeyPortion( + key, + new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); + } + } + + public Path getPath() { + return path; + } + + @Override + public DataBlockEncoding getDataBlockEncoding() { + return dataBlockEncoder.getDataBlockEncoding(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** Minor versions in HFile starting with this number have hbase checksums */ + public static final int MINOR_VERSION_WITH_CHECKSUM = 1; + /** In HFile minor version that does not support checksums */ + public static final int MINOR_VERSION_NO_CHECKSUM = 0; + + /** HFile minor version that introduced pbuf filetrailer */ + public static final int PBUF_TRAILER_MINOR_VERSION = 2; + + /** + * The size of a (key length, value length) tuple that prefixes each entry in + * a data block. + */ + public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; + + protected boolean includesMemstoreTS = false; + protected boolean decodeMemstoreTS = false; + + + public boolean isDecodeMemstoreTS() { + return this.decodeMemstoreTS; + } + + public boolean shouldIncludeMemstoreTS() { + return includesMemstoreTS; + } + + /** + * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} + * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. + */ + private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, + boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) throws IOException { + // Check cache for block. If found return. + if (cacheConf.isBlockCacheEnabled()) { + BlockCache cache = cacheConf.getBlockCache(); + HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, + updateCacheMetrics); + if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } + validateBlockType(cachedBlock, expectedBlockType); + + if (expectedDataBlockEncoding == null) { + return cachedBlock; + } + DataBlockEncoding actualDataBlockEncoding = + cachedBlock.getDataBlockEncoding(); + // Block types other than data blocks always have + // DataBlockEncoding.NONE. To avoid false negative cache misses, only + // perform this check if cached block is a data block. + if (cachedBlock.getBlockType().isData() && + !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) { + // This mismatch may happen if a Scanner, which is used for say a + // compaction, tries to read an encoded block from the block cache. 
+ // The reverse might happen when an EncodedScanner tries to read + // un-encoded blocks which were cached earlier. + // + // Because returning a data block with an implicit BlockType mismatch + // will cause the requesting scanner to throw a disk read should be + // forced here. This will potentially cause a significant number of + // cache misses, so update so we should keep track of this as it might + // justify the work on a CompoundScanner. + if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) && + !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) { + // If the block is encoded but the encoding does not match the + // expected encoding it is likely the encoding was changed but the + // block was not yet evicted. Evictions on file close happen async + // so blocks with the old encoding still linger in cache for some + // period of time. This event should be rare as it only happens on + // schema definition change. + LOG.info("Evicting cached block with key " + cacheKey + + " because of a data block encoding mismatch" + + "; expected: " + expectedDataBlockEncoding + + ", actual: " + actualDataBlockEncoding); + cache.evictBlock(cacheKey); + } + return null; + } + return cachedBlock; + } + } + return null; + } + + /** + * @param metaBlockName + * @param cacheBlock Add block to cache, if found + * @return block wrapped in a ByteBuffer, with header skipped + * @throws IOException + */ + @Override + public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + throws IOException { + if (trailer.getMetaIndexCount() == 0) { + return null; // there are no meta blocks + } + if (metaBlockIndexReader == null) { + throw new IOException("Meta index not loaded"); + } + + byte[] mbname = Bytes.toBytes(metaBlockName); + int block = metaBlockIndexReader.rootBlockContainingKey(mbname, + 0, mbname.length); + if (block == -1) + return null; + long blockSize = metaBlockIndexReader.getRootBlockDataSize(block); + + // Per meta key from any given file, synchronize reads for said block. This + // is OK to do for meta blocks because the meta block index is always + // single-level. + synchronized (metaBlockIndexReader.getRootBlockKey(block)) { + // Check cache for block. If found return. + long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block); + BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset); + + cacheBlock &= cacheConf.shouldCacheDataOnRead(); + if (cacheConf.isBlockCacheEnabled()) { + HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, + BlockType.META, null); + if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; + // Return a distinct 'shallow copy' of the block, + // so pos does not get messed by the scanner + return cachedBlock.getBufferWithoutHeader(); + } + // Cache Miss, please load. 
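For context on the method above: getMetaBlock() looks up a named block in the single-level meta index and returns its payload with the header skipped, or null when the file has no meta blocks or none under that name. A small sketch of caller-side use; the wrapper class and the choice to copy the bytes out are illustrative assumptions, not anything this patch prescribes:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class MetaBlockSketch {
      static byte[] readMeta(HFile.Reader reader, String name) throws IOException {
        // Second argument: add the block to the cache if it had to be loaded (subject to cache config).
        ByteBuffer buf = reader.getMetaBlock(name, false);
        if (buf == null) {
          return null; // no meta blocks in this file, or none under this name
        }
        // Copy out of the (possibly cached) block buffer rather than holding a reference to it.
        return Bytes.toBytes(buf);
      }
    }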
+ } + + HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, + blockSize, -1, true).unpack(hfileContext, fsBlockReader); + + // Cache the block + if (cacheBlock) { + cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); + } + + return metaBlock.getBufferWithoutHeader(); + } + } + + @Override + public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, + final boolean cacheBlock, boolean pread, final boolean isCompaction, + boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) + throws IOException { + if (dataBlockIndexReader == null) { + throw new IOException("Block index not loaded"); + } + if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) { + throw new IOException("Requested block is out of range: " + dataBlockOffset + + ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset()); + } + // For any given block from any given file, synchronize reads for said + // block. + // Without a cache, this synchronizing is needless overhead, but really + // the other choice is to duplicate work (which the cache would prevent you + // from doing). + + BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); + + boolean useLock = false; + IdLock.Entry lockEntry = null; + TraceScope traceScope = Trace.startSpan("HFileReaderImpl.readBlock"); + try { + while (true) { + if (useLock) { + lockEntry = offsetLock.getLockEntry(dataBlockOffset); + } + + // Check cache for block. If found return. + if (cacheConf.isBlockCacheEnabled()) { + // Try and get the block from the block cache. If the useLock variable is true then this + // is the second time through the loop and it should not be counted as a block cache miss. + HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, + updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); + if (cachedBlock != null) { + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation("blockCacheHit"); + } + assert cachedBlock.isUnpacked() : "Packed block leak."; + if (cachedBlock.getBlockType().isData()) { + if (updateCacheMetrics) { + HFile.dataBlockReadCnt.incrementAndGet(); + } + // Validate encoding type for data blocks. We include encoding + // type in the cache key, and we expect it to match on a cache hit. + if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { + throw new IOException("Cached block under key " + cacheKey + " " + + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " + + dataBlockEncoder.getDataBlockEncoding() + ")"); + } + } + // Cache-hit. Return! + return cachedBlock; + } + // Carry on, please load. + } + if (!useLock) { + // check cache again with lock + useLock = true; + continue; + } + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); + } + // Load block from filesystem. + HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, + pread); + validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); + + // Cache the block if necessary + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? 
hfileBlock : unpacked, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); + } + + if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { + HFile.dataBlockReadCnt.incrementAndGet(); + } + + return unpacked; + } + } finally { + traceScope.close(); + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + } + + @Override + public boolean hasMVCCInfo() { + return includesMemstoreTS && decodeMemstoreTS; + } + + /** + * Compares the actual type of a block retrieved from cache or disk with its + * expected type and throws an exception in case of a mismatch. Expected + * block type of {@link BlockType#DATA} is considered to match the actual + * block type [@link {@link BlockType#ENCODED_DATA} as well. + * @param block a block retrieved from cache or disk + * @param expectedBlockType the expected block type, or null to skip the + * check + */ + private void validateBlockType(HFileBlock block, + BlockType expectedBlockType) throws IOException { + if (expectedBlockType == null) { + return; + } + BlockType actualBlockType = block.getBlockType(); + if (expectedBlockType.isData() && actualBlockType.isData()) { + // We consider DATA to match ENCODED_DATA for the purpose of this + // verification. + return; + } + if (actualBlockType != expectedBlockType) { + throw new IOException("Expected block type " + expectedBlockType + ", " + + "but got " + actualBlockType + ": " + block); + } + } + + /** + * @return Last key in the file. May be null if file has no entries. Note that + * this is not the last row key, but rather the byte form of the last + * KeyValue. + */ + @Override + public byte[] getLastKey() { + return dataBlockIndexReader.isEmpty() ? null : lastKey; + } + + /** + * @return Midkey for this file. We work with block boundaries only so + * returned midkey is an approximation only. + * @throws IOException + */ + @Override + public byte[] midkey() throws IOException { + return dataBlockIndexReader.midkey(); + } + + @Override + public void close() throws IOException { + close(cacheConf.shouldEvictOnClose()); + } + + public void close(boolean evictOnClose) throws IOException { + PrefetchExecutor.cancel(path); + if (evictOnClose && cacheConf.isBlockCacheEnabled()) { + int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); + if (LOG.isTraceEnabled()) { + LOG.trace("On close, file=" + name + " evicted=" + numEvicted + + " block(s)"); + } + } + fsBlockReader.closeStreams(); + } + + public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) { + return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction); + } + + /** For testing */ + public HFileBlock.FSReader getUncachedBlockReader() { + return fsBlockReader; + } + + /** + * Scanner that operates on encoded data blocks. 
+ */ + protected static class EncodedScanner extends HFileScannerImpl { + private final HFileBlockDecodingContext decodingCtx; + private final DataBlockEncoder.EncodedSeeker seeker; + private final DataBlockEncoder dataBlockEncoder; + protected final HFileContext meta; + + public EncodedScanner(HFile.Reader reader, boolean cacheBlocks, + boolean pread, boolean isCompaction, HFileContext meta) { + super(reader, cacheBlocks, pread, isCompaction); + DataBlockEncoding encoding = reader.getDataBlockEncoding(); + dataBlockEncoder = encoding.getEncoder(); + decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta); + seeker = dataBlockEncoder.createSeeker( + reader.getComparator(), decodingCtx); + this.meta = meta; + } + + @Override + public boolean isSeeked(){ + return this.block != null; + } + + /** + * Updates the current block to be the given {@link HFileBlock}. Seeks to + * the the first key/value pair. + * + * @param newBlock the block to make current + * @throws CorruptHFileException + */ + private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { + block = newBlock; + + // sanity checks + if (block.getBlockType() != BlockType.ENCODED_DATA) { + throw new IllegalStateException( + "EncodedScanner works only on encoded data blocks"); + } + short dataBlockEncoderId = block.getDataBlockEncodingId(); + if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { + String encoderCls = dataBlockEncoder.getClass().getName(); + throw new CorruptHFileException("Encoder " + encoderCls + + " doesn't support data block encoding " + + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); + } + + seeker.setCurrentBuffer(getEncodedBuffer(newBlock)); + blockFetches++; + + // Reset the next indexed key + this.nextIndexedKey = null; + } + + private ByteBuffer getEncodedBuffer(HFileBlock newBlock) { + ByteBuffer origBlock = newBlock.getBufferReadOnly(); + ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(), + origBlock.arrayOffset() + newBlock.headerSize() + + DataBlockEncoding.ID_SIZE, + newBlock.getUncompressedSizeWithoutHeader() - + DataBlockEncoding.ID_SIZE).slice(); + return encodedBlock; + } + + @Override + public boolean seekTo() throws IOException { + if (reader == null) { + return false; + } + + if (reader.getTrailer().getEntryCount() == 0) { + // No data blocks. 
+ return false; + } + + long firstDataBlockOffset = + reader.getTrailer().getFirstDataBlockOffset(); + if (block != null && block.getOffset() == firstDataBlockOffset) { + seeker.rewind(); + return true; + } + + block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); + if (block.getOffset() < 0) { + throw new IOException("Invalid block offset: " + block.getOffset()); + } + updateCurrentBlock(block); + return true; + } + + @Override + public boolean next() throws IOException { + boolean isValid = seeker.next(); + if (!isValid) { + block = readNextDataBlock(); + isValid = block != null; + if (isValid) { + updateCurrentBlock(block); + } + } + return isValid; + } + + @Override + public ByteBuffer getKey() { + assertValidSeek(); + return seeker.getKeyDeepCopy(); + } + + public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { + return seeker.compareKey(comparator, key, offset, length); + } + + @Override + public ByteBuffer getValue() { + assertValidSeek(); + return seeker.getValueShallowCopy(); + } + + @Override + public Cell getKeyValue() { + if (block == null) { + return null; + } + return seeker.getKeyValue(); + } + + @Override + public String getKeyString() { + ByteBuffer keyBuffer = getKey(); + return Bytes.toStringBinary(keyBuffer.array(), + keyBuffer.arrayOffset(), keyBuffer.limit()); + } + + @Override + public String getValueString() { + ByteBuffer valueBuffer = getValue(); + return Bytes.toStringBinary(valueBuffer.array(), + valueBuffer.arrayOffset(), valueBuffer.limit()); + } + + private void assertValidSeek() { + if (block == null) { + throw new NotSeekedException(); + } + } + + protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { + return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock)); + } + + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, + boolean rewind, Cell key, boolean seekBefore) throws IOException { + if (block == null || block.getOffset() != seekToBlock.getOffset()) { + updateCurrentBlock(seekToBlock); + } else if (rewind) { + seeker.rewind(); + } + this.nextIndexedKey = nextIndexedKey; + return seeker.seekToKeyInBlock(key, seekBefore); + } + + public int compareKey(KVComparator comparator, Cell key) { + return seeker.compareKey(comparator, key); + } + } + + /** + * Returns a buffer with the Bloom filter metadata. The caller takes + * ownership of the buffer. + */ + @Override + public DataInput getGeneralBloomFilterMetadata() throws IOException { + return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META); + } + + @Override + public DataInput getDeleteBloomFilterMetadata() throws IOException { + return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META); + } + + private DataInput getBloomFilterMetadata(BlockType blockType) + throws IOException { + if (blockType != BlockType.GENERAL_BLOOM_META && + blockType != BlockType.DELETE_FAMILY_BLOOM_META) { + throw new RuntimeException("Block Type: " + blockType.toString() + + " is not supported") ; + } + + for (HFileBlock b : loadOnOpenBlocks) + if (b.getBlockType() == blockType) + return b.getByteStream(); + return null; + } + + public boolean isFileInfoLoaded() { + return true; // We load file info in constructor in version 2. + } + + /** + * Validates that the minor version is within acceptable limits. 
+ * Otherwise throws an Runtime exception + */ + private void validateMinorVersion(Path path, int minorVersion) { + if (minorVersion < MIN_MINOR_VERSION || + minorVersion > MAX_MINOR_VERSION) { + String msg = "Minor version for path " + path + + " is expected to be between " + + MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION + + " but is found to be " + minorVersion; + LOG.error(msg); + throw new RuntimeException(msg); + } + } + + @Override + public HFileContext getFileContext() { + return hfileContext; + } + + /** + * Returns false if block prefetching was requested for this file and has + * not completed, true otherwise + */ + @VisibleForTesting + public boolean prefetchComplete() { + return PrefetchExecutor.isCompleted(path); + } + + protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize, + HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException { + HFileContextBuilder builder = new HFileContextBuilder() + .withIncludesMvcc(this.includesMemstoreTS) + .withHBaseCheckSum(true) + .withCompression(this.compressAlgo); + + // Check for any key material available + byte[] keyBytes = trailer.getEncryptionKey(); + if (keyBytes != null) { + Encryption.Context cryptoContext = Encryption.newContext(conf); + Key key; + String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, + User.getCurrent().getShortName()); + try { + // First try the master key + key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes); + } catch (KeyException e) { + // If the current master key fails to unwrap, try the alternate, if + // one is configured + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'"); + } + String alternateKeyName = + conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY); + if (alternateKeyName != null) { + try { + key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes); + } catch (KeyException ex) { + throw new IOException(ex); + } + } else { + throw new IOException(e); + } + } + // Use the algorithm the key wants + Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm()); + if (cipher == null) { + throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available"); + } + cryptoContext.setCipher(cipher); + cryptoContext.setKey(key); + builder.withEncryptionContext(cryptoContext); + } + + HFileContext context = builder.build(); + + if (LOG.isTraceEnabled()) { + LOG.trace("Reader" + (path != null ? " for " + path : "" ) + + " initialized with cacheConf: " + cacheConf + + " comparator: " + comparator.getClass().getSimpleName() + + " fileContext: " + context); + } + + return context; + } + + /** + * Create a Scanner on this file. No seeks or reads are done on creation. Call + * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is + * nothing to clean up in a Scanner. Letting go of your references to the + * scanner is sufficient. NOTE: Do not use this overload of getScanner for + * compactions. See {@link #getScanner(boolean, boolean, boolean)} + * + * @param cacheBlocks True if we should cache blocks read in by this scanner. + * @param pread Use positional read rather than seek+read if true (pread is + * better for random reads, seek+read is better scanning). + * @return Scanner on this file. + */ + @Override + public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { + return getScanner(cacheBlocks, pread, false); + } + + /** + * Create a Scanner on this file. No seeks or reads are done on creation. 
Call + * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is + * nothing to clean up in a Scanner. Letting go of your references to the + * scanner is sufficient. + * @param cacheBlocks + * True if we should cache blocks read in by this scanner. + * @param pread + * Use positional read rather than seek+read if true (pread is better + * for random reads, seek+read is better scanning). + * @param isCompaction + * is scanner being used for a compaction? + * @return Scanner on this file. + */ + @Override + public HFileScanner getScanner(boolean cacheBlocks, final boolean pread, + final boolean isCompaction) { + if (dataBlockEncoder.useEncodedScanner()) { + return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext); + } + return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction); + } + + public int getMajorVersion() { + return 3; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java deleted file mode 100644 index c0e3e91..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ /dev/null @@ -1,1323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.IdLock; -import org.apache.hadoop.io.WritableUtils; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.annotations.VisibleForTesting; - -/** - * {@link HFile} reader for version 2. 
- */ -@InterfaceAudience.Private -public class HFileReaderV2 extends AbstractHFileReader { - - private static final Log LOG = LogFactory.getLog(HFileReaderV2.class); - - /** Minor versions in HFile V2 starting with this number have hbase checksums */ - public static final int MINOR_VERSION_WITH_CHECKSUM = 1; - /** In HFile V2 minor version that does not support checksums */ - public static final int MINOR_VERSION_NO_CHECKSUM = 0; - - /** HFile minor version that introduced pbuf filetrailer */ - public static final int PBUF_TRAILER_MINOR_VERSION = 2; - - /** - * The size of a (key length, value length) tuple that prefixes each entry in - * a data block. - */ - public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT; - - protected boolean includesMemstoreTS = false; - protected boolean decodeMemstoreTS = false; - protected boolean shouldIncludeMemstoreTS() { - return includesMemstoreTS; - } - - /** Filesystem-level block reader. */ - protected HFileBlock.FSReader fsBlockReader; - - /** - * A "sparse lock" implementation allowing to lock on a particular block - * identified by offset. The purpose of this is to avoid two clients loading - * the same block, and have all but one client wait to get the block from the - * cache. - */ - private IdLock offsetLock = new IdLock(); - - /** - * Blocks read from the load-on-open section, excluding data root index, meta - * index, and file info. - */ - private List loadOnOpenBlocks = new ArrayList(); - - /** Minimum minor version supported by this HFile format */ - static final int MIN_MINOR_VERSION = 0; - - /** Maximum minor version supported by this HFile format */ - // We went to version 2 when we moved to pb'ing fileinfo and the trailer on - // the file. This version can read Writables version 1. - static final int MAX_MINOR_VERSION = 3; - - /** Minor versions starting with this number have faked index key */ - static final int MINOR_VERSION_WITH_FAKED_KEY = 3; - - protected HFileContext hfileContext; - - /** - * Opens a HFile. You must load the index before you can use it by calling - * {@link #loadFileInfo()}. - * - * @param path Path to HFile. - * @param trailer File trailer. - * @param fsdis input stream. - * @param size Length of the stream. - * @param cacheConf Cache configuration. - * @param hfs - * @param conf - */ - public HFileReaderV2(final Path path, final FixedFileTrailer trailer, - final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf, - final HFileSystem hfs, final Configuration conf) throws IOException { - super(path, trailer, size, cacheConf, hfs, conf); - this.conf = conf; - trailer.expectMajorVersion(getMajorVersion()); - validateMinorVersion(path, trailer.getMinorVersion()); - this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer); - HFileBlock.FSReaderImpl fsBlockReaderV2 = - new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext); - this.fsBlockReader = fsBlockReaderV2; // upcast - - // Comparator class name is stored in the trailer in version 2. - comparator = trailer.createComparator(); - dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator, - trailer.getNumDataIndexLevels(), this); - metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader( - KeyValue.RAW_COMPARATOR, 1); - - // Parse load-on-open data. - - HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange( - trailer.getLoadOnOpenDataOffset(), - fileSize - trailer.getTrailerSize()); - - // Data index. 
We also read statistics about the block index written after - // the root level. - dataBlockIndexReader.readMultiLevelIndexRoot( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), - trailer.getDataIndexCount()); - - // Meta index. - metaBlockIndexReader.readRootIndex( - blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), - trailer.getMetaIndexCount()); - - // File info - fileInfo = new FileInfo(); - fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); - byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS); - this.hfileContext.setFileCreateTime(creationTimeBytes == null ? 0 : Bytes.toLong(creationTimeBytes)); - lastKey = fileInfo.get(FileInfo.LASTKEY); - avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); - avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); - byte [] keyValueFormatVersion = - fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION); - includesMemstoreTS = keyValueFormatVersion != null && - Bytes.toInt(keyValueFormatVersion) == - HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE; - fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS); - if (includesMemstoreTS) { - decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0; - } - - // Read data block encoding algorithm name from file info. - dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo); - fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder); - - // Store all other load-on-open blocks for further consumption. - HFileBlock b; - while ((b = blockIter.nextBlock()) != null) { - loadOnOpenBlocks.add(b); - } - - // Prefetch file blocks upon open if requested - if (cacheConf.shouldPrefetchOnOpen()) { - PrefetchExecutor.request(path, new Runnable() { - public void run() { - try { - long offset = 0; - long end = fileSize - getTrailer().getTrailerSize(); - HFileBlock prevBlock = null; - while (offset < end) { - if (Thread.interrupted()) { - break; - } - long onDiskSize = -1; - if (prevBlock != null) { - onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); - } - HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, - null, null); - prevBlock = block; - offset += block.getOnDiskSizeWithHeader(); - } - } catch (IOException e) { - // IOExceptions are probably due to region closes (relocation, etc.) - if (LOG.isTraceEnabled()) { - LOG.trace("Exception encountered while prefetching " + path + ":", e); - } - } catch (Exception e) { - // Other exceptions are interesting - LOG.warn("Exception encountered while prefetching " + path + ":", e); - } finally { - PrefetchExecutor.complete(path); - } - } - }); - } - } - - protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize, - HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException { - return new HFileContextBuilder() - .withIncludesMvcc(this.includesMemstoreTS) - .withCompression(this.compressAlgo) - .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM) - .build(); - } - - /** - * Create a Scanner on this file. No seeks or reads are done on creation. Call - * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is - * nothing to clean up in a Scanner. Letting go of your references to the - * scanner is sufficient. - * - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is - * better for random reads, seek+read is better scanning). 
- * @param isCompaction is scanner being used for a compaction? - * @return Scanner on this file. - */ - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - if (dataBlockEncoder.useEncodedScanner()) { - return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction, - hfileContext); - } - - return new ScannerV2(this, cacheBlocks, pread, isCompaction); - } - - /** - * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} - * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. - */ - private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, - boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, - DataBlockEncoding expectedDataBlockEncoding) throws IOException { - // Check cache for block. If found return. - if (cacheConf.isBlockCacheEnabled()) { - BlockCache cache = cacheConf.getBlockCache(); - HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, - updateCacheMetrics); - if (cachedBlock != null) { - if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { - cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); - } - validateBlockType(cachedBlock, expectedBlockType); - - if (expectedDataBlockEncoding == null) { - return cachedBlock; - } - DataBlockEncoding actualDataBlockEncoding = - cachedBlock.getDataBlockEncoding(); - // Block types other than data blocks always have - // DataBlockEncoding.NONE. To avoid false negative cache misses, only - // perform this check if cached block is a data block. - if (cachedBlock.getBlockType().isData() && - !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) { - // This mismatch may happen if a ScannerV2, which is used for say a - // compaction, tries to read an encoded block from the block cache. - // The reverse might happen when an EncodedScannerV2 tries to read - // un-encoded blocks which were cached earlier. - // - // Because returning a data block with an implicit BlockType mismatch - // will cause the requesting scanner to throw a disk read should be - // forced here. This will potentially cause a significant number of - // cache misses, so update so we should keep track of this as it might - // justify the work on a CompoundScannerV2. - if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) && - !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) { - // If the block is encoded but the encoding does not match the - // expected encoding it is likely the encoding was changed but the - // block was not yet evicted. Evictions on file close happen async - // so blocks with the old encoding still linger in cache for some - // period of time. This event should be rare as it only happens on - // schema definition change. 
- LOG.info("Evicting cached block with key " + cacheKey + - " because of a data block encoding mismatch" + - "; expected: " + expectedDataBlockEncoding + - ", actual: " + actualDataBlockEncoding); - cache.evictBlock(cacheKey); - } - return null; - } - return cachedBlock; - } - } - return null; - } - /** - * @param metaBlockName - * @param cacheBlock Add block to cache, if found - * @return block wrapped in a ByteBuffer, with header skipped - * @throws IOException - */ - @Override - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) - throws IOException { - if (trailer.getMetaIndexCount() == 0) { - return null; // there are no meta blocks - } - if (metaBlockIndexReader == null) { - throw new IOException("Meta index not loaded"); - } - - byte[] mbname = Bytes.toBytes(metaBlockName); - int block = metaBlockIndexReader.rootBlockContainingKey(mbname, - 0, mbname.length); - if (block == -1) - return null; - long blockSize = metaBlockIndexReader.getRootBlockDataSize(block); - - // Per meta key from any given file, synchronize reads for said block. This - // is OK to do for meta blocks because the meta block index is always - // single-level. - synchronized (metaBlockIndexReader.getRootBlockKey(block)) { - // Check cache for block. If found return. - long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block); - BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset); - - cacheBlock &= cacheConf.shouldCacheDataOnRead(); - if (cacheConf.isBlockCacheEnabled()) { - HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, - BlockType.META, null); - if (cachedBlock != null) { - assert cachedBlock.isUnpacked() : "Packed block leak."; - // Return a distinct 'shallow copy' of the block, - // so pos does not get messed by the scanner - return cachedBlock.getBufferWithoutHeader(); - } - // Cache Miss, please load. - } - - HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true).unpack(hfileContext, fsBlockReader); - - // Cache the block - if (cacheBlock) { - cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock, - cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); - } - - return metaBlock.getBufferWithoutHeader(); - } - } - - @Override - public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, - final boolean cacheBlock, boolean pread, final boolean isCompaction, - boolean updateCacheMetrics, BlockType expectedBlockType, - DataBlockEncoding expectedDataBlockEncoding) - throws IOException { - if (dataBlockIndexReader == null) { - throw new IOException("Block index not loaded"); - } - if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) { - throw new IOException("Requested block is out of range: " + dataBlockOffset + - ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset()); - } - - // For any given block from any given file, synchronize reads for said block. - // Without a cache, this synchronizing is needless overhead, but really - // the other choice is to duplicate work (which the cache would prevent you - // from doing). - BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); - boolean useLock = false; - IdLock.Entry lockEntry = null; - TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock"); - try { - while (true) { - if (useLock) { - lockEntry = offsetLock.getLockEntry(dataBlockOffset); - } - - // Check cache for block. If found return. 
- if (cacheConf.isBlockCacheEnabled()) { - // Try and get the block from the block cache. If the useLock variable is true then this - // is the second time through the loop and it should not be counted as a block cache miss. - HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, - updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); - if (cachedBlock != null) { - if (Trace.isTracing()) { - traceScope.getSpan().addTimelineAnnotation("blockCacheHit"); - } - assert cachedBlock.isUnpacked() : "Packed block leak."; - if (cachedBlock.getBlockType().isData()) { - if (updateCacheMetrics) { - HFile.dataBlockReadCnt.incrementAndGet(); - } - // Validate encoding type for data blocks. We include encoding - // type in the cache key, and we expect it to match on a cache hit. - if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { - throw new IOException("Cached block under key " + cacheKey + " " - + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " - + dataBlockEncoder.getDataBlockEncoding() + ")"); - } - } - // Cache-hit. Return! - return cachedBlock; - } - // Carry on, please load. - } - if (!useLock) { - // check cache again with lock - useLock = true; - continue; - } - if (Trace.isTracing()) { - traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); - } - // Load block from filesystem. - HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, - pread); - validateBlockType(hfileBlock, expectedBlockType); - HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); - BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); - - // Cache the block if necessary - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cacheConf.getBlockCache().cacheBlock(cacheKey, - cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, - cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); - } - - if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { - HFile.dataBlockReadCnt.incrementAndGet(); - } - - return unpacked; - } - } finally { - traceScope.close(); - if (lockEntry != null) { - offsetLock.releaseLockEntry(lockEntry); - } - } - } - - @Override - public boolean hasMVCCInfo() { - return includesMemstoreTS && decodeMemstoreTS; - } - - /** - * Compares the actual type of a block retrieved from cache or disk with its - * expected type and throws an exception in case of a mismatch. Expected - * block type of {@link BlockType#DATA} is considered to match the actual - * block type [@link {@link BlockType#ENCODED_DATA} as well. - * @param block a block retrieved from cache or disk - * @param expectedBlockType the expected block type, or null to skip the - * check - */ - private void validateBlockType(HFileBlock block, - BlockType expectedBlockType) throws IOException { - if (expectedBlockType == null) { - return; - } - BlockType actualBlockType = block.getBlockType(); - if (expectedBlockType.isData() && actualBlockType.isData()) { - // We consider DATA to match ENCODED_DATA for the purpose of this - // verification. - return; - } - if (actualBlockType != expectedBlockType) { - throw new IOException("Expected block type " + expectedBlockType + ", " + - "but got " + actualBlockType + ": " + block); - } - } - - /** - * @return Last key in the file. May be null if file has no entries. Note that - * this is not the last row key, but rather the byte form of the last - * KeyValue. 
- */ - @Override - public byte[] getLastKey() { - return dataBlockIndexReader.isEmpty() ? null : lastKey; - } - - /** - * @return Midkey for this file. We work with block boundaries only so - * returned midkey is an approximation only. - * @throws IOException - */ - @Override - public byte[] midkey() throws IOException { - return dataBlockIndexReader.midkey(); - } - - @Override - public void close() throws IOException { - close(cacheConf.shouldEvictOnClose()); - } - - public void close(boolean evictOnClose) throws IOException { - PrefetchExecutor.cancel(path); - if (evictOnClose && cacheConf.isBlockCacheEnabled()) { - int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); - if (LOG.isTraceEnabled()) { - LOG.trace("On close, file=" + name + " evicted=" + numEvicted - + " block(s)"); - } - } - fsBlockReader.closeStreams(); - } - - public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) { - return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction); - } - - /** For testing */ - @Override - HFileBlock.FSReader getUncachedBlockReader() { - return fsBlockReader; - } - - - protected abstract static class AbstractScannerV2 - extends AbstractHFileReader.Scanner { - protected HFileBlock block; - - @Override - public Cell getNextIndexedKey() { - return nextIndexedKey; - } - /** - * The next indexed key is to keep track of the indexed key of the next data block. - * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the - * current data block is the last data block. - * - * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet. - */ - protected Cell nextIndexedKey; - - public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); - } - - protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock); - - protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, - boolean rewind, Cell key, boolean seekBefore) throws IOException; - - @Override - public int seekTo(byte[] key, int offset, int length) throws IOException { - // Always rewind to the first key of the block, because the given key - // might be before or after the current key. - return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); - } - - @Override - public int reseekTo(byte[] key, int offset, int length) throws IOException { - return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length)); - } - - @Override - public int seekTo(Cell key) throws IOException { - return seekTo(key, true); - } - - @Override - public int reseekTo(Cell key) throws IOException { - int compared; - if (isSeeked()) { - compared = compareKey(reader.getComparator(), key); - if (compared < 1) { - // If the required key is less than or equal to current key, then - // don't do anything. - return compared; - } else { - // The comparison with no_next_index_key has to be checked - if (this.nextIndexedKey != null && - (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader - .getComparator() - .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) { - // The reader shall continue to scan the current data block instead - // of querying the - // block index as long as it knows the target key is strictly - // smaller than - // the next indexed key or the current data block is the last data - // block. 
- return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); - } - } - } - // Don't rewind on a reseek operation, because reseek implies that we are - // always going forward in the file. - return seekTo(key, false); - } - - - /** - * An internal API function. Seek to the given key, optionally rewinding to - * the first key of the block before doing the seek. - * - * @param key - a cell representing the key that we need to fetch - * @param rewind whether to rewind to the first key of the block before - * doing the seek. If this is false, we are assuming we never go - * back, otherwise the result is undefined. - * @return -1 if the key is earlier than the first key of the file, - * 0 if we are at the given key, 1 if we are past the given key - * -2 if the key is earlier than the first key of the file while - * using a faked index key - * @throws IOException - */ - public int seekTo(Cell key, boolean rewind) throws IOException { - HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); - BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, - cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); - if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { - // This happens if the key e.g. falls before the beginning of the file. - return -1; - } - return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), - blockWithScanInfo.getNextIndexedKey(), rewind, key, false); - } - - @Override - public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length)); - } - - @Override - public boolean seekBefore(Cell key) throws IOException { - HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, - cacheBlocks, pread, isCompaction, - ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction)); - if (seekToBlock == null) { - return false; - } - ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); - - if (reader.getComparator() - .compareOnlyKeyPortion( - new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(), - firstKey.limit()), key) >= 0) { - long previousBlockOffset = seekToBlock.getPrevBlockOffset(); - // The key we are interested in - if (previousBlockOffset == -1) { - // we have a 'problem', the key we want is the first of the file. - return false; - } - - // It is important that we compute and pass onDiskSize to the block - // reader so that it does not have to read the header separately to - // figure out the size. - seekToBlock = reader.readBlock(previousBlockOffset, - seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, - pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - // TODO shortcut: seek forward in this block to the last key of the - // block. - } - Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey)); - loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true); - return true; - } - - /** - * Scans blocks in the "scanned" section of the {@link HFile} until the next - * data block is found. 
- * - * @return the next block, or null if there are no more data blocks - * @throws IOException - */ - protected HFileBlock readNextDataBlock() throws IOException { - long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block == null) - return null; - - HFileBlock curBlock = block; - - do { - if (curBlock.getOffset() >= lastDataBlockOffset) - return null; - - if (curBlock.getOffset() < 0) { - throw new IOException("Invalid block file offset: " + block); - } - - // We are reading the next block without block type validation, because - // it might turn out to be a non-data block. - curBlock = reader.readBlock(curBlock.getOffset() - + curBlock.getOnDiskSizeWithHeader(), - curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, - isCompaction, true, null, getEffectiveDataBlockEncoding()); - } while (!curBlock.getBlockType().isData()); - - return curBlock; - } - - public DataBlockEncoding getEffectiveDataBlockEncoding() { - return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction); - } - /** - * Compare the given key against the current key - * @param comparator - * @param key - * @param offset - * @param length - * @return -1 is the passed key is smaller than the current key, 0 if equal and 1 if greater - */ - public abstract int compareKey(KVComparator comparator, byte[] key, int offset, - int length); - - public abstract int compareKey(KVComparator comparator, Cell kv); - } - - /** - * Implementation of {@link HFileScanner} interface. - */ - protected static class ScannerV2 extends AbstractScannerV2 { - private HFileReaderV2 reader; - - public ScannerV2(HFileReaderV2 r, boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); - this.reader = r; - } - - @Override - public Cell getKeyValue() { - if (!isSeeked()) - return null; - - KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position(), getCellBufSize()); - if (this.reader.shouldIncludeMemstoreTS()) { - ret.setSequenceId(currMemstoreTS); - } - return ret; - } - - protected int getCellBufSize() { - return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen; - } - - @Override - public ByteBuffer getKey() { - assertSeeked(); - return ByteBuffer.wrap( - blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position() - + KEY_VALUE_LEN_SIZE, currKeyLen).slice(); - } - - @Override - public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { - return comparator.compareFlatKey(key, offset, length, blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen); - } - - @Override - public ByteBuffer getValue() { - assertSeeked(); - return ByteBuffer.wrap( - blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position() - + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice(); - } - - protected void setNonSeekedState() { - block = null; - blockBuffer = null; - currKeyLen = 0; - currValueLen = 0; - currMemstoreTS = 0; - currMemstoreTSLen = 0; - } - - /** - * Go to the next key/value in the block section. Loads the next block if - * necessary. If successful, {@link #getKey()} and {@link #getValue()} can - * be called. 
- * - * @return true if successfully navigated to the next key/value - */ - @Override - public boolean next() throws IOException { - assertSeeked(); - - try { - blockBuffer.position(getNextCellStartPosition()); - } catch (IllegalArgumentException e) { - LOG.error("Current pos = " + blockBuffer.position() - + "; currKeyLen = " + currKeyLen + "; currValLen = " - + currValueLen + "; block limit = " + blockBuffer.limit() - + "; HFile name = " + reader.getName() - + "; currBlock currBlockOffset = " + block.getOffset()); - throw e; - } - - if (blockBuffer.remaining() <= 0) { - long lastDataBlockOffset = - reader.getTrailer().getLastDataBlockOffset(); - - if (block.getOffset() >= lastDataBlockOffset) { - setNonSeekedState(); - return false; - } - - // read the next block - HFileBlock nextBlock = readNextDataBlock(); - if (nextBlock == null) { - setNonSeekedState(); - return false; - } - - updateCurrBlock(nextBlock); - return true; - } - - // We are still in the same block. - readKeyValueLen(); - return true; - } - - protected int getNextCellStartPosition() { - return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen - + currMemstoreTSLen; - } - - /** - * Positions this scanner at the start of the file. - * - * @return false if empty file; i.e. a call to next would return false and - * the current key and value are undefined. - * @throws IOException - */ - @Override - public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } - - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. - return false; - } - - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - blockBuffer.rewind(); - readKeyValueLen(); - return true; - } - - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); - } - updateCurrBlock(block); - return true; - } - - @Override - protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, - boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { - updateCurrBlock(seekToBlock); - } else if (rewind) { - blockBuffer.rewind(); - } - - // Update the nextIndexedKey - this.nextIndexedKey = nextIndexedKey; - return blockSeek(key, seekBefore); - } - - /** - * Updates the current block to be the given {@link HFileBlock}. Seeks to - * the the first key/value pair. 
- * - * @param newBlock the block to make current - */ - protected void updateCurrBlock(HFileBlock newBlock) { - block = newBlock; - - // sanity check - if (block.getBlockType() != BlockType.DATA) { - throw new IllegalStateException("ScannerV2 works only on data " + - "blocks, got " + block.getBlockType() + "; " + - "fileName=" + reader.name + ", " + - "dataBlockEncoder=" + reader.dataBlockEncoder + ", " + - "isCompaction=" + isCompaction); - } - - blockBuffer = block.getBufferWithoutHeader(); - readKeyValueLen(); - blockFetches++; - - // Reset the next indexed key - this.nextIndexedKey = null; - } - - protected void readKeyValueLen() { - blockBuffer.mark(); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen); - readMvccVersion(); - if (currKeyLen < 0 || currValueLen < 0 - || currKeyLen > blockBuffer.limit() - || currValueLen > blockBuffer.limit()) { - throw new IllegalStateException("Invalid currKeyLen " + currKeyLen - + " or currValueLen " + currValueLen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() - + ", position: " + blockBuffer.position() + " (without header)."); - } - blockBuffer.reset(); - } - - protected void readMvccVersion() { - if (this.reader.shouldIncludeMemstoreTS()) { - if (this.reader.decodeMemstoreTS) { - try { - currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position()); - currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS); - } catch (Exception e) { - throw new RuntimeException("Error reading memstore timestamp", e); - } - } else { - currMemstoreTS = 0; - currMemstoreTSLen = 1; - } - } - } - - /** - * Within a loaded block, seek looking for the last key that is smaller than - * (or equal to?) the key we are interested in. - * - * A note on the seekBefore: if you have seekBefore = true, AND the first - * key in the block = key, then you'll get thrown exceptions. The caller has - * to check for that case and load the previous block as appropriate. - * - * @param key - * the key to find - * @param seekBefore - * find the key before the given key in case of exact match. - * @return 0 in case of an exact key match, 1 in case of an inexact match, - * -2 in case of an inexact match and furthermore, the input key - * less than the first key of current block(e.g. 
using a faked index - * key) - */ - protected int blockSeek(Cell key, boolean seekBefore) { - int klen, vlen; - long memstoreTS = 0; - int memstoreTSLen = 0; - int lastKeyValueSize = -1; - KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue(); - do { - blockBuffer.mark(); - klen = blockBuffer.getInt(); - vlen = blockBuffer.getInt(); - blockBuffer.reset(); - if (this.reader.shouldIncludeMemstoreTS()) { - if (this.reader.decodeMemstoreTS) { - try { - int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position() - + KEY_VALUE_LEN_SIZE + klen + vlen; - memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset); - memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); - } catch (Exception e) { - throw new RuntimeException("Error reading memstore timestamp", e); - } - } else { - memstoreTS = 0; - memstoreTSLen = 1; - } - } - - int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE; - keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen); - int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv); - - if (comp == 0) { - if (seekBefore) { - if (lastKeyValueSize < 0) { - throw new IllegalStateException("blockSeek with seekBefore " - + "at the first key of the block: key=" - + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + block.getOffset() + ", onDiskSize=" - + block.getOnDiskSizeWithHeader()); - } - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - return 1; // non exact match. - } - currKeyLen = klen; - currValueLen = vlen; - if (this.reader.shouldIncludeMemstoreTS()) { - currMemstoreTS = memstoreTS; - currMemstoreTSLen = memstoreTSLen; - } - return 0; // indicate exact match - } else if (comp < 0) { - if (lastKeyValueSize > 0) - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - if (lastKeyValueSize == -1 && blockBuffer.position() == 0 - && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) { - return HConstants.INDEX_KEY_MAGIC; - } - return 1; - } - - // The size of this key/value tuple, including key/value length fields. - lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; - blockBuffer.position(blockBuffer.position() + lastKeyValueSize); - } while (blockBuffer.remaining() > 0); - - // Seek to the last key we successfully read. This will happen if this is - // the last key/value pair in the file, in which case the following call - // to next() has to return false. - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - return 1; // didn't exactly find it. - } - - @Override - protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { - ByteBuffer buffer = curBlock.getBufferWithoutHeader(); - // It is safe to manipulate this buffer because we own the buffer object. 
- buffer.rewind(); - int klen = buffer.getInt(); - buffer.getInt(); - ByteBuffer keyBuff = buffer.slice(); - keyBuff.limit(klen); - keyBuff.rewind(); - return keyBuff; - } - - @Override - public String getKeyString() { - return Bytes.toStringBinary(blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position() - + KEY_VALUE_LEN_SIZE, currKeyLen); - } - - @Override - public String getValueString() { - return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen, - currValueLen); - } - - @Override - public int compareKey(KVComparator comparator, Cell key) { - return comparator.compareOnlyKeyPortion( - key, - new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); - } - } - - /** - * ScannerV2 that operates on encoded data blocks. - */ - protected static class EncodedScannerV2 extends AbstractScannerV2 { - private final HFileBlockDecodingContext decodingCtx; - private final DataBlockEncoder.EncodedSeeker seeker; - private final DataBlockEncoder dataBlockEncoder; - protected final HFileContext meta; - - public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks, - boolean pread, boolean isCompaction, HFileContext meta) { - super(reader, cacheBlocks, pread, isCompaction); - DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding(); - dataBlockEncoder = encoding.getEncoder(); - decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta); - seeker = dataBlockEncoder.createSeeker( - reader.getComparator(), decodingCtx); - this.meta = meta; - } - - @Override - public boolean isSeeked(){ - return this.block != null; - } - - /** - * Updates the current block to be the given {@link HFileBlock}. Seeks to - * the the first key/value pair. - * - * @param newBlock the block to make current - * @throws CorruptHFileException - */ - private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { - block = newBlock; - - // sanity checks - if (block.getBlockType() != BlockType.ENCODED_DATA) { - throw new IllegalStateException( - "EncodedScanner works only on encoded data blocks"); - } - short dataBlockEncoderId = block.getDataBlockEncodingId(); - if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { - String encoderCls = dataBlockEncoder.getClass().getName(); - throw new CorruptHFileException("Encoder " + encoderCls - + " doesn't support data block encoding " - + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); - } - - seeker.setCurrentBuffer(getEncodedBuffer(newBlock)); - blockFetches++; - - // Reset the next indexed key - this.nextIndexedKey = null; - } - - private ByteBuffer getEncodedBuffer(HFileBlock newBlock) { - ByteBuffer origBlock = newBlock.getBufferReadOnly(); - ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(), - origBlock.arrayOffset() + newBlock.headerSize() + - DataBlockEncoding.ID_SIZE, - newBlock.getUncompressedSizeWithoutHeader() - - DataBlockEncoding.ID_SIZE).slice(); - return encodedBlock; - } - - @Override - public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } - - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. 
- return false; - } - - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - seeker.rewind(); - return true; - } - - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); - } - updateCurrentBlock(block); - return true; - } - - @Override - public boolean next() throws IOException { - boolean isValid = seeker.next(); - if (!isValid) { - block = readNextDataBlock(); - isValid = block != null; - if (isValid) { - updateCurrentBlock(block); - } - } - return isValid; - } - - @Override - public ByteBuffer getKey() { - assertValidSeek(); - return seeker.getKeyDeepCopy(); - } - - @Override - public int compareKey(KVComparator comparator, byte[] key, int offset, int length) { - return seeker.compareKey(comparator, key, offset, length); - } - - @Override - public ByteBuffer getValue() { - assertValidSeek(); - return seeker.getValueShallowCopy(); - } - - @Override - public Cell getKeyValue() { - if (block == null) { - return null; - } - return seeker.getKeyValue(); - } - - @Override - public String getKeyString() { - ByteBuffer keyBuffer = getKey(); - return Bytes.toStringBinary(keyBuffer.array(), - keyBuffer.arrayOffset(), keyBuffer.limit()); - } - - @Override - public String getValueString() { - ByteBuffer valueBuffer = getValue(); - return Bytes.toStringBinary(valueBuffer.array(), - valueBuffer.arrayOffset(), valueBuffer.limit()); - } - - private void assertValidSeek() { - if (block == null) { - throw new NotSeekedException(); - } - } - - @Override - protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) { - return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock)); - } - - @Override - protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, - boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { - updateCurrentBlock(seekToBlock); - } else if (rewind) { - seeker.rewind(); - } - this.nextIndexedKey = nextIndexedKey; - return seeker.seekToKeyInBlock(key, seekBefore); - } - - @Override - public int compareKey(KVComparator comparator, Cell key) { - return seeker.compareKey(comparator, key); - } - } - - /** - * Returns a buffer with the Bloom filter metadata. The caller takes - * ownership of the buffer. - */ - @Override - public DataInput getGeneralBloomFilterMetadata() throws IOException { - return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META); - } - - @Override - public DataInput getDeleteBloomFilterMetadata() throws IOException { - return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META); - } - - private DataInput getBloomFilterMetadata(BlockType blockType) - throws IOException { - if (blockType != BlockType.GENERAL_BLOOM_META && - blockType != BlockType.DELETE_FAMILY_BLOOM_META) { - throw new RuntimeException("Block Type: " + blockType.toString() + - " is not supported") ; - } - - for (HFileBlock b : loadOnOpenBlocks) - if (b.getBlockType() == blockType) - return b.getByteStream(); - return null; - } - - @Override - public boolean isFileInfoLoaded() { - return true; // We load file info in constructor in version 2. - } - - /** - * Validates that the minor version is within acceptable limits. 
- * Otherwise throws an Runtime exception - */ - private void validateMinorVersion(Path path, int minorVersion) { - if (minorVersion < MIN_MINOR_VERSION || - minorVersion > MAX_MINOR_VERSION) { - String msg = "Minor version for path " + path + - " is expected to be between " + - MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION + - " but is found to be " + minorVersion; - LOG.error(msg); - throw new RuntimeException(msg); - } - } - - @Override - public int getMajorVersion() { - return 2; - } - - @Override - public HFileContext getFileContext() { - return hfileContext; - } - - /** - * Returns false if block prefetching was requested for this file and has - * not completed, true otherwise - */ - @VisibleForTesting - boolean prefetchComplete() { - return PrefetchExecutor.isCompleted(path); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java deleted file mode 100644 index b28d8c1..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.IOException; -import java.security.Key; -import java.security.KeyException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.crypto.Cipher; -import org.apache.hadoop.hbase.io.crypto.Encryption; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.security.EncryptionUtil; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.WritableUtils; - -/** - * {@link HFile} reader for version 3. - */ -@InterfaceAudience.Private -public class HFileReaderV3 extends HFileReaderV2 { - - private static final Log LOG = LogFactory.getLog(HFileReaderV3.class); - - public static final int MAX_MINOR_VERSION = 0; - - /** - * Opens a HFile. You must load the index before you can use it by calling - * {@link #loadFileInfo()}. - * @param path - * Path to HFile. - * @param trailer - * File trailer. - * @param fsdis - * input stream. - * @param size - * Length of the stream. 
- * @param cacheConf - * Cache configuration. - * @param hfs - * The file system. - * @param conf - * Configuration - */ - public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis, - final long size, final CacheConfig cacheConf, final HFileSystem hfs, - final Configuration conf) throws IOException { - super(path, trailer, fsdis, size, cacheConf, hfs, conf); - byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN); - // max tag length is not present in the HFile means tags were not at all written to file. - if (tmp != null) { - hfileContext.setIncludesTags(true); - tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED); - if (tmp != null && Bytes.toBoolean(tmp)) { - hfileContext.setCompressTags(true); - } - } - } - - @Override - protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize, - HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException { - trailer.expectMajorVersion(3); - HFileContextBuilder builder = new HFileContextBuilder() - .withIncludesMvcc(this.includesMemstoreTS) - .withHBaseCheckSum(true) - .withCompression(this.compressAlgo); - - // Check for any key material available - byte[] keyBytes = trailer.getEncryptionKey(); - if (keyBytes != null) { - Encryption.Context cryptoContext = Encryption.newContext(conf); - Key key; - String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, - User.getCurrent().getShortName()); - try { - // First try the master key - key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes); - } catch (KeyException e) { - // If the current master key fails to unwrap, try the alternate, if - // one is configured - if (LOG.isDebugEnabled()) { - LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'"); - } - String alternateKeyName = - conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY); - if (alternateKeyName != null) { - try { - key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes); - } catch (KeyException ex) { - throw new IOException(ex); - } - } else { - throw new IOException(e); - } - } - // Use the algorithm the key wants - Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm()); - if (cipher == null) { - throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available"); - } - cryptoContext.setCipher(cipher); - cryptoContext.setKey(key); - builder.withEncryptionContext(cryptoContext); - } - - HFileContext context = builder.build(); - - if (LOG.isTraceEnabled()) { - LOG.trace("Reader" + (path != null ? " for " + path : "" ) + - " initialized with cacheConf: " + cacheConf + - " comparator: " + comparator.getClass().getSimpleName() + - " fileContext: " + context); - } - - return context; - } - - /** - * Create a Scanner on this file. No seeks or reads are done on creation. Call - * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is - * nothing to clean up in a Scanner. Letting go of your references to the - * scanner is sufficient. - * @param cacheBlocks - * True if we should cache blocks read in by this scanner. - * @param pread - * Use positional read rather than seek+read if true (pread is better - * for random reads, seek+read is better scanning). - * @param isCompaction - * is scanner being used for a compaction? - * @return Scanner on this file. 
- */ - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - if (dataBlockEncoder.useEncodedScanner()) { - return new EncodedScannerV3(this, cacheBlocks, pread, isCompaction, this.hfileContext); - } - return new ScannerV3(this, cacheBlocks, pread, isCompaction); - } - - /** - * Implementation of {@link HFileScanner} interface. - */ - protected static class ScannerV3 extends ScannerV2 { - - private HFileReaderV3 reader; - private int currTagsLen; - - public ScannerV3(HFileReaderV3 r, boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - super(r, cacheBlocks, pread, isCompaction); - this.reader = r; - } - - @Override - protected int getCellBufSize() { - int kvBufSize = super.getCellBufSize(); - if (reader.hfileContext.isIncludesTags()) { - kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen; - } - return kvBufSize; - } - - protected void setNonSeekedState() { - super.setNonSeekedState(); - currTagsLen = 0; - } - - @Override - protected int getNextCellStartPosition() { - int nextKvPos = super.getNextCellStartPosition(); - if (reader.hfileContext.isIncludesTags()) { - nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen; - } - return nextKvPos; - } - - protected void readKeyValueLen() { - blockBuffer.mark(); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit() - || currValueLen > blockBuffer.limit()) { - throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen " - + currValueLen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); - } - ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen); - if (reader.hfileContext.isIncludesTags()) { - // Read short as unsigned, high byte first - currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); - if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) { - throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); - } - ByteBufferUtils.skip(blockBuffer, currTagsLen); - } - readMvccVersion(); - blockBuffer.reset(); - } - - /** - * Within a loaded block, seek looking for the last key that is smaller than - * (or equal to?) the key we are interested in. - * A note on the seekBefore: if you have seekBefore = true, AND the first - * key in the block = key, then you'll get thrown exceptions. The caller has - * to check for that case and load the previous block as appropriate. - * @param key - * the key to find - * @param seekBefore - * find the key before the given key in case of exact match. - * @return 0 in case of an exact key match, 1 in case of an inexact match, - * -2 in case of an inexact match and furthermore, the input key - * less than the first key of current block(e.g. 
using a faked index - * key) - */ - @Override - protected int blockSeek(Cell key, boolean seekBefore) { - int klen, vlen, tlen = 0; - long memstoreTS = 0; - int memstoreTSLen = 0; - int lastKeyValueSize = -1; - KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue(); - do { - blockBuffer.mark(); - klen = blockBuffer.getInt(); - vlen = blockBuffer.getInt(); - if (klen < 0 || vlen < 0 || klen > blockBuffer.limit() - || vlen > blockBuffer.limit()) { - throw new IllegalStateException("Invalid klen " + klen + " or vlen " - + vlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); - } - ByteBufferUtils.skip(blockBuffer, klen + vlen); - if (reader.hfileContext.isIncludesTags()) { - // Read short as unsigned, high byte first - tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); - if (tlen < 0 || tlen > blockBuffer.limit()) { - throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)."); - } - ByteBufferUtils.skip(blockBuffer, tlen); - } - if (this.reader.shouldIncludeMemstoreTS()) { - if (this.reader.decodeMemstoreTS) { - try { - memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position()); - memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); - } catch (Exception e) { - throw new RuntimeException("Error reading memstore timestamp", e); - } - } else { - memstoreTS = 0; - memstoreTSLen = 1; - } - } - blockBuffer.reset(); - int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2); - keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen); - int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv); - - if (comp == 0) { - if (seekBefore) { - if (lastKeyValueSize < 0) { - throw new IllegalStateException("blockSeek with seekBefore " - + "at the first key of the block: key=" - + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + block.getOffset() + ", onDiskSize=" - + block.getOnDiskSizeWithHeader()); - } - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - return 1; // non exact match. - } - currKeyLen = klen; - currValueLen = vlen; - currTagsLen = tlen; - if (this.reader.shouldIncludeMemstoreTS()) { - currMemstoreTS = memstoreTS; - currMemstoreTSLen = memstoreTSLen; - } - return 0; // indicate exact match - } else if (comp < 0) { - if (lastKeyValueSize > 0) - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { - return HConstants.INDEX_KEY_MAGIC; - } - return 1; - } - - // The size of this key/value tuple, including key/value length fields. - lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; - // include tag length also if tags included with KV - if (reader.hfileContext.isIncludesTags()) { - lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; - } - blockBuffer.position(blockBuffer.position() + lastKeyValueSize); - } while (blockBuffer.remaining() > 0); - - // Seek to the last key we successfully read. This will happen if this is - // the last key/value pair in the file, in which case the following call - // to next() has to return false. - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); - readKeyValueLen(); - return 1; // didn't exactly find it. 
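[Illustrative aside, not part of the patch: the return codes documented above carry through to HFileScanner.seekTo(Cell) — a negative value means the requested key sorts before the first key of the file, 0 is an exact match, and 1 means the scanner was left on the closest preceding cell. A minimal caller-side sketch using only methods that appear in this patch; the reader is assumed to be opened and closed elsewhere, and the class and method names are illustrative.]

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public class SeekToSketch {
      /**
       * Returns the cell at or just before the given key, or null if the key
       * precedes the first key of the file.
       */
      static Cell seekAndRead(HFile.Reader reader, Cell key) throws IOException {
        // cacheBlocks=true, pread=true (random read), isCompaction=false.
        HFileScanner scanner = reader.getScanner(true, true, false);
        int result = scanner.seekTo(key);
        if (result < 0) {
          // -1, or HConstants.INDEX_KEY_MAGIC (-2) when a faked index key was in play:
          // the sought key is before the first key of the file.
          return null;
        }
        // result == 0: exact match; result == 1: positioned on the closest preceding cell.
        return scanner.getKeyValue();
      }
    }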
- } - } - - /** - * ScannerV3 that operates on encoded data blocks. - */ - protected static class EncodedScannerV3 extends EncodedScannerV2 { - public EncodedScannerV3(HFileReaderV3 reader, boolean cacheBlocks, boolean pread, - boolean isCompaction, HFileContext context) { - super(reader, cacheBlocks, pread, isCompaction, context); - } - } - - @Override - public int getMajorVersion() { - return 3; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java new file mode 100644 index 0000000..047022d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java @@ -0,0 +1,40 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitationsME + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +public class HFileWriterFactory extends HFile.WriterFactory { + HFileWriterFactory(Configuration conf, CacheConfig cacheConf) { + super(conf, cacheConf); + } + + @Override + public HFile.Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream, + KVComparator comparator, HFileContext context) + throws IOException { + return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, context); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java new file mode 100644 index 0000000..e2b6efd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.hfile; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.BloomFilterWriter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.Writable; + +/** + * Common functionality needed by all versions of {@link HFile} writers. + */ +@InterfaceAudience.Private +public class HFileWriterImpl implements HFile.Writer { + private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class); + + /** The Cell previously appended. Becomes the last cell in the file.*/ + protected Cell lastCell = null; + + /** FileSystem stream to write into. */ + protected FSDataOutputStream outputStream; + + /** True if we opened the outputStream (and so will close it). */ + protected final boolean closeOutputStream; + + /** A "file info" block: a key-value map of file-wide metadata. */ + protected FileInfo fileInfo = new HFile.FileInfo(); + + /** Total # of key/value entries, i.e. how many times add() was called. */ + protected long entryCount = 0; + + /** Used for calculating the average key length. */ + protected long totalKeyLength = 0; + + /** Used for calculating the average value length. */ + protected long totalValueLength = 0; + + /** Total uncompressed bytes, maybe calculate a compression ratio later. */ + protected long totalUncompressedBytes = 0; + + /** Key comparator. Used to ensure we write in order. */ + protected final KVComparator comparator; + + /** Meta block names. */ + protected List metaNames = new ArrayList(); + + /** {@link Writable}s representing meta block data. */ + protected List metaData = new ArrayList(); + + /** + * First cell in a block. + * This reference should be short-lived since we write hfiles in a burst. + */ + protected Cell firstCellInBlock = null; + + + /** May be null if we were passed a stream. */ + protected final Path path; + + /** Cache configuration for caching data on write. */ + protected final CacheConfig cacheConf; + + /** + * Name for this object used when logging or in toString. Is either + * the result of a toString on stream or else name of passed file Path. + */ + protected final String name; + + /** + * The data block encoding which will be used. + * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding. 
+ */ + protected final HFileDataBlockEncoder blockEncoder; + + protected final HFileContext hFileContext; + + private int maxTagsLength = 0; + + /** KeyValue version in FileInfo */ + public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); + + /** Version for KeyValue which includes memstore timestamp */ + public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; + + /** Inline block writers for multi-level block index and compound Blooms. */ + private List inlineBlockWriters = new ArrayList(); + + /** block writer */ + protected HFileBlock.Writer fsBlockWriter; + + private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter; + private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter; + + /** The offset of the first data block or -1 if the file is empty. */ + private long firstDataBlockOffset = -1; + + /** The offset of the last data block or 0 if the file is empty. */ + protected long lastDataBlockOffset; + + /** + * The last(stop) Cell of the previous data block. + * This reference should be short-lived since we write hfiles in a burst. + */ + private Cell lastCellOfPreviousBlock = null; + + /** Additional data items to be written to the "load-on-open" section. */ + private List additionalLoadOnOpenData = new ArrayList(); + + protected long maxMemstoreTS = 0; + + public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path, + FSDataOutputStream outputStream, + KVComparator comparator, HFileContext fileContext) { + this.outputStream = outputStream; + this.path = path; + this.name = path != null ? path.getName() : outputStream.toString(); + this.hFileContext = fileContext; + DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); + if (encoding != DataBlockEncoding.NONE) { + this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); + } else { + this.blockEncoder = NoOpDataBlockEncoder.INSTANCE; + } + this.comparator = comparator != null ? comparator + : KeyValue.COMPARATOR; + + closeOutputStream = path != null; + this.cacheConf = cacheConf; + finishInit(conf); + if (LOG.isTraceEnabled()) { + LOG.trace("Writer" + (path != null ? " for " + path : "") + + " initialized with cacheConf: " + cacheConf + + " comparator: " + comparator.getClass().getSimpleName() + + " fileContext: " + fileContext); + } + } + + /** + * Add to the file info. All added key/value pairs can be obtained using + * {@link HFile.Reader#loadFileInfo()}. + * + * @param k Key + * @param v Value + * @throws IOException in case the key or the value are invalid + */ + @Override + public void appendFileInfo(final byte[] k, final byte[] v) + throws IOException { + fileInfo.append(k, v, true); + } + + /** + * Sets the file info offset in the trailer, finishes up populating fields in + * the file info, and writes the file info into the given data output. The + * reason the data output is not always {@link #outputStream} is that we store + * file info as a block in version 2. + * + * @param trailer fixed file trailer + * @param out the data output to write the file info to + * @throws IOException + */ + protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out) + throws IOException { + trailer.setFileInfoOffset(outputStream.getPos()); + finishFileInfo(); + fileInfo.write(out); + } + + /** + * Checks that the given Cell's key does not violate the key order. + * + * @param cell Cell whose key to check. 
+ * @return true if the key is duplicate + * @throws IOException if the key or the key order is wrong + */ + protected boolean checkKey(final Cell cell) throws IOException { + boolean isDuplicateKey = false; + + if (cell == null) { + throw new IOException("Key cannot be null or empty"); + } + if (lastCell != null) { + int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell); + + if (keyComp > 0) { + throw new IOException("Added a key not lexically larger than" + + " previous. Current cell = " + cell + ", lastCell = " + lastCell); + } else if (keyComp == 0) { + isDuplicateKey = true; + } + } + return isDuplicateKey; + } + + /** Checks the given value for validity. */ + protected void checkValue(final byte[] value, final int offset, + final int length) throws IOException { + if (value == null) { + throw new IOException("Value cannot be null"); + } + } + + /** + * @return Path or null if we were passed a stream rather than a Path. + */ + @Override + public Path getPath() { + return path; + } + + @Override + public String toString() { + return "writer=" + (path != null ? path.toString() : null) + ", name=" + + name + ", compression=" + hFileContext.getCompression().getName(); + } + + public static Compression.Algorithm compressionByName(String algoName) { + if (algoName == null) + return HFile.DEFAULT_COMPRESSION_ALGORITHM; + return Compression.getCompressionAlgorithmByName(algoName); + } + + /** A helper method to create HFile output streams in constructors */ + protected static FSDataOutputStream createOutputStream(Configuration conf, + FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException { + FsPermission perms = FSUtils.getFilePermissions(fs, conf, + HConstants.DATA_FILE_UMASK_KEY); + return FSUtils.create(fs, path, perms, favoredNodes); + } + + /** Additional initialization steps */ + protected void finishInit(final Configuration conf) { + if (fsBlockWriter != null) { + throw new IllegalStateException("finishInit called twice"); + } + + fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); + + // Data block index writer + boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); + dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, + cacheIndexesOnWrite ? cacheConf : null, + cacheIndexesOnWrite ? name : null); + dataBlockIndexWriter.setMaxChunkSize( + HFileBlockIndex.getMaxChunkSize(conf)); + inlineBlockWriters.add(dataBlockIndexWriter); + + // Meta data block index writer + metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); + if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf); + } + + /** + * At a block boundary, write all the inline blocks and opens new block. + * + * @throws IOException + */ + protected void checkBlockBoundary() throws IOException { + if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return; + finishBlock(); + writeInlineBlocks(false); + newBlock(); + } + + /** Clean up the current data block */ + private void finishBlock() throws IOException { + if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; + + // Update the first data block offset for scanning. 
+ if (firstDataBlockOffset == -1) { + firstDataBlockOffset = outputStream.getPos(); + } + // Update the last data block offset + lastDataBlockOffset = outputStream.getPos(); + fsBlockWriter.writeHeaderAndData(outputStream); + int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); + Cell indexEntry = + CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock); + dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), + lastDataBlockOffset, onDiskSize); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + if (cacheConf.shouldCacheDataOnWrite()) { + doCacheOnWrite(lastDataBlockOffset); + } + } + + /** Gives inline block writers an opportunity to contribute blocks. */ + private void writeInlineBlocks(boolean closing) throws IOException { + for (InlineBlockWriter ibw : inlineBlockWriters) { + while (ibw.shouldWriteBlock(closing)) { + long offset = outputStream.getPos(); + boolean cacheThisBlock = ibw.getCacheOnWrite(); + ibw.writeInlineBlock(fsBlockWriter.startWriting( + ibw.getInlineBlockType())); + fsBlockWriter.writeHeaderAndData(outputStream); + ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), + fsBlockWriter.getUncompressedSizeWithoutHeader()); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + if (cacheThisBlock) { + doCacheOnWrite(offset); + } + } + } + } + + /** + * Caches the last written HFile block. + * @param offset the offset of the block we want to cache. Used to determine + * the cache key. + */ + private void doCacheOnWrite(long offset) { + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock); + } + + /** + * Ready a new block for writing. + * + * @throws IOException + */ + protected void newBlock() throws IOException { + // This is where the next block begins. + fsBlockWriter.startWriting(BlockType.DATA); + firstCellInBlock = null; + if (lastCell != null) { + lastCellOfPreviousBlock = lastCell; + } + } + + /** + * Add a meta block to the end of the file. Call before close(). Metadata + * blocks are expensive. Fill one with a bunch of serialized data rather than + * do a metadata block per metadata instance. If metadata is small, consider + * adding to file info using {@link #appendFileInfo(byte[], byte[])} + * + * @param metaBlockName + * name of the block + * @param content + * will call readFields to get data later (DO NOT REUSE) + */ + @Override + public void appendMetaBlock(String metaBlockName, Writable content) { + byte[] key = Bytes.toBytes(metaBlockName); + int i; + for (i = 0; i < metaNames.size(); ++i) { + // stop when the current key is greater than our own + byte[] cur = metaNames.get(i); + if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, + key.length) > 0) { + break; + } + } + metaNames.add(i, key); + metaData.add(i, content); + } + + @Override + public void close() throws IOException { + if (outputStream == null) { + return; + } + // Save data block encoder metadata in the file info. + blockEncoder.saveMetadata(this); + // Write out the end of the data blocks, then write meta data blocks. + // followed by fileinfo, data block index and meta block index. + + finishBlock(); + writeInlineBlocks(true); + + FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion()); + + // Write out the metadata blocks if any. 
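[Illustrative aside, not part of the patch: for callers, the contract implemented by this writer is that cells go in via append() in comparator order (checkKey() rejects keys that sort before the previous one, and a duplicate key merely skips the block-boundary check), small file-wide metadata goes through appendFileInfo(), and everything is laid out on close(). A minimal usage sketch, assuming the HFile.Writer was created elsewhere, for example through the new HFileWriterFactory; the metadata key and class name are illustrative.]

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WriterUsageSketch {
      static void writeSorted(HFile.Writer writer, Iterable<Cell> cellsInComparatorOrder)
          throws IOException {
        for (Cell cell : cellsInComparatorOrder) {
          writer.append(cell);   // checkKey() throws if a cell sorts before the previous one
        }
        // Key/value pairs added here are written into the FILE_INFO block during close().
        writer.appendFileInfo(Bytes.toBytes("MY_APP_METADATA"), Bytes.toBytes(123L));
        writer.close();          // data/meta blocks, file info, block indexes, then the trailer
      }
    }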
+ if (!metaNames.isEmpty()) { + for (int i = 0; i < metaNames.size(); ++i) { + // store the beginning offset + long offset = outputStream.getPos(); + // write the metadata content + DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META); + metaData.get(i).write(dos); + + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + // Add the new meta block to the meta index. + metaBlockIndexWriter.addEntry(metaNames.get(i), offset, + fsBlockWriter.getOnDiskSizeWithHeader()); + } + } + + // Load-on-open section. + + // Data block index. + // + // In version 2, this section of the file starts with the root level data + // block index. We call a function that writes intermediate-level blocks + // first, then root level, and returns the offset of the root level block + // index. + + long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); + trailer.setLoadOnOpenOffset(rootIndexOffset); + + // Meta block index. + metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting( + BlockType.ROOT_INDEX), "meta"); + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + if (this.hFileContext.isIncludesMvcc()) { + appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); + appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); + } + + // File info + writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO)); + fsBlockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + + // Load-on-open data supplied by higher levels, e.g. Bloom filters. + for (BlockWritable w : additionalLoadOnOpenData){ + fsBlockWriter.writeBlock(w, outputStream); + totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + } + + // Now finish off the trailer. 
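[Illustrative aside, not part of the patch: the data block index written out here does not store the full first key of each data block. finishBlock() asks CellComparator.getMidpoint() for a shortened "fake" key that is meant to sort after the previous block's last cell and no later than the new block's first cell, which keeps root and intermediate index blocks small. A standalone sketch of that call; the row values and class name are illustrative, and the exact shortened bytes depend on the midpoint algorithm.]

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexKeyMidpointSketch {
      public static void main(String[] args) {
        byte[] fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q");
        byte[] val = Bytes.toBytes("v");
        // Last cell of the block that just finished and first cell of the next block.
        Cell lastOfPreviousBlock = new KeyValue(Bytes.toBytes("apple-0000000123"), fam, qual, val);
        Cell firstOfNewBlock = new KeyValue(Bytes.toBytes("banana-0000000001"), fam, qual, val);
        // Same call finishBlock() makes: the index entry only needs to separate the two
        // blocks, so it can be much shorter than either real key.
        Cell indexEntry =
            CellComparator.getMidpoint(KeyValue.COMPARATOR, lastOfPreviousBlock, firstOfNewBlock);
        System.out.println(Bytes.toStringBinary(indexEntry.getRowArray(),
            indexEntry.getRowOffset(), indexEntry.getRowLength()));
      }
    }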
+ trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); + trailer.setUncompressedDataIndexSize( + dataBlockIndexWriter.getTotalUncompressedSize()); + trailer.setFirstDataBlockOffset(firstDataBlockOffset); + trailer.setLastDataBlockOffset(lastDataBlockOffset); + trailer.setComparatorClass(comparator.getClass()); + trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); + + + finishClose(trailer); + + fsBlockWriter.release(); + } + + @Override + public void addInlineBlockWriter(InlineBlockWriter ibw) { + inlineBlockWriters.add(ibw); + } + + @Override + public void addGeneralBloomFilter(final BloomFilterWriter bfw) { + this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META); + } + + @Override + public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) { + this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META); + } + + private void addBloomFilter(final BloomFilterWriter bfw, + final BlockType blockType) { + if (bfw.getKeyCount() <= 0) + return; + + if (blockType != BlockType.GENERAL_BLOOM_META && + blockType != BlockType.DELETE_FAMILY_BLOOM_META) { + throw new RuntimeException("Block Type: " + blockType.toString() + + "is not supported"); + } + additionalLoadOnOpenData.add(new BlockWritable() { + @Override + public BlockType getBlockType() { + return blockType; + } + + @Override + public void writeToBlock(DataOutput out) throws IOException { + bfw.getMetaWriter().write(out); + Writable dataWriter = bfw.getDataWriter(); + if (dataWriter != null) + dataWriter.write(out); + } + }); + } + + @Override + public HFileContext getFileContext() { + return hFileContext; + } + + /** + * Add key/value to file. Keys must be added in an order that agrees with the + * Comparator passed on construction. + * + * @param cell + * Cell to add. Cannot be empty nor null. + * @throws IOException + */ + @Override + public void append(final Cell cell) throws IOException { + byte[] value = cell.getValueArray(); + int voffset = cell.getValueOffset(); + int vlength = cell.getValueLength(); + // checkKey uses comparator to check we are writing in order. + boolean dupKey = checkKey(cell); + checkValue(value, voffset, vlength); + if (!dupKey) { + checkBlockBoundary(); + } + + if (!fsBlockWriter.isWriting()) { + newBlock(); + } + + fsBlockWriter.write(cell); + + totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell); + totalValueLength += vlength; + + // Are we the first key in this block? + if (firstCellInBlock == null) { + // If cell is big, block will be closed and this firstCellInBlock reference will only last + // a short while. + firstCellInBlock = cell; + } + + // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinetly? + lastCell = cell; + entryCount++; + this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); + int tagsLength = cell.getTagsLength(); + if (tagsLength > this.maxTagsLength) { + this.maxTagsLength = tagsLength; + } + } + + protected void finishFileInfo() throws IOException { + if (lastCell != null) { + // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean + // byte buffer. Won't take a tuple. + byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell); + fileInfo.append(FileInfo.LASTKEY, lastKey, false); + } + + // Average key length. + int avgKeyLen = + entryCount == 0 ? 
0 : (int) (totalKeyLength / entryCount); + fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); + fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()), + false); + + // Average value length. + int avgValueLen = + entryCount == 0 ? 0 : (int) (totalValueLength / entryCount); + fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); + if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) { + // In case of Prefix Tree encoding, we always write tags information into HFiles even if all + // KVs are having no tags. + fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); + } else if (hFileContext.isIncludesTags()) { + // When tags are not being written in this file, MAX_TAGS_LEN is excluded + // from the FileInfo + fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); + boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) + && hFileContext.isCompressTags(); + fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); + } + } + + protected int getMajorVersion() { + return 3; + } + + protected int getMinorVersion() { + return HFileReaderImpl.MAX_MINOR_VERSION; + } + + protected void finishClose(FixedFileTrailer trailer) throws IOException { + // Write out encryption metadata before finalizing if we have a valid crypto context + Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); + if (cryptoContext != Encryption.Context.NONE) { + // Wrap the context's key and write it as the encryption metadata, the wrapper includes + // all information needed for decryption + trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(), + cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, + User.getCurrent().getShortName()), + cryptoContext.getKey())); + } + // Now we can finish the close + trailer.setMetaIndexCount(metaNames.size()); + trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize()); + trailer.setEntryCount(entryCount); + trailer.setCompressionCodec(hFileContext.getCompression()); + + trailer.serialize(outputStream); + + if (closeOutputStream) { + outputStream.close(); + outputStream = null; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java deleted file mode 100644 index 28c4655..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.io.hfile; - -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; -import org.apache.hadoop.hbase.util.BloomFilterWriter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; - -/** - * Writes HFile format version 2. - */ -@InterfaceAudience.Private -public class HFileWriterV2 extends AbstractHFileWriter { - static final Log LOG = LogFactory.getLog(HFileWriterV2.class); - - /** Max memstore (mvcc) timestamp in FileInfo */ - public static final byte [] MAX_MEMSTORE_TS_KEY = - Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); - - /** KeyValue version in FileInfo */ - public static final byte [] KEY_VALUE_VERSION = - Bytes.toBytes("KEY_VALUE_VERSION"); - - /** Version for KeyValue which includes memstore timestamp */ - public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; - - /** Inline block writers for multi-level block index and compound Blooms. */ - private List inlineBlockWriters = - new ArrayList(); - - /** Unified version 2 block writer */ - protected HFileBlock.Writer fsBlockWriter; - - private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter; - private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter; - - /** The offset of the first data block or -1 if the file is empty. */ - private long firstDataBlockOffset = -1; - - /** The offset of the last data block or 0 if the file is empty. */ - protected long lastDataBlockOffset; - - /** - * The last(stop) Cell of the previous data block. - * This reference should be short-lived since we write hfiles in a burst. - */ - private Cell lastCellOfPreviousBlock = null; - - /** Additional data items to be written to the "load-on-open" section. */ - private List additionalLoadOnOpenData = - new ArrayList(); - - protected long maxMemstoreTS = 0; - - static class WriterFactoryV2 extends HFile.WriterFactory { - WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { - super(conf, cacheConf); - } - - @Override - public Writer createWriter(FileSystem fs, Path path, - FSDataOutputStream ostream, - KVComparator comparator, HFileContext context) throws IOException { - context.setIncludesTags(false);// HFile V2 does not deal with tags at all! - return new HFileWriterV2(conf, cacheConf, fs, path, ostream, - comparator, context); - } - } - - /** Constructor that takes a path, creates and closes the output stream. */ - public HFileWriterV2(Configuration conf, CacheConfig cacheConf, - FileSystem fs, Path path, FSDataOutputStream ostream, - final KVComparator comparator, final HFileContext context) throws IOException { - super(cacheConf, - ostream == null ? 
createOutputStream(conf, fs, path, null) : ostream, - path, comparator, context); - finishInit(conf); - } - - /** Additional initialization steps */ - protected void finishInit(final Configuration conf) { - if (fsBlockWriter != null) - throw new IllegalStateException("finishInit called twice"); - - fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); - - // Data block index writer - boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); - dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf : null, - cacheIndexesOnWrite ? name : null); - dataBlockIndexWriter.setMaxChunkSize( - HFileBlockIndex.getMaxChunkSize(conf)); - inlineBlockWriters.add(dataBlockIndexWriter); - - // Meta data block index writer - metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); - if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf); - } - - /** - * At a block boundary, write all the inline blocks and opens new block. - * - * @throws IOException - */ - protected void checkBlockBoundary() throws IOException { - if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) - return; - - finishBlock(); - writeInlineBlocks(false); - newBlock(); - } - - /** Clean up the current data block */ - private void finishBlock() throws IOException { - if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) - return; - - // Update the first data block offset for scanning. - if (firstDataBlockOffset == -1) { - firstDataBlockOffset = outputStream.getPos(); - } - // Update the last data block offset - lastDataBlockOffset = outputStream.getPos(); - fsBlockWriter.writeHeaderAndData(outputStream); - int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); - - Cell indexEntry = - CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock); - dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), - lastDataBlockOffset, onDiskSize); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - if (cacheConf.shouldCacheDataOnWrite()) { - doCacheOnWrite(lastDataBlockOffset); - } - } - - /** Gives inline block writers an opportunity to contribute blocks. */ - private void writeInlineBlocks(boolean closing) throws IOException { - for (InlineBlockWriter ibw : inlineBlockWriters) { - while (ibw.shouldWriteBlock(closing)) { - long offset = outputStream.getPos(); - boolean cacheThisBlock = ibw.getCacheOnWrite(); - ibw.writeInlineBlock(fsBlockWriter.startWriting( - ibw.getInlineBlockType())); - fsBlockWriter.writeHeaderAndData(outputStream); - ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), - fsBlockWriter.getUncompressedSizeWithoutHeader()); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - - if (cacheThisBlock) { - doCacheOnWrite(offset); - } - } - } - } - - /** - * Caches the last written HFile block. - * @param offset the offset of the block we want to cache. Used to determine - * the cache key. - */ - private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); - cacheConf.getBlockCache().cacheBlock( - new BlockCacheKey(name, offset), cacheFormatBlock); - } - - /** - * Ready a new block for writing. - * - * @throws IOException - */ - protected void newBlock() throws IOException { - // This is where the next block begins. 
- fsBlockWriter.startWriting(BlockType.DATA); - firstCellInBlock = null; - if (lastCell != null) { - lastCellOfPreviousBlock = lastCell; - } - } - - /** - * Add a meta block to the end of the file. Call before close(). Metadata - * blocks are expensive. Fill one with a bunch of serialized data rather than - * do a metadata block per metadata instance. If metadata is small, consider - * adding to file info using {@link #appendFileInfo(byte[], byte[])} - * - * @param metaBlockName - * name of the block - * @param content - * will call readFields to get data later (DO NOT REUSE) - */ - @Override - public void appendMetaBlock(String metaBlockName, Writable content) { - byte[] key = Bytes.toBytes(metaBlockName); - int i; - for (i = 0; i < metaNames.size(); ++i) { - // stop when the current key is greater than our own - byte[] cur = metaNames.get(i); - if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, - key.length) > 0) { - break; - } - } - metaNames.add(i, key); - metaData.add(i, content); - } - - /** - * Add key/value to file. Keys must be added in an order that agrees with the - * Comparator passed on construction. - * - * @param cell Cell to add. Cannot be empty nor null. - * @throws IOException - */ - @Override - public void append(final Cell cell) throws IOException { - byte[] value = cell.getValueArray(); - int voffset = cell.getValueOffset(); - int vlength = cell.getValueLength(); - // checkKey uses comparator to check we are writing in order. - boolean dupKey = checkKey(cell); - checkValue(value, voffset, vlength); - if (!dupKey) { - checkBlockBoundary(); - } - - if (!fsBlockWriter.isWriting()) { - newBlock(); - } - - fsBlockWriter.write(cell); - - totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell); - totalValueLength += vlength; - - // Are we the first key in this block? - if (firstCellInBlock == null) { - // If cell is big, block will be closed and this firstCellInBlock reference will only last - // a short while. - firstCellInBlock = cell; - } - - // TODO: What if cell is 10MB and we write infrequently? We'll hold on to the cell here - // indefinetly? - lastCell = cell; - entryCount++; - this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); - } - - @Override - public void close() throws IOException { - if (outputStream == null) { - return; - } - // Save data block encoder metadata in the file info. - blockEncoder.saveMetadata(this); - // Write out the end of the data blocks, then write meta data blocks. - // followed by fileinfo, data block index and meta block index. - - finishBlock(); - writeInlineBlocks(true); - - FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion()); - - // Write out the metadata blocks if any. - if (!metaNames.isEmpty()) { - for (int i = 0; i < metaNames.size(); ++i) { - // store the beginning offset - long offset = outputStream.getPos(); - // write the metadata content - DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META); - metaData.get(i).write(dos); - - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - - // Add the new meta block to the meta index. - metaBlockIndexWriter.addEntry(metaNames.get(i), offset, - fsBlockWriter.getOnDiskSizeWithHeader()); - } - } - - // Load-on-open section. - - // Data block index. - // - // In version 2, this section of the file starts with the root level data - // block index. 
We call a function that writes intermediate-level blocks - // first, then root level, and returns the offset of the root level block - // index. - - long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); - trailer.setLoadOnOpenOffset(rootIndexOffset); - - // Meta block index. - metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting( - BlockType.ROOT_INDEX), "meta"); - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - - if (this.hFileContext.isIncludesMvcc()) { - appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); - appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); - } - - // File info - writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO)); - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - - // Load-on-open data supplied by higher levels, e.g. Bloom filters. - for (BlockWritable w : additionalLoadOnOpenData){ - fsBlockWriter.writeBlock(w, outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); - } - - // Now finish off the trailer. - trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); - trailer.setUncompressedDataIndexSize( - dataBlockIndexWriter.getTotalUncompressedSize()); - trailer.setFirstDataBlockOffset(firstDataBlockOffset); - trailer.setLastDataBlockOffset(lastDataBlockOffset); - trailer.setComparatorClass(comparator.getClass()); - trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); - - - finishClose(trailer); - - fsBlockWriter.release(); - } - - @Override - public void addInlineBlockWriter(InlineBlockWriter ibw) { - inlineBlockWriters.add(ibw); - } - - @Override - public void addGeneralBloomFilter(final BloomFilterWriter bfw) { - this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META); - } - - @Override - public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) { - this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META); - } - - private void addBloomFilter(final BloomFilterWriter bfw, - final BlockType blockType) { - if (bfw.getKeyCount() <= 0) - return; - - if (blockType != BlockType.GENERAL_BLOOM_META && - blockType != BlockType.DELETE_FAMILY_BLOOM_META) { - throw new RuntimeException("Block Type: " + blockType.toString() + - "is not supported"); - } - additionalLoadOnOpenData.add(new BlockWritable() { - @Override - public BlockType getBlockType() { - return blockType; - } - - @Override - public void writeToBlock(DataOutput out) throws IOException { - bfw.getMetaWriter().write(out); - Writable dataWriter = bfw.getDataWriter(); - if (dataWriter != null) - dataWriter.write(out); - } - }); - } - - protected int getMajorVersion() { - return 2; - } - - protected int getMinorVersion() { - return HFileReaderV2.MAX_MINOR_VERSION; - } - - @Override - public HFileContext getFileContext() { - return hFileContext; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java deleted file mode 100644 index 086395c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue.KVComparator; -import org.apache.hadoop.hbase.io.crypto.Encryption; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; -import org.apache.hadoop.hbase.security.EncryptionUtil; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * {@link HFile} writer for version 3. - */ -@InterfaceAudience.Private -public class HFileWriterV3 extends HFileWriterV2 { - - private static final Log LOG = LogFactory.getLog(HFileWriterV3.class); - - private int maxTagsLength = 0; - - static class WriterFactoryV3 extends HFile.WriterFactory { - WriterFactoryV3(Configuration conf, CacheConfig cacheConf) { - super(conf, cacheConf); - } - - @Override - public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream, - final KVComparator comparator, HFileContext fileContext) - throws IOException { - return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext); - } - } - - /** Constructor that takes a path, creates and closes the output stream. */ - public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path, - FSDataOutputStream ostream, final KVComparator comparator, - final HFileContext fileContext) throws IOException { - super(conf, cacheConf, fs, path, ostream, comparator, fileContext); - if (LOG.isTraceEnabled()) { - LOG.trace("Writer" + (path != null ? " for " + path : "") + - " initialized with cacheConf: " + cacheConf + - " comparator: " + comparator.getClass().getSimpleName() + - " fileContext: " + fileContext); - } - } - - /** - * Add key/value to file. Keys must be added in an order that agrees with the - * Comparator passed on construction. - * - * @param cell - * Cell to add. Cannot be empty nor null. 
- * @throws IOException - */ - @Override - public void append(final Cell cell) throws IOException { - // Currently get the complete arrays - super.append(cell); - int tagsLength = cell.getTagsLength(); - if (tagsLength > this.maxTagsLength) { - this.maxTagsLength = tagsLength; - } - } - - protected void finishFileInfo() throws IOException { - super.finishFileInfo(); - if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) { - // In case of Prefix Tree encoding, we always write tags information into HFiles even if all - // KVs are having no tags. - fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); - } else if (hFileContext.isIncludesTags()) { - // When tags are not being written in this file, MAX_TAGS_LEN is excluded - // from the FileInfo - fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); - boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) - && hFileContext.isCompressTags(); - fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); - } - } - - @Override - protected int getMajorVersion() { - return 3; - } - - @Override - protected int getMinorVersion() { - return HFileReaderV3.MAX_MINOR_VERSION; - } - - @Override - protected void finishClose(FixedFileTrailer trailer) throws IOException { - // Write out encryption metadata before finalizing if we have a valid crypto context - Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); - if (cryptoContext != Encryption.Context.NONE) { - // Wrap the context's key and write it as the encryption metadata, the wrapper includes - // all information needed for decryption - trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(), - cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, - User.getCurrent().getShortName()), - cryptoContext.getKey())); - } - // Now we can finish the close - super.finishClose(trailer); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index f2d5c6f..26ae097 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -129,7 +129,7 @@ public class HFileOutputFormat2 // Invented config. Add to hbase-*.xml if other than default compression. 
final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = AbstractHFileWriter + final Algorithm defaultCompression = HFileWriterImpl .compressionByName(defaultCompressionStr); final boolean compactionExclude = conf.getBoolean( "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); @@ -483,7 +483,7 @@ public class HFileOutputFormat2 Map compressionMap = new TreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry e : stringMap.entrySet()) { - Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue()); + Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); compressionMap.put(e.getKey(), algorithm); } return compressionMap; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d4354b0..a515f8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -493,6 +494,7 @@ public class HRegionServer extends HasThread implements throws IOException { this.fsOk = true; this.conf = conf; + HFile.checkHFileVersion(this.conf); checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 8910042..9fa257a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -136,7 +136,7 @@ public interface Region extends ConfigurationObserver { */ long getOldestHfileTs(boolean majorCompactioOnly) throws IOException; - /** + /** * @return map of column family names to max sequence id that was read from storage when this * region was opened */ @@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver { /////////////////////////////////////////////////////////////////////////// // Metrics - + /** @return read requests count for this region */ long getReadRequestsCount(); @@ -181,7 +181,7 @@ public interface Region extends ConfigurationObserver { /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL(); - + /** @return the size of data processed bypassing the WAL, in bytes */ long getDataInMemoryWithoutWAL(); @@ -216,7 +216,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *
<p>
{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -226,7 +226,7 @@ public interface Region extends ConfigurationObserver { /** * This method needs to be called before any public call that reads or - * modifies data. + * modifies data. * Acquires a read lock and checks if the region is closing or closed. *
<p>
{@link #closeRegionOperation} MUST then always be called after * the operation has completed, whether it succeeded or failed. @@ -413,7 +413,7 @@ public interface Region extends ConfigurationObserver { /** * Perform atomic mutations within the region. - * + * * @param mutations The list of mutations to perform. * mutations can contain operations for multiple rows. * Caller has to ensure that all rows are contained in this region. @@ -588,8 +588,8 @@ public interface Region extends ConfigurationObserver { byte[] now) throws IOException; /** - * Replace any cell timestamps set to HConstants#LATEST_TIMESTAMP with the - * provided current timestamp. + * Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP} + * with the provided current timestamp. * @param values * @param now */ @@ -609,13 +609,13 @@ public interface Region extends ConfigurationObserver { CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH } - + /** @return the detailed result code */ Result getResult(); /** @return true if the memstores were flushed, else false */ boolean isFlushSucceeded(); - + /** @return True if the flush requested a compaction, else false */ boolean isCompactionNeeded(); } @@ -647,7 +647,7 @@ public interface Region extends ConfigurationObserver { * Synchronously compact all stores in the region. *
<p>
This operation could block for a long time, so don't call it from a * time-sensitive thread. - *
<p>
Note that no locks are taken to prevent possible conflicts between + *
<p>
Note that no locks are taken to prevent possible conflicts between * compaction and splitting activities. The regionserver does not normally compact * and split in parallel. However by calling this method you may introduce * unexpected and unhandled concurrency. Don't do this unless you know what diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index c1a6b76..345dd9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; @@ -409,7 +408,7 @@ public class StoreFile { } this.reader.setSequenceID(this.sequenceid); - b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY); if (b != null) { this.maxMemstoreTS = Bytes.toLong(b); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 3c3ea6b..ae820b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; @@ -142,7 +142,7 @@ public abstract class Compactor { fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID()); } else { - tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY); + tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY); if (tmp != null) { fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java index cdef12f..a9cc1c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import 
org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -120,7 +120,7 @@ public class CompressionTest { throws Exception { Configuration conf = HBaseConfiguration.create(); HFileContext context = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(codec)).build(); + .withCompression(HFileWriterImpl.compressionByName(codec)).build(); HFile.Writer writer = HFile.getWriterFactoryNoCache(conf) .withPath(fs, path) .withFileContext(context) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java index ea10f60..cb12bea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.io.crypto.aes.AES; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -326,7 +326,7 @@ public class HFilePerformanceEvaluation { void setUp() throws Exception { HFileContextBuilder builder = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(codec)) + .withCompression(HFileWriterImpl.compressionByName(codec)) .withBlockSize(RFILE_BLOCKSIZE); if (cipher == "aes") { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 00639cf..0622f55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -242,7 +242,6 @@ public class TestCacheOnWrite { public void setUp() throws IOException { conf = TEST_UTIL.getConfiguration(); this.conf.set("dfs.datanode.data.dir.perm", "700"); - conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE); conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); @@ -272,12 +271,7 @@ public class TestCacheOnWrite { } private void readStoreFile(boolean useTags) throws IOException { - AbstractHFileReader reader; - if (useTags) { - reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf); - } else { - reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); - } + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf); LOG.info("HFile information: " + reader); HFileContext meta = new HFileContextBuilder().withCompression(compress) .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) @@ -378,11 +372,6 @@ public class TestCacheOnWrite { } private void writeStoreFile(boolean useTags) throws IOException { - if(useTags) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } else { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2); - } Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "test_cache_on_write"); HFileContext meta = new HFileContextBuilder().withCompression(compress) @@ -422,11 +411,6 
@@ public class TestCacheOnWrite { private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) throws IOException, InterruptedException { - if (useTags) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } else { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2); - } // TODO: need to change this test if we add a cache size threshold for // compactions, or if we implement some other kind of intelligent logic for // deciding what blocks to cache-on-write on compaction. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java index 1b6731a..cb56c78 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java @@ -90,7 +90,7 @@ public class TestFixedFileTrailer { @Test public void testTrailer() throws IOException { FixedFileTrailer t = new FixedFileTrailer(version, - HFileReaderV2.PBUF_TRAILER_MINOR_VERSION); + HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION); t.setDataIndexCount(3); t.setEntryCount(((long) Integer.MAX_VALUE) + 1); @@ -123,7 +123,7 @@ public class TestFixedFileTrailer { { DataInputStream dis = new DataInputStream(bais); FixedFileTrailer t2 = new FixedFileTrailer(version, - HFileReaderV2.PBUF_TRAILER_MINOR_VERSION); + HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION); t2.deserialize(dis); assertEquals(-1, bais.read()); // Ensure we have read everything. checkLoadedTrailer(version, t, t2); @@ -172,7 +172,7 @@ public class TestFixedFileTrailer { public void testTrailerForV2NonPBCompatibility() throws Exception { if (version == 2) { FixedFileTrailer t = new FixedFileTrailer(version, - HFileReaderV2.MINOR_VERSION_NO_CHECKSUM); + HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM); t.setDataIndexCount(3); t.setEntryCount(((long) Integer.MAX_VALUE) + 1); t.setLastDataBlockOffset(291); @@ -199,7 +199,7 @@ public class TestFixedFileTrailer { { DataInputStream dis = new DataInputStream(bais); FixedFileTrailer t2 = new FixedFileTrailer(version, - HFileReaderV2.MINOR_VERSION_NO_CHECKSUM); + HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM); t2.deserialize(dis); assertEquals(-1, bais.read()); // Ensure we have read everything. 
checkLoadedTrailer(version, t, t2); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 7625842..cf2aca5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -82,8 +82,6 @@ public class TestForceCacheImportantBlocks { public static Collection parameters() { // HFile versions return Arrays.asList( - new Object[] { 2, true }, - new Object[] { 2, false }, new Object[] { 3, true }, new Object[] { 3, false } ); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 3855629..9e4b1c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -246,7 +246,7 @@ public class TestHFile extends HBaseTestCase { FSDataOutputStream fout = createFSOutput(ncTFile); HFileContext meta = new HFileContextBuilder() .withBlockSize(minBlockSize) - .withCompression(AbstractHFileWriter.compressionByName(codec)) + .withCompression(HFileWriterImpl.compressionByName(codec)) .build(); Writer writer = HFile.getWriterFactory(conf, cacheConf) .withOutputStream(fout) @@ -339,7 +339,7 @@ public class TestHFile extends HBaseTestCase { Path mFile = new Path(ROOT_DIR, "meta.hfile"); FSDataOutputStream fout = createFSOutput(mFile); HFileContext meta = new HFileContextBuilder() - .withCompression(AbstractHFileWriter.compressionByName(compress)) + .withCompression(HFileWriterImpl.compressionByName(compress)) .withBlockSize(minBlockSize).build(); Writer writer = HFile.getWriterFactory(conf, cacheConf) .withOutputStream(fout) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 939c019..0ee9d14 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -590,7 +590,7 @@ public class TestHFileBlockIndex { } // Manually compute the mid-key and validate it. 
- HFileReaderV2 reader2 = (HFileReaderV2) reader; + HFile.Reader reader2 = reader; HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader(); HFileBlock.BlockIterator iter = fsReader.blockRange(0, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java index c0683f8..ab811f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java @@ -55,8 +55,7 @@ public class TestHFileInlineToRootChunkConversion { CacheConfig cacheConf = new CacheConfig(conf); conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize); HFileContext context = new HFileContextBuilder().withBlockSize(16).build(); - HFileWriterV2 hfw = - (HFileWriterV2) new HFileWriterV2.WriterFactoryV2(conf, cacheConf) + HFile.Writer hfw = new HFileWriterFactory(conf, cacheConf) .withFileContext(context) .withPath(fs, hfPath).create(); List keys = new ArrayList(); @@ -78,7 +77,7 @@ public class TestHFileInlineToRootChunkConversion { } hfw.close(); - HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, hfPath, cacheConf, conf); + HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf); // Scanner doesn't do Cells yet. Fix. HFileScanner scanner = reader.getScanner(true, true); for (int i = 0; i < keys.size(); ++i) { @@ -86,4 +85,4 @@ public class TestHFileInlineToRootChunkConversion { } reader.close(); } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java index 76a8200..26adb49 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java @@ -130,7 +130,7 @@ public class TestHFileSeek extends TestCase { try { HFileContext context = new HFileContextBuilder() .withBlockSize(options.minBlockSize) - .withCompression(AbstractHFileWriter.compressionByName(options.compress)) + .withCompression(HFileWriterImpl.compressionByName(options.compress)) .build(); Writer writer = HFile.getWriterFactoryNoCache(conf) .withOutputStream(fout) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java index 42e918a..ca063bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java @@ -56,7 +56,7 @@ import org.junit.experimental.categories.Category; /** * Testing writing a version 2 {@link HFile}. This is a low-level test written - * during the development of {@link HFileWriterV2}. + * during the development of {@link HFileWriterImpl}. 
*/ @Category({IOTests.class, SmallTests.class}) public class TestHFileWriterV2 { @@ -99,8 +99,7 @@ public class TestHFileWriterV2 { .withBlockSize(4096) .withCompression(compressAlgo) .build(); - HFileWriterV2 writer = (HFileWriterV2) - new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf)) + HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf)) .withPath(fs, hfilePath) .withFileContext(context) .create(); @@ -136,7 +135,6 @@ public class TestHFileWriterV2 { FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize); - assertEquals(2, trailer.getMajorVersion()); assertEquals(entryCount, trailer.getEntryCount()); HFileContext meta = new HFileContextBuilder() @@ -177,8 +175,7 @@ public class TestHFileWriterV2 { // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); - byte [] keyValueFormatVersion = fileInfo.get( - HFileWriterV2.KEY_VALUE_VERSION); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); boolean includeMemstoreTS = keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index f96e8ef..2ca9273 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -60,8 +60,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; /** - * Testing writing a version 3 {@link HFile}. This is a low-level test written - * during the development of {@link HFileWriterV3}. + * Testing writing a version 3 {@link HFile}. 
*/ @RunWith(Parameterized.class) @Category({IOTests.class, SmallTests.class}) @@ -120,8 +119,7 @@ public class TestHFileWriterV3 { .withBlockSize(4096) .withIncludesTags(useTags) .withCompression(compressAlgo).build(); - HFileWriterV3 writer = (HFileWriterV3) - new HFileWriterV3.WriterFactoryV3(conf, new CacheConfig(conf)) + HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf)) .withPath(fs, hfilePath) .withFileContext(context) .withComparator(KeyValue.COMPARATOR) @@ -206,8 +204,7 @@ public class TestHFileWriterV3 { // File info FileInfo fileInfo = new FileInfo(); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); - byte [] keyValueFormatVersion = fileInfo.get( - HFileWriterV3.KEY_VALUE_VERSION); + byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION); boolean includeMemstoreTS = keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java index 2fd3684..0067417 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -89,8 +89,7 @@ public class TestLazyDataBlockDecompression { */ private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path, HFileContext cxt, int entryCount) throws IOException { - HFileWriterV2 writer = (HFileWriterV2) - new HFileWriterV2.WriterFactoryV2(conf, cc) + HFile.Writer writer = new HFileWriterFactory(conf, cc) .withPath(fs, path) .withFileContext(cxt) .create(); @@ -118,7 +117,7 @@ public class TestLazyDataBlockDecompression { long fileSize = fs.getFileStatus(path).getLen(); FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize); - HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig, + HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig, fsdis.getHfs(), conf); reader.loadFileInfo(); long offset = trailer.getFirstDataBlockOffset(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 4ceafb4..6a12616 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -55,7 +55,6 @@ public class TestPrefetch { @Before public void setUp() throws IOException { conf = TEST_UTIL.getConfiguration(); - conf.setInt(HFile.FORMAT_VERSION_KEY, 3); conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); fs = HFileSystem.get(conf); CacheConfig.blockCacheDisabled = false; @@ -70,10 +69,9 @@ public class TestPrefetch { private void readStoreFile(Path storeFilePath) throws Exception { // Open the file - HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, - storeFilePath, cacheConf, conf); + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf); - while (!((HFileReaderV3)reader).prefetchComplete()) { + while (!reader.prefetchComplete()) { // Sleep for a bit Thread.sleep(1000); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java index 3a0fdf7..9d7de02 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java @@ -37,7 +37,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; /** - * Test {@link HFileScanner#reseekTo(byte[])} + * Test {@link HFileScanner#reseekTo(org.apache.hadoop.hbase.Cell)} */ @Category({IOTests.class, SmallTests.class}) public class TestReseekTo { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java index b9a126f..69bc09d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java @@ -73,11 +73,6 @@ public class TestSeekTo extends HBaseTestCase { Path makeNewFile(TagUsage tagUsage) throws IOException { Path ncTFile = new Path(testDir, "basic.hfile"); - if (tagUsage != TagUsage.NO_TAG) { - conf.setInt("hfile.format.version", 3); - } else { - conf.setInt("hfile.format.version", 2); - } FSDataOutputStream fout = this.fs.create(ncTFile); int blocksize = toKV("a", tagUsage).getLength() * 3; HFileContext context = new HFileContextBuilder().withBlockSize(blocksize) @@ -142,7 +137,7 @@ public class TestSeekTo extends HBaseTestCase { @Test public void testSeekBeforeWithReSeekTo() throws Exception { - testSeekBeforeWithReSeekToInternals(TagUsage.NO_TAG); + testSeekBeforeWithReSeekToInternals(TagUsage.NO_TAG); testSeekBeforeWithReSeekToInternals(TagUsage.ONLY_TAG); testSeekBeforeWithReSeekToInternals(TagUsage.PARTIAL_TAG); } @@ -232,7 +227,7 @@ public class TestSeekTo extends HBaseTestCase { @Test public void testSeekTo() throws Exception { - testSeekToInternals(TagUsage.NO_TAG); + testSeekToInternals(TagUsage.NO_TAG); testSeekToInternals(TagUsage.ONLY_TAG); testSeekToInternals(TagUsage.PARTIAL_TAG); } @@ -262,7 +257,7 @@ public class TestSeekTo extends HBaseTestCase { @Test public void testBlockContainingKey() throws Exception { - testBlockContainingKeyInternals(TagUsage.NO_TAG); + testBlockContainingKeyInternals(TagUsage.NO_TAG); testBlockContainingKeyInternals(TagUsage.ONLY_TAG); testBlockContainingKeyInternals(TagUsage.PARTIAL_TAG); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java index 1927334..6544c72 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.io.hfile.HFileReaderV2; +import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; @@ -602,8 +602,9 @@ public class DataBlockEncodingTool { // run the utilities DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName); int majorVersion = reader.getHFileVersion(); - comp.useHBaseChecksum = majorVersion > 2 -
|| (majorVersion == 2 && reader.getHFileMinorVersion() >= HFileReaderV2.MINOR_VERSION_WITH_CHECKSUM); + comp.useHBaseChecksum = majorVersion > 2 || + (majorVersion == 2 && + reader.getHFileMinorVersion() >= HFileReaderImpl.MINOR_VERSION_WITH_CHECKSUM); comp.checkStatistics(scanner, kvLimit); if (doVerify) { comp.verifyCodecs(scanner, kvLimit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index dc142d6..b7ebd23 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileBlock; -import org.apache.hadoop.hbase.io.hfile.HFileReaderV2; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2; import org.apache.hadoop.hbase.wal.DefaultWALProvider; @@ -221,7 +220,7 @@ public class TestCacheOnWriteInSchema { BlockCache cache = cacheConf.getBlockCache(); StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.ROWCOL); - HFileReaderV2 reader = (HFileReaderV2) sf.createReader().getHFileReader(); + HFile.Reader reader = sf.createReader().getHFileReader(); try { // Open a scanner with (on read) caching disabled HFileScanner scanner = reader.getScanner(false, false); -- 2.2.1
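
Usage sketch (not part of the patch): with the version-specific writer and reader classes removed, callers program against HFile.Writer and HFile.Reader through the HFile factory methods, as the updated tests above do. The round trip below is assembled only from calls that appear in this patch (getWriterFactory, withPath, withFileContext, create, append, createReader, loadFileInfo, getScanner); the configuration, file path, and cell values are placeholders, and the class name is invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/example.hfile");   // placeholder location
    CacheConfig cacheConf = new CacheConfig(conf);
    HFileContext context = new HFileContextBuilder().withBlockSize(4096).build();

    // Write side: the concrete class is HFileWriterImpl, but callers only see HFile.Writer.
    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    try {
      // append() requires cells in the order of the comparator passed on construction.
      writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes("v1")));
      writer.append(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes("v2")));
    } finally {
      writer.close();
    }

    // Read side: no cast to a version-specific reader class is needed any more.
    HFile.Reader reader = HFile.createReader(fs, path, cacheConf, conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    System.out.println("positioned on first cell: " + scanner.seekTo());
    reader.close();
  }
}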
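The close() path of the new writer above writes the load-on-open section (root data block index, meta block index, file info, and any Bloom metadata) and then serializes a FixedFileTrailer that records the relevant offsets and counts. A minimal sketch of reading that trailer back follows; it uses only readFromStream, getMajorVersion, getEntryCount and getFirstDataBlockOffset, all of which appear in the tests touched by this patch, and the class name and command-line argument are invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;

public class TrailerReadBackSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(args[0]);                 // path to an existing hfile
    long fileSize = fs.getFileStatus(path).getLen();
    FSDataInputStream in = fs.open(path);
    try {
      // The trailer sits at the end of the file; close() above filled in the
      // load-on-open offset, data index count, entry count and data block bounds.
      FixedFileTrailer trailer = FixedFileTrailer.readFromStream(in, fileSize);
      System.out.println("major version:       " + trailer.getMajorVersion());
      System.out.println("entries:             " + trailer.getEntryCount());
      System.out.println("first data block at: " + trailer.getFirstDataBlockOffset());
    } finally {
      in.close();
    }
  }
}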
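The HRegionServer constructor now calls HFile.checkHFileVersion(this.conf) at startup, but the body of that method is not shown in these hunks. The standalone helper below is only a hedged guess at what such a configuration check might look like: the "hfile.format.version" key matches the one set by the removed test lines, while the class, the MIN_FORMAT_VERSION/MAX_FORMAT_VERSION values, and the exception message are assumptions made for illustration, not the actual HFile implementation.

import org.apache.hadoop.conf.Configuration;

// Hypothetical stand-in for the version check HRegionServer now triggers at startup.
public final class HFileVersionCheckSketch {
  static final String FORMAT_VERSION_KEY = "hfile.format.version"; // same key the removed test lines set
  static final int MIN_FORMAT_VERSION = 2;  // assumption: oldest version this code still reads
  static final int MAX_FORMAT_VERSION = 3;  // assumption: mirrors HFile.MAX_FORMAT_VERSION

  static void checkHFileVersion(Configuration c) {
    int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
    // Reject configured versions outside the supported range, whether too old or too new.
    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
      throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY + " is " + version
          + "; supported hfile versions are " + MIN_FORMAT_VERSION + " through " + MAX_FORMAT_VERSION);
    }
  }

  private HFileVersionCheckSketch() {}
}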