Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (working copy) @@ -202,21 +202,6 @@ assertFalse(st.prepare()); } - @Test public void testWholesomeSplitWithHFileV1() throws IOException { - int defaultVersion = TEST_UTIL.getConfiguration().getInt( - HFile.FORMAT_VERSION_KEY, 2); - TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 1); - try { - for (Store store : this.parent.stores.values()) { - store.getFamily().setBloomFilterType(BloomType.ROW); - } - testWholesomeSplit(); - } finally { - TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, - defaultVersion); - } - } - @Test public void testWholesomeSplit() throws IOException { final int rowcount = TEST_UTIL.loadRegion(this.parent, CF, true); assertTrue(rowcount > 0); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (working copy) @@ -587,60 +587,6 @@ } } - public void testBloomEdgeCases() throws Exception { - float err = (float)0.005; - FileSystem fs = FileSystem.getLocal(conf); - Path f = new Path(ROOT_DIR, getName()); - conf.setFloat(BloomFilterFactory.IO_STOREFILE_BLOOM_ERROR_RATE, err); - conf.setBoolean(BloomFilterFactory.IO_STOREFILE_BLOOM_ENABLED, true); - conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS, 1000); - - // This test only runs for HFile format version 1. - conf.setInt(HFile.FORMAT_VERSION_KEY, 1); - - // this should not create a bloom because the max keys is too small - StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs, - StoreFile.DEFAULT_BLOCKSIZE_SMALL) - .withFilePath(f) - .withBloomType(BloomType.ROW) - .withMaxKeyCount(2000) - .withChecksumType(CKTYPE) - .withBytesPerChecksum(CKBYTES) - .build(); - assertFalse(writer.hasGeneralBloom()); - writer.close(); - fs.delete(f, true); - - conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_MAX_KEYS, - Integer.MAX_VALUE); - - // TODO: commented out because we run out of java heap space on trunk - // the below config caused IllegalArgumentException in our production cluster - // however, the resulting byteSize is < MAX_INT, so this should work properly - writer = new StoreFile.WriterBuilder(conf, cacheConf, fs, - StoreFile.DEFAULT_BLOCKSIZE_SMALL) - .withFilePath(f) - .withBloomType(BloomType.ROW) - .withMaxKeyCount(27244696) - .build(); - assertTrue(writer.hasGeneralBloom()); - bloomWriteRead(writer, fs); - - // this, however, is too large and should not create a bloom - // because Java can't create a contiguous array > MAX_INT - writer = new StoreFile.WriterBuilder(conf, cacheConf, fs, - StoreFile.DEFAULT_BLOCKSIZE_SMALL) - .withFilePath(f) - .withBloomType(BloomType.ROW) - .withMaxKeyCount(Integer.MAX_VALUE) - .withChecksumType(CKTYPE) - .withBytesPerChecksum(CKBYTES) - .build(); - assertFalse(writer.hasGeneralBloom()); - writer.close(); - fs.delete(f, true); - } - public void testSeqIdComparator() { assertOrdering(StoreFile.Comparators.SEQ_ID, mockStoreFile(true, 1000, -1, "/foo/123"), Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestUpgradeFromHFileV1ToEncoding.java (working copy) @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.encoding; - -import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF; -import static org.apache.hadoop.hbase.io.encoding.TestChangingEncoding.CF_BYTES; - -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(LargeTests.class) -public class TestUpgradeFromHFileV1ToEncoding { - - private static final Log LOG = - LogFactory.getLog(TestUpgradeFromHFileV1ToEncoding.class); - - private static final String TABLE = "UpgradeTable"; - private static final byte[] TABLE_BYTES = Bytes.toBytes(TABLE); - - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final Configuration conf = TEST_UTIL.getConfiguration(); - - private static final int NUM_HFILE_V1_BATCHES = 10; - private static final int NUM_HFILE_V2_BATCHES = 20; - - private static final int NUM_SLAVES = 3; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Use a small flush size to create more HFiles. - conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); - conf.setInt(HFile.FORMAT_VERSION_KEY, 1); // Use HFile v1 initially - TEST_UTIL.startMiniCluster(NUM_SLAVES); - LOG.debug("Started an HFile v1 cluster"); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testUpgrade() throws Exception { - int numBatches = 0; - HTableDescriptor htd = new HTableDescriptor(TABLE); - - // We don't want a split in this test. - htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); - htd.setMaxFileSize(Long.MAX_VALUE); - HColumnDescriptor hcd = new HColumnDescriptor(CF); - htd.addFamily(hcd); - HBaseAdmin admin = new HBaseAdmin(conf); - admin.createTable(htd); - admin.close(); - for (int i = 0; i < NUM_HFILE_V1_BATCHES; ++i) { - TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++); - } - TEST_UTIL.shutdownMiniHBaseCluster(); - - conf.setInt(HFile.FORMAT_VERSION_KEY, 2); - TEST_UTIL.startMiniHBaseCluster(1, NUM_SLAVES); - LOG.debug("Started an HFile v2 cluster"); - admin = new HBaseAdmin(conf); - htd = admin.getTableDescriptor(TABLE_BYTES); - hcd = htd.getFamily(CF_BYTES); - hcd.setDataBlockEncoding(DataBlockEncoding.PREFIX); - admin.disableTable(TABLE); - admin.modifyColumn(TABLE, hcd); - admin.enableTable(TABLE); - admin.close(); - for (int i = 0; i < NUM_HFILE_V2_BATCHES; ++i) { - TestChangingEncoding.writeTestDataBatch(conf, TABLE, numBatches++); - } - - LOG.debug("Verifying all 'batches', both HFile v1 and encoded HFile v2"); - verifyBatches(numBatches); - - LOG.debug("Doing a manual compaction"); - admin.compact(TABLE); - Thread.sleep(TimeUnit.SECONDS.toMillis(10)); - - LOG.debug("Verify all the data again"); - verifyBatches(numBatches); - } - - private void verifyBatches(int numBatches) throws Exception { - for (int i = 0; i < numBatches; ++i) { - TestChangingEncoding.verifyTestDataBatch(conf, TABLE, i); - } - } - -} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (working copy) @@ -258,41 +258,6 @@ } @Test - public void testReaderV1() throws IOException { - for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { - for (boolean pread : new boolean[] { false, true }) { - byte[] block = createTestV1Block(algo); - Path path = new Path(TEST_UTIL.getDataTestDir(), - "blocks_v1_"+ algo); - LOG.info("Creating temporary file at " + path); - FSDataOutputStream os = fs.create(path); - int totalSize = 0; - int numBlocks = 50; - for (int i = 0; i < numBlocks; ++i) { - os.write(block); - totalSize += block.length; - } - os.close(); - - FSDataInputStream is = fs.open(path); - HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo, - totalSize); - HFileBlock b; - int numBlocksRead = 0; - long pos = 0; - while (pos < totalSize) { - b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread); - b.sanityCheck(); - pos += block.length; - numBlocksRead++; - } - assertEquals(numBlocks, numBlocksRead); - is.close(); - } - } - } - - @Test public void testReaderV2() throws IOException { for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (boolean pread : new boolean[] { false, true }) { Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java (working copy) @@ -80,8 +80,6 @@ public static Collection parameters() { // HFile versions return Arrays.asList(new Object[][] { - new Object[] { new Integer(1), false }, - new Object[] { new Integer(1), true }, new Object[] { new Integer(2), false }, new Object[] { new Integer(2), true } }); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderV1.java (working copy) @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.io.hfile; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.URL; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mortbay.log.Log; - -@Category(SmallTests.class) -public class TestHFileReaderV1 { - - private static final HBaseTestingUtility TEST_UTIL = - new HBaseTestingUtility(); - - private Configuration conf; - private FileSystem fs; - - private static final int N = 1000; - - @Before - public void setUp() throws IOException { - conf = TEST_UTIL.getConfiguration(); - fs = FileSystem.get(conf); - } - - @Test - public void testReadingExistingVersion1HFile() throws IOException { - URL url = TestHFileReaderV1.class.getResource( - "8e8ab58dcf39412da19833fcd8f687ac"); - Path existingHFilePath = new Path(url.getPath()); - HFile.Reader reader = - HFile.createReader(fs, existingHFilePath, new CacheConfig(conf)); - reader.loadFileInfo(); - FixedFileTrailer trailer = reader.getTrailer(); - - assertEquals(N, reader.getEntries()); - assertEquals(N, trailer.getEntryCount()); - assertEquals(1, trailer.getMajorVersion()); - assertEquals(Compression.Algorithm.GZ, trailer.getCompressionCodec()); - - for (boolean pread : new boolean[] { false, true }) { - int totalDataSize = 0; - int n = 0; - - HFileScanner scanner = reader.getScanner(false, pread); - assertTrue(scanner.seekTo()); - do { - totalDataSize += scanner.getKey().limit() + scanner.getValue().limit() - + Bytes.SIZEOF_INT * 2; - ++n; - } while (scanner.next()); - - // Add magic record sizes, one per data block. - totalDataSize += 8 * trailer.getDataIndexCount(); - - assertEquals(N, n); - assertEquals(trailer.getTotalUncompressedBytes(), totalDataSize); - } - reader.close(); - } -} \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (revision 1439006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java (working copy) @@ -169,41 +169,6 @@ } @Test - public void testReaderV1() throws IOException { - for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { - for (boolean pread : new boolean[] { false, true }) { - byte[] block = createTestV1Block(algo); - Path path = new Path(TEST_UTIL.getDataTestDir(), - "blocks_v1_"+ algo); - LOG.info("Creating temporary file at " + path); - FSDataOutputStream os = fs.create(path); - int totalSize = 0; - int numBlocks = 50; - for (int i = 0; i < numBlocks; ++i) { - os.write(block); - totalSize += block.length; - } - os.close(); - - FSDataInputStream is = fs.open(path); - HFileBlock.FSReader hbr = new HFileBlock.FSReaderV1(is, algo, - totalSize); - HFileBlock b; - int numBlocksRead = 0; - long pos = 0; - while (pos < totalSize) { - b = hbr.readBlockData(pos, block.length, uncompressedSizeV1, pread); - b.sanityCheck(); - pos += block.length; - numBlocksRead++; - } - assertEquals(numBlocks, numBlocksRead); - is.close(); - } - } - } - - @Test public void testReaderV2() throws IOException { for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (boolean pread : new boolean[] { false, true }) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; @@ -1535,7 +1534,7 @@ bloom = null; shouldCheckBloom = true; } else { - bloom = reader.getMetaBlock(HFileWriterV1.BLOOM_FILTER_DATA_KEY, + bloom = reader.getMetaBlock(HFileWriterV2.BLOOM_FILTER_DATA_KEY, true); shouldCheckBloom = bloom != null; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -1317,110 +1317,6 @@ } /** - * Reads version 1 blocks from the file system. In version 1 blocks, - * everything is compressed, including the magic record, if compression is - * enabled. Everything might be uncompressed if no compression is used. This - * reader returns blocks represented in the uniform version 2 format in - * memory. - */ - static class FSReaderV1 extends AbstractFSReader { - - /** Header size difference between version 1 and 2 */ - private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM - - MAGIC_LENGTH; - - public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo, - long fileSize) throws IOException { - super(istream, istream, compressAlgo, fileSize, 0, null, null); - } - - /** - * Read a version 1 block. There is no uncompressed header, and the block - * type (the magic record) is part of the compressed data. This - * implementation assumes that the bounded range file input stream is - * needed to stop the decompressor reading into next block, because the - * decompressor just grabs a bunch of data without regard to whether it is - * coming to end of the compressed section. - * - * The block returned is still a version 2 block, and in particular, its - * first {@link #HEADER_SIZE} bytes contain a valid version 2 header. - * - * @param offset the offset of the block to read in the file - * @param onDiskSizeWithMagic the on-disk size of the version 1 block, - * including the magic record, which is the part of compressed - * data if using compression - * @param uncompressedSizeWithMagic uncompressed size of the version 1 - * block, including the magic record - */ - @Override - public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic, - int uncompressedSizeWithMagic, boolean pread) throws IOException { - if (uncompressedSizeWithMagic <= 0) { - throw new IOException("Invalid uncompressedSize=" - + uncompressedSizeWithMagic + " for a version 1 block"); - } - - if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE) - { - throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic - + " (maximum allowed: " + Integer.MAX_VALUE + ")"); - } - - int onDiskSize = (int) onDiskSizeWithMagic; - - if (uncompressedSizeWithMagic < MAGIC_LENGTH) { - throw new IOException("Uncompressed size for a version 1 block is " - + uncompressedSizeWithMagic + " but must be at least " - + MAGIC_LENGTH); - } - - // The existing size already includes magic size, and we are inserting - // a version 2 header. - ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic - + HEADER_DELTA); - - int onDiskSizeWithoutHeader; - if (compressAlgo == Compression.Algorithm.NONE) { - // A special case when there is no compression. - if (onDiskSize != uncompressedSizeWithMagic) { - throw new IOException("onDiskSize=" + onDiskSize - + " and uncompressedSize=" + uncompressedSizeWithMagic - + " must be equal for version 1 with no compression"); - } - - // The first MAGIC_LENGTH bytes of what this will read will be - // overwritten. - readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA, - onDiskSize, false, offset, pread); - - onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH; - } else { - InputStream bufferedBoundedStream = createBufferedBoundedStream( - offset, onDiskSize, pread); - Compression.decompress(buf.array(), buf.arrayOffset() - + HEADER_DELTA, bufferedBoundedStream, onDiskSize, - uncompressedSizeWithMagic, this.compressAlgo); - - // We don't really have a good way to exclude the "magic record" size - // from the compressed block's size, since it is compressed as well. - onDiskSizeWithoutHeader = onDiskSize; - } - - BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset() - + HEADER_DELTA, MAGIC_LENGTH); - - // We set the uncompressed size of the new HFile block we are creating - // to the size of the data portion of the block without the magic record, - // since the magic record gets moved to the header. - HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader, - uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER, - offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(), - onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM); - return b; - } - } - - /** * We always prefetch the header of the next block, so that we know its * on-disk size in advance and can read it in one operation. */ Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy) @@ -447,8 +447,6 @@ CacheConfig cacheConf) { int version = getFormatVersion(conf); switch (version) { - case 1: - return new HFileWriterV1.WriterFactoryV1(conf, cacheConf); case 2: return new HFileWriterV2.WriterFactoryV2(conf, cacheConf); default: @@ -557,9 +555,6 @@ throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae); } switch (trailer.getMajorVersion()) { - case 1: - return new HFileReaderV1(path, trailer, fsdis, size, closeIStream, - cacheConf); case 2: return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum, size, closeIStream, Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy) @@ -50,6 +50,9 @@ public class HFileWriterV2 extends AbstractHFileWriter { static final Log LOG = LogFactory.getLog(HFileWriterV2.class); + /** Meta data block name for bloom filter bits. */ + public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; + /** Max memstore (mvcc) timestamp in FileInfo */ public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (working copy) @@ -1,689 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.RawComparator; - -import com.google.common.base.Preconditions; - -/** - * {@link HFile} reader for version 1. Does not support data block encoding, - * even in cache only, i.e. HFile v1 blocks are always brought into cache - * unencoded. - */ -@InterfaceAudience.Private -public class HFileReaderV1 extends AbstractHFileReader { - private static final Log LOG = LogFactory.getLog(HFileReaderV1.class); - - private volatile boolean fileInfoLoaded = false; - - /** - * Opens a HFile. You must load the index before you can - * use it by calling {@link #loadFileInfo()}. - * - * @param fsdis input stream. Caller is responsible for closing the passed - * stream. - * @param size Length of the stream. - * @param cacheConf cache references and configuration - */ - public HFileReaderV1(Path path, FixedFileTrailer trailer, - final FSDataInputStream fsdis, final long size, - final boolean closeIStream, - final CacheConfig cacheConf) throws IOException { - super(path, trailer, fsdis, size, closeIStream, cacheConf); - - trailer.expectMajorVersion(1); - fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize); - } - - private byte[] readAllIndex(final FSDataInputStream in, - final long indexOffset, final int indexSize) throws IOException { - byte[] allIndex = new byte[indexSize]; - in.seek(indexOffset); - IOUtils.readFully(in, allIndex, 0, allIndex.length); - - return allIndex; - } - - /** - * Read in the index and file info. - * - * @return A map of fileinfo data. - * @see Writer#appendFileInfo(byte[], byte[]) - * @throws IOException - */ - @Override - public FileInfo loadFileInfo() throws IOException { - if (fileInfoLoaded) - return fileInfo; - - // Read in the fileinfo and get what we need from it. - istream.seek(trailer.getFileInfoOffset()); - fileInfo = new FileInfo(); - fileInfo.read(istream); - lastKey = fileInfo.get(FileInfo.LASTKEY); - avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN)); - avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN)); - - // Comparator is stored in the file info in version 1. - String clazzName = Bytes.toString(fileInfo.get(FileInfo.COMPARATOR)); - comparator = getComparator(clazzName); - - dataBlockIndexReader = - new HFileBlockIndex.BlockIndexReader(comparator, 1); - metaBlockIndexReader = - new HFileBlockIndex.BlockIndexReader(Bytes.BYTES_RAWCOMPARATOR, 1); - - int sizeToLoadOnOpen = (int) (fileSize - trailer.getLoadOnOpenDataOffset() - - trailer.getTrailerSize()); - byte[] dataAndMetaIndex = readAllIndex(istream, - trailer.getLoadOnOpenDataOffset(), sizeToLoadOnOpen); - - ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex); - DataInputStream dis = new DataInputStream(bis); - - // Read in the data index. - if (trailer.getDataIndexCount() > 0) - BlockType.INDEX_V1.readAndCheck(dis); - dataBlockIndexReader.readRootIndex(dis, trailer.getDataIndexCount()); - - // Read in the metadata index. - if (trailer.getMetaIndexCount() > 0) - BlockType.INDEX_V1.readAndCheck(dis); - metaBlockIndexReader.readRootIndex(dis, trailer.getMetaIndexCount()); - - fileInfoLoaded = true; - return fileInfo; - } - - /** - * Creates comparator from the given class name. - * - * @param clazzName the comparator class name read from the trailer - * @return an instance of the comparator to use - * @throws IOException in case comparator class name is invalid - */ - @SuppressWarnings("unchecked") - private RawComparator getComparator(final String clazzName) - throws IOException { - if (clazzName == null || clazzName.length() == 0) { - return null; - } - try { - return (RawComparator)Class.forName(clazzName).newInstance(); - } catch (InstantiationException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - /** - * Create a Scanner on this file. No seeks or reads are done on creation. Call - * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is - * nothing to clean up in a Scanner. Letting go of your references to the - * scanner is sufficient. - * - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is - * better for random reads, seek+read is better scanning). - * @param isCompaction is scanner being used for a compaction? - * @return Scanner on this file. - */ - @Override - public HFileScanner getScanner(boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - return new ScannerV1(this, cacheBlocks, pread, isCompaction); - } - - /** - * @param key Key to search. - * @return Block number of the block containing the key or -1 if not in this - * file. - */ - protected int blockContainingKey(final byte[] key, int offset, int length) { - Preconditions.checkState(!dataBlockIndexReader.isEmpty(), - "Block index not loaded"); - return dataBlockIndexReader.rootBlockContainingKey(key, offset, length); - } - - /** - * @param metaBlockName - * @param cacheBlock Add block to cache, if found - * @return Block wrapped in a ByteBuffer - * @throws IOException - */ - @Override - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) - throws IOException { - if (trailer.getMetaIndexCount() == 0) { - return null; // there are no meta blocks - } - if (metaBlockIndexReader == null) { - throw new IOException("Meta index not loaded"); - } - - byte[] nameBytes = Bytes.toBytes(metaBlockName); - int block = metaBlockIndexReader.rootBlockContainingKey(nameBytes, 0, - nameBytes.length); - if (block == -1) - return null; - long offset = metaBlockIndexReader.getRootBlockOffset(block); - long nextOffset; - if (block == metaBlockIndexReader.getRootBlockCount() - 1) { - nextOffset = trailer.getFileInfoOffset(); - } else { - nextOffset = metaBlockIndexReader.getRootBlockOffset(block + 1); - } - - long startTimeNs = System.nanoTime(); - - BlockCacheKey cacheKey = new BlockCacheKey(name, offset, - DataBlockEncoding.NONE, BlockType.META); - - BlockCategory effectiveCategory = BlockCategory.META; - if (metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_META_KEY) || - metaBlockName.equals(HFileWriterV1.BLOOM_FILTER_DATA_KEY)) { - effectiveCategory = BlockCategory.BLOOM; - } - - // Per meta key from any given file, synchronize reads for said block - synchronized (metaBlockIndexReader.getRootBlockKey(block)) { - // Check cache for block. If found return. - if (cacheConf.isBlockCacheEnabled()) { - HFileBlock cachedBlock = - (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, - cacheConf.shouldCacheBlockOnRead(effectiveCategory), false); - if (cachedBlock != null) { - return cachedBlock.getBufferWithoutHeader(); - } - // Cache Miss, please load. - } - - HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, - nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block), - true); - hfileBlock.expectType(BlockType.META); - - final long delta = System.nanoTime() - startTimeNs; - HFile.offerReadLatency(delta, true); - - // Cache the block - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, - cacheConf.isInMemory()); - } - - return hfileBlock.getBufferWithoutHeader(); - } - } - - /** - * Read in a file block. - * @param block Index of block to read. - * @param pread Use positional read instead of seek+read (positional is - * better doing random reads whereas seek+read is better scanning). - * @param isCompaction is this block being read as part of a compaction - * @return Block wrapped in a ByteBuffer. - * @throws IOException - */ - ByteBuffer readBlockBuffer(int block, boolean cacheBlock, - final boolean pread, final boolean isCompaction) throws IOException { - if (dataBlockIndexReader == null) { - throw new IOException("Block index not loaded"); - } - if (block < 0 || block >= dataBlockIndexReader.getRootBlockCount()) { - throw new IOException("Requested block is out of range: " + block + - ", max: " + dataBlockIndexReader.getRootBlockCount()); - } - - long offset = dataBlockIndexReader.getRootBlockOffset(block); - BlockCacheKey cacheKey = new BlockCacheKey(name, offset); - - // For any given block from any given file, synchronize reads for said - // block. - // Without a cache, this synchronizing is needless overhead, but really - // the other choice is to duplicate work (which the cache would prevent you - // from doing). - synchronized (dataBlockIndexReader.getRootBlockKey(block)) { - // Check cache for block. If found return. - if (cacheConf.isBlockCacheEnabled()) { - HFileBlock cachedBlock = - (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, - cacheConf.shouldCacheDataOnRead(), false); - if (cachedBlock != null) { - return cachedBlock.getBufferWithoutHeader(); - } - // Carry on, please load. - } - - // Load block from filesystem. - long startTimeNs = System.nanoTime(); - long nextOffset; - - if (block == dataBlockIndexReader.getRootBlockCount() - 1) { - // last block! The end of data block is first meta block if there is - // one or if there isn't, the fileinfo offset. - nextOffset = (metaBlockIndexReader.getRootBlockCount() == 0) ? - this.trailer.getFileInfoOffset() : - metaBlockIndexReader.getRootBlockOffset(0); - } else { - nextOffset = dataBlockIndexReader.getRootBlockOffset(block + 1); - } - - HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread); - hfileBlock.expectType(BlockType.DATA); - - final long delta = System.nanoTime() - startTimeNs; - HFile.offerReadLatency(delta, pread); - - // Cache the block - if (cacheBlock && cacheConf.shouldCacheBlockOnRead( - hfileBlock.getBlockType().getCategory())) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, - cacheConf.isInMemory()); - } - return hfileBlock.getBufferWithoutHeader(); - } - } - - /** - * @return Last key in the file. May be null if file has no entries. - * Note that this is not the last rowkey, but rather the byte form of - * the last KeyValue. - */ - public byte[] getLastKey() { - if (!fileInfoLoaded) { - throw new RuntimeException("Load file info first"); - } - return dataBlockIndexReader.isEmpty() ? null : lastKey; - } - - /** - * @return Midkey for this file. We work with block boundaries only so - * returned midkey is an approximation only. - * - * @throws IOException - */ - @Override - public byte[] midkey() throws IOException { - Preconditions.checkState(isFileInfoLoaded(), "File info is not loaded"); - Preconditions.checkState(!dataBlockIndexReader.isEmpty(), - "Data block index is not loaded or is empty"); - return dataBlockIndexReader.midkey(); - } - - @Override - public void close() throws IOException { - close(cacheConf.shouldEvictOnClose()); - } - - @Override - public void close(boolean evictOnClose) throws IOException { - if (evictOnClose && cacheConf.isBlockCacheEnabled()) { - int numEvicted = 0; - for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) { - if (cacheConf.getBlockCache().evictBlock( - new BlockCacheKey(name, - dataBlockIndexReader.getRootBlockOffset(i), - DataBlockEncoding.NONE, BlockType.DATA))) { - numEvicted++; - } - } - LOG.debug("On close of file " + name + " evicted " + numEvicted - + " block(s) of " + dataBlockIndexReader.getRootBlockCount() - + " total blocks"); - } - if (this.closeIStream && this.istream != null) { - this.istream.close(); - this.istream = null; - } - } - - protected abstract static class AbstractScannerV1 - extends AbstractHFileReader.Scanner { - protected int currBlock; - - /** - * This masks a field with the same name in the superclass and saves us the - * runtime overhead of casting from abstract reader to reader V1. - */ - protected HFileReaderV1 reader; - - public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - this.reader = (HFileReaderV1) reader; - } - - /** - * Within a loaded block, seek looking for the first key - * that is smaller than (or equal to?) the key we are interested in. - * - * A note on the seekBefore - if you have seekBefore = true, AND the - * first key in the block = key, then you'll get thrown exceptions. - * @param key to find - * @param seekBefore find the key before the exact match. - */ - protected abstract int blockSeek(byte[] key, int offset, int length, - boolean seekBefore); - - protected abstract void loadBlock(int bloc, boolean rewind) - throws IOException; - - @Override - public int seekTo(byte[] key, int offset, int length) throws IOException { - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) return -1; // falls before the beginning of the file! :-( - // Avoid re-reading the same block (that'd be dumb). - loadBlock(b, true); - return blockSeek(key, offset, length, false); - } - - @Override - public int reseekTo(byte[] key, int offset, int length) - throws IOException { - if (blockBuffer != null && currKeyLen != 0) { - ByteBuffer bb = getKey(); - int compared = reader.getComparator().compare(key, offset, - length, bb.array(), bb.arrayOffset(), bb.limit()); - if (compared < 1) { - // If the required key is less than or equal to current key, then - // don't do anything. - return compared; - } - } - - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) { - return -1; - } - loadBlock(b, false); - return blockSeek(key, offset, length, false); - } - - @Override - public boolean seekBefore(byte[] key, int offset, int length) - throws IOException { - int b = reader.blockContainingKey(key, offset, length); - if (b < 0) - return false; // key is before the start of the file. - - // Question: does this block begin with 'key'? - byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b); - if (reader.getComparator().compare(firstkKey, 0, firstkKey.length, - key, offset, length) == 0) { - // Ok the key we're interested in is the first of the block, so go back - // by one. - if (b == 0) { - // we have a 'problem', the key we want is the first of the file. - return false; - } - b--; - // TODO shortcut: seek forward in this block to the last key of the - // block. - } - loadBlock(b, true); - blockSeek(key, offset, length, true); - return true; - } - } - - /** - * Implementation of {@link HFileScanner} interface. - */ - - protected static class ScannerV1 extends AbstractScannerV1 { - private HFileReaderV1 reader; - - public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { - super(reader, cacheBlocks, pread, isCompaction); - this.reader = reader; - } - - @Override - public KeyValue getKeyValue() { - if (blockBuffer == null) { - return null; - } - return new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position() - 8); - } - - @Override - public ByteBuffer getKey() { - Preconditions.checkState(blockBuffer != null && currKeyLen > 0, - "you need to seekTo() before calling getKey()"); - - ByteBuffer keyBuff = blockBuffer.slice(); - keyBuff.limit(currKeyLen); - keyBuff.rewind(); - // Do keyBuff.asReadOnly()? - return keyBuff; - } - - @Override - public ByteBuffer getValue() { - if (blockBuffer == null || currKeyLen == 0) { - throw new RuntimeException( - "you need to seekTo() before calling getValue()"); - } - - // TODO: Could this be done with one ByteBuffer rather than create two? - ByteBuffer valueBuff = blockBuffer.slice(); - valueBuff.position(currKeyLen); - valueBuff = valueBuff.slice(); - valueBuff.limit(currValueLen); - valueBuff.rewind(); - return valueBuff; - } - - @Override - public boolean next() throws IOException { - if (blockBuffer == null) { - throw new IOException("Next called on non-seeked scanner"); - } - - try { - blockBuffer.position(blockBuffer.position() + currKeyLen - + currValueLen); - } catch (IllegalArgumentException e) { - LOG.error("Current pos = " + blockBuffer.position() + - "; currKeyLen = " + currKeyLen + - "; currValLen = " + currValueLen + - "; block limit = " + blockBuffer.limit() + - "; HFile name = " + reader.getName() + - "; currBlock id = " + currBlock, e); - throw e; - } - if (blockBuffer.remaining() <= 0) { - currBlock++; - if (currBlock >= reader.getDataBlockIndexReader().getRootBlockCount()) { - // damn we are at the end - currBlock = 0; - blockBuffer = null; - return false; - } - blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, - isCompaction); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - blockFetches++; - return true; - } - - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - return true; - } - - @Override - protected int blockSeek(byte[] key, int offset, int length, - boolean seekBefore) { - int klen, vlen; - int lastLen = 0; - do { - klen = blockBuffer.getInt(); - vlen = blockBuffer.getInt(); - int comp = reader.getComparator().compare(key, offset, length, - blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position(), klen); - if (comp == 0) { - if (seekBefore) { - blockBuffer.position(blockBuffer.position() - lastLen - 16); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - return 1; // non exact match. - } - currKeyLen = klen; - currValueLen = vlen; - return 0; // indicate exact match - } - if (comp < 0) { - // go back one key: - blockBuffer.position(blockBuffer.position() - lastLen - 16); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - return 1; - } - blockBuffer.position(blockBuffer.position() + klen + vlen); - lastLen = klen + vlen; - } while (blockBuffer.remaining() > 0); - - // ok we are at the end, so go back a littleeeeee.... - // The 8 in the below is intentionally different to the 16s in the above - // Do the math you you'll figure it. - blockBuffer.position(blockBuffer.position() - lastLen - 8); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - return 1; // didn't exactly find it. - } - - @Override - public String getKeyString() { - return Bytes.toStringBinary(blockBuffer.array(), - blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen); - } - - @Override - public String getValueString() { - return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset() + - blockBuffer.position() + currKeyLen, currValueLen); - } - - @Override - public boolean seekTo() throws IOException { - if (reader.getDataBlockIndexReader().isEmpty()) { - return false; - } - if (blockBuffer != null && currBlock == 0) { - blockBuffer.rewind(); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - return true; - } - currBlock = 0; - blockBuffer = reader.readBlockBuffer(currBlock, cacheBlocks, pread, - isCompaction); - currKeyLen = blockBuffer.getInt(); - currValueLen = blockBuffer.getInt(); - blockFetches++; - return true; - } - - @Override - protected void loadBlock(int bloc, boolean rewind) throws IOException { - if (blockBuffer == null) { - blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, - isCompaction); - currBlock = bloc; - blockFetches++; - } else { - if (bloc != currBlock) { - blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread, - isCompaction); - currBlock = bloc; - blockFetches++; - } else { - // we are already in the same block, just rewind to seek again. - if (rewind) { - blockBuffer.rewind(); - } - else { - // Go back by (size of rowlength + size of valuelength) = 8 bytes - blockBuffer.position(blockBuffer.position()-8); - } - } - } - } - - } - - @Override - public HFileBlock readBlock(long offset, long onDiskBlockSize, - boolean cacheBlock, boolean pread, boolean isCompaction, - BlockType expectedBlockType) { - throw new UnsupportedOperationException(); - } - - @Override - public DataInput getGeneralBloomFilterMetadata() throws IOException { - // Shouldn't cache Bloom filter blocks, otherwise server would abort when - // splitting, see HBASE-6479 - ByteBuffer buf = getMetaBlock(HFileWriterV1.BLOOM_FILTER_META_KEY, false); - if (buf == null) - return null; - ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(), - buf.arrayOffset(), buf.limit()); - return new DataInputStream(bais); - } - - @Override - public DataInput getDeleteBloomFilterMetadata() throws IOException { - return null; - } - - @Override - public boolean isFileInfoLoaded() { - return fileInfoLoaded; - } - -} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (revision 1439006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (working copy) @@ -1,448 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; -import org.apache.hadoop.hbase.io.hfile.HFile.Writer; -import org.apache.hadoop.hbase.regionserver.MemStore; -import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.hbase.util.BloomFilterWriter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.Compressor; - -/** - * Writes version 1 HFiles. Mainly used for testing backwards-compatibility. - */ -@InterfaceAudience.Private -public class HFileWriterV1 extends AbstractHFileWriter { - - /** Meta data block name for bloom filter parameters. */ - static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; - - /** Meta data block name for bloom filter bits. */ - public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA"; - - private static final Log LOG = LogFactory.getLog(HFileWriterV1.class); - - // A stream made per block written. - private DataOutputStream out; - - // Offset where the current block began. - private long blockBegin; - - // First keys of every block. - private ArrayList blockKeys = new ArrayList(); - - // Block offset in backing stream. - private ArrayList blockOffsets = new ArrayList(); - - // Raw (decompressed) data size. - private ArrayList blockDataSizes = new ArrayList(); - - private Compressor compressor; - - // Additional byte array output stream used to fill block cache - private ByteArrayOutputStream baos; - private DataOutputStream baosDos; - - static class WriterFactoryV1 extends HFile.WriterFactory { - WriterFactoryV1(Configuration conf, CacheConfig cacheConf) { - super(conf, cacheConf); - } - - @Override - public Writer createWriter(FileSystem fs, Path path, - FSDataOutputStream ostream, int blockSize, - Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder, - KeyComparator comparator, final ChecksumType checksumType, - final int bytesPerChecksum) throws IOException { - // version 1 does not implement checksums - return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize, - compressAlgo, dataBlockEncoder, comparator); - } - } - - /** Constructor that takes a path, creates and closes the output stream. */ - public HFileWriterV1(Configuration conf, CacheConfig cacheConf, - FileSystem fs, Path path, FSDataOutputStream ostream, - int blockSize, Compression.Algorithm compress, - HFileDataBlockEncoder blockEncoder, - final KeyComparator comparator) throws IOException { - super(cacheConf, ostream == null ? createOutputStream(conf, fs, path) : ostream, path, - blockSize, compress, blockEncoder, comparator); - } - - /** - * If at block boundary, opens new block. - * - * @throws IOException - */ - private void checkBlockBoundary() throws IOException { - if (this.out != null && this.out.size() < blockSize) - return; - finishBlock(); - newBlock(); - } - - /** - * Do the cleanup if a current block. - * - * @throws IOException - */ - private void finishBlock() throws IOException { - if (this.out == null) - return; - long startTimeNs = System.nanoTime(); - - int size = releaseCompressingStream(this.out); - this.out = null; - blockKeys.add(firstKeyInBlock); - blockOffsets.add(Long.valueOf(blockBegin)); - blockDataSizes.add(Integer.valueOf(size)); - this.totalUncompressedBytes += size; - - HFile.offerWriteLatency(System.nanoTime() - startTimeNs); - - if (cacheConf.shouldCacheDataOnWrite()) { - baosDos.flush(); - // we do not do data block encoding on disk for HFile v1 - byte[] bytes = baos.toByteArray(); - HFileBlock block = new HFileBlock(BlockType.DATA, - (int) (outputStream.getPos() - blockBegin), bytes.length, -1, - ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER, - blockBegin, MemStore.NO_PERSISTENT_TS, - HFileBlock.MINOR_VERSION_NO_CHECKSUM, // minor version - 0, // bytesPerChecksum - ChecksumType.NULL.getCode(), // checksum type - (int) (outputStream.getPos() - blockBegin) + - HFileBlock.HEADER_SIZE_NO_CHECKSUM); // onDiskDataSizeWithHeader - - block = blockEncoder.diskToCacheFormat(block, false); - cacheConf.getBlockCache().cacheBlock( - new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE, - block.getBlockType()), block); - baosDos.close(); - } - } - - /** - * Ready a new block for writing. - * - * @throws IOException - */ - private void newBlock() throws IOException { - // This is where the next block begins. - blockBegin = outputStream.getPos(); - this.out = getCompressingStream(); - BlockType.DATA.write(out); - firstKeyInBlock = null; - if (cacheConf.shouldCacheDataOnWrite()) { - this.baos = new ByteArrayOutputStream(); - this.baosDos = new DataOutputStream(baos); - baosDos.write(HFileBlock.DUMMY_HEADER_NO_CHECKSUM); - } - } - - /** - * Sets up a compressor and creates a compression stream on top of - * this.outputStream. Get one per block written. - * - * @return A compressing stream; if 'none' compression, returned stream does - * not compress. - * - * @throws IOException - * - * @see {@link #releaseCompressingStream(DataOutputStream)} - */ - private DataOutputStream getCompressingStream() throws IOException { - this.compressor = compressAlgo.getCompressor(); - // Get new DOS compression stream. In tfile, the DOS, is not closed, - // just finished, and that seems to be fine over there. TODO: Check - // no memory retention of the DOS. Should I disable the 'flush' on the - // DOS as the BCFile over in tfile does? It wants to make it so flushes - // don't go through to the underlying compressed stream. Flush on the - // compressed downstream should be only when done. I was going to but - // looks like when we call flush in here, its legitimate flush that - // should go through to the compressor. - OutputStream os = this.compressAlgo.createCompressionStream( - this.outputStream, this.compressor, 0); - return new DataOutputStream(os); - } - - /** - * Let go of block compressor and compressing stream gotten in call {@link - * #getCompressingStream}. - * - * @param dos - * - * @return How much was written on this stream since it was taken out. - * - * @see #getCompressingStream() - * - * @throws IOException - */ - private int releaseCompressingStream(final DataOutputStream dos) - throws IOException { - dos.flush(); - this.compressAlgo.returnCompressor(this.compressor); - this.compressor = null; - return dos.size(); - } - - /** - * Add a meta block to the end of the file. Call before close(). Metadata - * blocks are expensive. Fill one with a bunch of serialized data rather than - * do a metadata block per metadata instance. If metadata is small, consider - * adding to file info using {@link #appendFileInfo(byte[], byte[])} - * - * @param metaBlockName - * name of the block - * @param content - * will call readFields to get data later (DO NOT REUSE) - */ - public void appendMetaBlock(String metaBlockName, Writable content) { - byte[] key = Bytes.toBytes(metaBlockName); - int i; - for (i = 0; i < metaNames.size(); ++i) { - // stop when the current key is greater than our own - byte[] cur = metaNames.get(i); - if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, - key.length) > 0) { - break; - } - } - metaNames.add(i, key); - metaData.add(i, content); - } - - /** - * Add key/value to file. Keys must be added in an order that agrees with the - * Comparator passed on construction. - * - * @param kv - * KeyValue to add. Cannot be empty nor null. - * @throws IOException - */ - public void append(final KeyValue kv) throws IOException { - append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), - kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); - } - - /** - * Add key/value to file. Keys must be added in an order that agrees with the - * Comparator passed on construction. - * - * @param key - * Key to add. Cannot be empty nor null. - * @param value - * Value to add. Cannot be empty nor null. - * @throws IOException - */ - public void append(final byte[] key, final byte[] value) throws IOException { - append(key, 0, key.length, value, 0, value.length); - } - - /** - * Add key/value to file. Keys must be added in an order that agrees with the - * Comparator passed on construction. - * - * @param key - * @param koffset - * @param klength - * @param value - * @param voffset - * @param vlength - * @throws IOException - */ - private void append(final byte[] key, final int koffset, final int klength, - final byte[] value, final int voffset, final int vlength) - throws IOException { - boolean dupKey = checkKey(key, koffset, klength); - checkValue(value, voffset, vlength); - if (!dupKey) { - checkBlockBoundary(); - } - // Write length of key and value and then actual key and value bytes. - this.out.writeInt(klength); - totalKeyLength += klength; - this.out.writeInt(vlength); - totalValueLength += vlength; - this.out.write(key, koffset, klength); - this.out.write(value, voffset, vlength); - // Are we the first key in this block? - if (this.firstKeyInBlock == null) { - // Copy the key. - this.firstKeyInBlock = new byte[klength]; - System.arraycopy(key, koffset, this.firstKeyInBlock, 0, klength); - } - this.lastKeyBuffer = key; - this.lastKeyOffset = koffset; - this.lastKeyLength = klength; - this.entryCount++; - // If we are pre-caching blocks on write, fill byte array stream - if (cacheConf.shouldCacheDataOnWrite()) { - this.baosDos.writeInt(klength); - this.baosDos.writeInt(vlength); - this.baosDos.write(key, koffset, klength); - this.baosDos.write(value, voffset, vlength); - } - } - - public void close() throws IOException { - if (this.outputStream == null) { - return; - } - // Save data block encoder metadata in the file info. - blockEncoder.saveMetadata(this); - // Write out the end of the data blocks, then write meta data blocks. - // followed by fileinfo, data block index and meta block index. - - finishBlock(); - - FixedFileTrailer trailer = new FixedFileTrailer(1, - HFileBlock.MINOR_VERSION_NO_CHECKSUM); - - // Write out the metadata blocks if any. - ArrayList metaOffsets = null; - ArrayList metaDataSizes = null; - if (metaNames.size() > 0) { - metaOffsets = new ArrayList(metaNames.size()); - metaDataSizes = new ArrayList(metaNames.size()); - for (int i = 0; i < metaNames.size(); ++i) { - // store the beginning offset - long curPos = outputStream.getPos(); - metaOffsets.add(curPos); - // write the metadata content - DataOutputStream dos = getCompressingStream(); - BlockType.META.write(dos); - metaData.get(i).write(dos); - int size = releaseCompressingStream(dos); - // store the metadata size - metaDataSizes.add(size); - } - } - - writeFileInfo(trailer, outputStream); - - // Write the data block index. - trailer.setLoadOnOpenOffset(writeBlockIndex(this.outputStream, - this.blockKeys, this.blockOffsets, this.blockDataSizes)); - LOG.info("Wrote a version 1 block index with " + this.blockKeys.size() - + " keys"); - - if (metaNames.size() > 0) { - // Write the meta index. - writeBlockIndex(this.outputStream, metaNames, metaOffsets, metaDataSizes); - } - - // Now finish off the trailer. - trailer.setDataIndexCount(blockKeys.size()); - - finishClose(trailer); - } - - @Override - protected void finishFileInfo() throws IOException { - super.finishFileInfo(); - - // In version 1, we store comparator name in the file info. - fileInfo.append(FileInfo.COMPARATOR, - Bytes.toBytes(comparator.getClass().getName()), false); - } - - @Override - public void addInlineBlockWriter(InlineBlockWriter bloomWriter) { - // Inline blocks only exist in HFile format version 2. - throw new UnsupportedOperationException(); - } - - /** - * Version 1 general Bloom filters are stored in two meta blocks with two different - * keys. - */ - @Override - public void addGeneralBloomFilter(BloomFilterWriter bfw) { - appendMetaBlock(BLOOM_FILTER_META_KEY, - bfw.getMetaWriter()); - Writable dataWriter = bfw.getDataWriter(); - if (dataWriter != null) { - appendMetaBlock(BLOOM_FILTER_DATA_KEY, dataWriter); - } - } - - @Override - public void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) - throws IOException { - throw new IOException("Delete Bloom filter is not supported in HFile V1"); - } - - /** - * Write out the index in the version 1 format. This conforms to the legacy - * version 1 format, but can still be read by - * {@link HFileBlockIndex.BlockIndexReader#readRootIndex(java.io.DataInputStream, - * int)}. - * - * @param out the stream to write to - * @param keys - * @param offsets - * @param uncompressedSizes in contrast with a version 2 root index format, - * the sizes stored in the version 1 are uncompressed sizes - * @return - * @throws IOException - */ - private static long writeBlockIndex(final FSDataOutputStream out, - final List keys, final List offsets, - final List uncompressedSizes) throws IOException { - long pos = out.getPos(); - // Don't write an index if nothing in the index. - if (keys.size() > 0) { - BlockType.INDEX_V1.write(out); - // Write the index. - for (int i = 0; i < keys.size(); ++i) { - out.writeLong(offsets.get(i).longValue()); - out.writeInt(uncompressedSizes.get(i).intValue()); - byte[] key = keys.get(i); - Bytes.writeByteArray(out, key); - } - } - return pos; - } - -}