From 99632e2fcf2231206e75519b3d3bb90898cf959d Mon Sep 17 00:00:00 2001
From: mbautin
Date: Mon, 19 Mar 2012 19:12:19 +0000
Subject: [PATCH] HBASE-5521 [jira] Move compression/decompression to an encoder specific encoding context

Author: Yongqiang He

Summary:
https://issues.apache.org/jira/browse/HBASE-5521

As part of working on HBASE-5313, we want to add a new columnar
encoder/decoder. It makes sense to move compression into the
encoder/decoder because:
1) a scanner over a columnar-encoded block can lazily decompress only the
   part of a KeyValue it needs, and
2) it avoids an extra byte copy from the encoder to the HFileBlock writer.

If no encoder is specified for a writer, HFileBlock.Writer uses a default
compression context that behaves very much like today's code.

Test Plan: existing unit tests, verified by mbautin and tedyu. No new tests
are added here because this change only prepares for the columnar encoder;
test cases will be added in that diff.

Reviewers: dhruba, tedyu, sc, mbautin

Reviewed By: mbautin

Differential Revision: https://reviews.facebook.net/D2097

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1302602 13f79535-47bb-0310-9956-ffa450edef68
---
 .../io/encoding/BufferedDataBlockEncoder.java      |  52 ++
 .../hbase/io/encoding/CopyKeyDataBlockEncoder.java |   3 +-
 .../hadoop/hbase/io/encoding/DataBlockEncoder.java |  59 ++-
 .../hbase/io/encoding/DataBlockEncoding.java       |  31 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java     |   2 +-
 .../hadoop/hbase/io/encoding/EncodedDataBlock.java |  28 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java    |   2 +-
 .../io/encoding/HFileBlockDecodingContext.java     |  50 ++
 .../encoding/HFileBlockDefaultDecodingContext.java |  65 +++
 .../encoding/HFileBlockDefaultEncodingContext.java | 208 +++++++
 .../io/encoding/HFileBlockEncodingContext.java     |  80 +++
 .../hbase/io/encoding/PrefixKeyDeltaEncoder.java   |   2 +-
 .../apache/hadoop/hbase/io/hfile/Compression.java  |  51 ++-
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 588 +++++++-------
 .../hbase/io/hfile/HFileDataBlockEncoder.java      |  40 ++-
 .../hbase/io/hfile/HFileDataBlockEncoderImpl.java  | 131 ++++-
 .../hadoop/hbase/io/hfile/HFileWriterV2.java       |   2 +-
 .../hbase/io/hfile/NoOpDataBlockEncoder.java       |  36 ++-
 .../hbase/io/encoding/TestDataBlockEncoders.java   | 101 +++--
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      |  66 ++-
 .../io/hfile/TestHFileBlockCompatibility.java      | 178 +++---
 .../hbase/io/hfile/TestHFileDataBlockEncoder.java  |  22 +-
 .../hbase/regionserver/DataBlockEncodingTool.java  |  11 +-
 23 files changed, 1159 insertions(+), 649 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
 create mode 100644 src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
 create mode 100644 src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
 create mode 100644 src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java

diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index ce77e96..5ce410e 100644
--- src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -24,6 +24,8 @@ import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import
org.apache.hadoop.hbase.KeyValue.SamePrefixComparator; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.RawComparator; @@ -301,4 +303,54 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } } + @Override + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm, + DataBlockEncoding encoding, byte[] header) { + return new HFileBlockDefaultEncodingContext( + compressionAlgorithm, encoding, header); + } + + @Override + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultDecodingContext(compressionAlgorithm); + } + + /** + * Compress KeyValues and write them to output buffer. + * @param out Where to write compressed data. + * @param in Source of KeyValue for compression. + * @param includesMemstoreTS true if including memstore timestamp after every + * key-value pair + * @throws IOException If there is an error writing to output stream. + */ + public abstract void internalEncodeKeyValues(DataOutputStream out, + ByteBuffer in, boolean includesMemstoreTS) throws IOException; + + @Override + public void compressKeyValues(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext blkEncodingCtx) throws IOException { + if (!(blkEncodingCtx.getClass().getName().equals( + HFileBlockDefaultEncodingContext.class.getName()))) { + throw new IOException (this.getClass().getName() + " only accepts " + + HFileBlockDefaultEncodingContext.class.getName() + " as the " + + "encoding context."); + } + + HFileBlockDefaultEncodingContext encodingCtx = + (HFileBlockDefaultEncodingContext) blkEncodingCtx; + encodingCtx.prepareEncoding(); + DataOutputStream dataOut = + ((HFileBlockDefaultEncodingContext) encodingCtx) + .getOutputStreamForEncoder(); + internalEncodeKeyValues(dataOut, in, includesMemstoreTS); + if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) { + encodingCtx.postEncoding(BlockType.ENCODED_DATA); + } else { + encodingCtx.postEncoding(BlockType.DATA); + } + } + } diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index 9cb7559..0092d08 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -33,7 +33,7 @@ import org.apache.hadoop.io.RawComparator; @InterfaceAudience.Private public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { @Override - public void compressKeyValues(DataOutputStream out, + public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { in.rewind(); ByteBufferUtils.putInt(out, in.limit()); @@ -94,4 +94,5 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { } }; } + } diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java index b9ac373..5788c07 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -17,12 +17,12 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; -import 
java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.io.RawComparator; /** @@ -34,19 +34,32 @@ import org.apache.hadoop.io.RawComparator; *
 * <li>knowledge of Key Value format</li>
  • * * It is designed to work fast enough to be feasible as in memory compression. + * + * After encoding, it also optionally compresses the encoded data if a + * compression algorithm is specified in HFileBlockEncodingContext argument of + * {@link #compressKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}. */ @InterfaceAudience.Private public interface DataBlockEncoder { + /** - * Compress KeyValues and write them to output buffer. - * @param out Where to write compressed data. - * @param in Source of KeyValue for compression. - * @param includesMemstoreTS true if including memstore timestamp after every - * key-value pair - * @throws IOException If there is an error writing to output stream. + * Compress KeyValues. It will first encode key value pairs, and then + * optionally do the compression for the encoded data. + * + * @param in + * Source of KeyValue for compression. + * @param includesMemstoreTS + * true if including memstore timestamp after every key-value pair + * @param encodingContext + * the encoding context which will contain encoded uncompressed bytes + * as well as compressed encoded bytes if compression is enabled, and + * also it will reuse resources across multiple calls. + * @throws IOException + * If there is an error writing to output stream. */ - public void compressKeyValues(DataOutputStream out, - ByteBuffer in, boolean includesMemstoreTS) throws IOException; + public void compressKeyValues( + ByteBuffer in, boolean includesMemstoreTS, + HFileBlockEncodingContext encodingContext) throws IOException; /** * Uncompress. @@ -94,6 +107,34 @@ public interface DataBlockEncoder { boolean includesMemstoreTS); /** + * Creates a encoder specific encoding context + * + * @param compressionAlgorithm + * compression algorithm used if the final data needs to be + * compressed + * @param encoding + * encoding strategy used + * @param headerBytes + * header bytes to be written, put a dummy header here if the header + * is unknown + * @return a newly created encoding context + */ + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm, DataBlockEncoding encoding, + byte[] headerBytes); + + /** + * Creates an encoder specific decoding context, which will prepare the data + * before actual decoding + * + * @param compressionAlgorithm + * compression algorithm used if the data needs to be decompressed + * @return a newly created decoding context + */ + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm); + + /** * An interface which enable to seek while underlying data is encoded. * * It works on one HFileBlock, but it is reusable. See diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java index a277045..79abff3 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java @@ -103,6 +103,18 @@ public enum DataBlockEncoding { stream.write(idInBytes); } + + /** + * Writes id bytes to the given array starting from offset. + * + * @param dest output array + * @param offset starting offset of the output array + * @throws IOException + */ + public void writeIdInBytes(byte[] dest, int offset) throws IOException { + System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE); + } + /** * Return new data block encoder for given algorithm type. 
* @return data block encoder if algorithm is specified, null if none is @@ -113,25 +125,6 @@ public enum DataBlockEncoding { } /** - * Provide access to all data block encoders, even those which are not - * exposed in the enum. Useful for testing and benchmarking. - * @return list of all data block encoders. - */ - public static List getAllEncoders() { - ArrayList encoders = new ArrayList(); - for (DataBlockEncoding algo : values()) { - DataBlockEncoder encoder = algo.getEncoder(); - if (encoder != null) { - encoders.add(encoder); - } - } - - // Add encoders that are only used in testing. - encoders.add(new CopyKeyDataBlockEncoder()); - return encoders; - } - - /** * Find and create data block encoder for given id; * @param encoderId id of data block encoder. * @return Newly created data block encoder. diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index 5180bd2..b0a5551 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -316,7 +316,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public void compressKeyValues(DataOutputStream out, + public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { in.rewind(); ByteBufferUtils.putInt(out, in.limit()); diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java index 47c2d00..370e888 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; @@ -27,6 +26,8 @@ import java.util.Iterator; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.io.compress.Compressor; /** @@ -40,17 +41,22 @@ public class EncodedDataBlock { ByteArrayOutputStream uncompressedOutputStream; ByteBuffer uncompressedBuffer; private byte[] cacheCompressData; - private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); private boolean includesMemstoreTS; + private final HFileBlockEncodingContext encodingCxt; + /** * Create a buffer which will be encoded using dataBlockEncoder. * @param dataBlockEncoder Algorithm used for compression. 
+ * @param encoding encoding type used */ public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, - boolean includesMemstoreTS) { + boolean includesMemstoreTS, DataBlockEncoding encoding) { this.dataBlockEncoder = dataBlockEncoder; uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE); + encodingCxt = + dataBlockEncoder.newDataBlockEncodingContext(Compression.Algorithm.NONE, + encoding, HFileBlock.DUMMY_HEADER); } /** @@ -175,7 +181,7 @@ public class EncodedDataBlock { if (cacheCompressData != null) { return cacheCompressData; } - cacheCompressData = doCompressData(); + cacheCompressData = encodeData(); return cacheCompressData; } @@ -190,22 +196,20 @@ public class EncodedDataBlock { } /** - * Do the compression. - * @return Compressed byte buffer. + * Do the encoding . + * @return encoded byte buffer. */ - public byte[] doCompressData() { - compressedStream.reset(); - DataOutputStream dataOut = new DataOutputStream(compressedStream); + public byte[] encodeData() { try { this.dataBlockEncoder.compressKeyValues( - dataOut, getUncompressedBuffer(), includesMemstoreTS); + getUncompressedBuffer(), includesMemstoreTS, encodingCxt); } catch (IOException e) { throw new RuntimeException(String.format( - "Bug in decoding part of algorithm %s. " + + "Bug in encoding part of algorithm %s. " + "Probably it requested more bytes than are available.", toString()), e); } - return compressedStream.toByteArray(); + return encodingCxt.getUncompressedBytesWithHeader(); } @Override diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index 48448d3..e30d8fd 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -343,7 +343,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public void compressKeyValues(DataOutputStream out, + public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { in.rewind(); ByteBufferUtils.putInt(out, in.limit()); diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java new file mode 100644 index 0000000..04a115c --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; + +/** + * A decoding context that is created by a reader's encoder, and is shared + * across the reader's all read operations. + * + * @see HFileBlockEncodingContext for encoding + */ +public interface HFileBlockDecodingContext { + + /** + * @return the compression algorithm used by this decoding context + */ + public Compression.Algorithm getCompression(); + + /** + * Perform all actions that need to be done before the encoder's real + * decoding process. Decompression needs to be done if + * {@link #getCompression()} returns a valid compression algorithm. + * + * @param block HFile block object + * @param onDiskBlock on disk bytes to be decoded + * @param offset data start offset in onDiskBlock + * @throws IOException + */ + public void prepareDecoding(HFileBlock block, byte[] onDiskBlock, + int offset) throws IOException; + +} diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java new file mode 100644 index 0000000..0a99a43 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; + +/** + * A default implementation of {@link HFileBlockDecodingContext}. It assumes the + * block data section is compressed as a whole. 
+ * + * @see HFileBlockDefaultEncodingContext for the default compression context + * + */ +public class HFileBlockDefaultDecodingContext implements + HFileBlockDecodingContext { + + private final Compression.Algorithm compressAlgo; + + public HFileBlockDefaultDecodingContext( + Compression.Algorithm compressAlgo) { + this.compressAlgo = compressAlgo; + } + + @Override + public void prepareDecoding(HFileBlock block, + byte[] onDiskBlock, int offset) throws IOException { + DataInputStream dis = + new DataInputStream(new ByteArrayInputStream( + onDiskBlock, offset, + block.getOnDiskSizeWithoutHeader())); + + ByteBuffer buffer = block.getBufferWithoutHeader(); + Compression.decompress(buffer.array(), buffer.arrayOffset(), + (InputStream) dis, block.getOnDiskSizeWithoutHeader(), + block.getUncompressedSizeWithoutHeader(), compressAlgo); + } + + @Override + public Algorithm getCompression() { + return compressAlgo; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java new file mode 100644 index 0000000..965d5cf --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; + +/** + * A default implementation of {@link HFileBlockEncodingContext}. It will + * compress the data section as one continuous buffer. + * + * @see HFileBlockDefaultDecodingContext for the decompression part + * + */ +public class HFileBlockDefaultEncodingContext implements + HFileBlockEncodingContext { + + private byte[] onDiskBytesWithHeader; + private byte[] uncompressedBytesWithHeader; + private BlockType blockType; + private final DataBlockEncoding encodingAlgo; + + /** Compressor, which is also reused between consecutive blocks. */ + private Compressor compressor; + + /** Compression output stream */ + private CompressionOutputStream compressionStream; + + /** Underlying stream to write compressed bytes to */ + private ByteArrayOutputStream compressedByteStream; + + /** Compression algorithm for all blocks this instance writes. 
*/ + private final Compression.Algorithm compressionAlgorithm; + + private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream(); + private DataOutputStream dataOut = new DataOutputStream(encodedStream); + + private final byte[] dummyHeader; + + /** + * @param compressionAlgorithm compression algorithm used + * @param encoding encoding used + * @param headerBytes dummy header bytes + */ + public HFileBlockDefaultEncodingContext( + Compression.Algorithm compressionAlgorithm, + DataBlockEncoding encoding, byte[] headerBytes) { + this.encodingAlgo = encoding; + this.compressionAlgorithm = + compressionAlgorithm == null ? NONE : compressionAlgorithm; + if (this.compressionAlgorithm != NONE) { + compressor = compressionAlgorithm.getCompressor(); + compressedByteStream = new ByteArrayOutputStream(); + try { + compressionStream = + compressionAlgorithm.createPlainCompressionStream( + compressedByteStream, compressor); + } catch (IOException e) { + throw new RuntimeException( + "Could not create compression stream for algorithm " + + compressionAlgorithm, e); + } + } + if (headerBytes == null) { + dummyHeader = HFileBlock.DUMMY_HEADER; + } else { + dummyHeader = headerBytes; + } + } + + /** + * @param compressionAlgorithm compression algorithm + * @param encoding encoding + */ + public HFileBlockDefaultEncodingContext( + Compression.Algorithm compressionAlgorithm, + DataBlockEncoding encoding) { + this(compressionAlgorithm, encoding, null); + } + + /** + * prepare to start a new encoding. + * @throws IOException + */ + void prepareEncoding() throws IOException { + encodedStream.reset(); + dataOut.write(dummyHeader); + if (encodingAlgo != null + && encodingAlgo != DataBlockEncoding.NONE) { + encodingAlgo.writeIdInBytes(dataOut); + } + } + + @Override + public void postEncoding(BlockType blockType) + throws IOException { + dataOut.flush(); + compressAfterEncoding(encodedStream.toByteArray(), blockType); + this.blockType = blockType; + } + + /** + * @param uncompressedBytesWithHeader + * @param blockType + * @throws IOException + */ + public void compressAfterEncoding(byte[] uncompressedBytesWithHeader, + BlockType blockType) throws IOException { + compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader); + } + + /** + * @param uncompressedBytesWithHeader + * @param blockType + * @param headerBytes + * @throws IOException + */ + protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, + BlockType blockType, byte[] headerBytes) throws IOException { + this.uncompressedBytesWithHeader = uncompressedBytesWithHeader; + if (compressionAlgorithm != NONE) { + compressedByteStream.reset(); + compressedByteStream.write(headerBytes); + compressionStream.resetState(); + compressionStream.write(uncompressedBytesWithHeader, + headerBytes.length, uncompressedBytesWithHeader.length + - headerBytes.length); + + compressionStream.flush(); + compressionStream.finish(); + onDiskBytesWithHeader = compressedByteStream.toByteArray(); + } else { + onDiskBytesWithHeader = uncompressedBytesWithHeader; + } + this.blockType = blockType; + } + + @Override + public byte[] getOnDiskBytesWithHeader() { + return onDiskBytesWithHeader; + } + + @Override + public byte[] getUncompressedBytesWithHeader() { + return uncompressedBytesWithHeader; + } + + @Override + public BlockType getBlockType() { + return blockType; + } + + /** + * Releases the compressor this writer uses to compress blocks into the + * compressor pool. 
+ */ + @Override + public void close() { + if (compressor != null) { + compressionAlgorithm.returnCompressor(compressor); + compressor = null; + } + } + + @Override + public Algorithm getCompression() { + return this.compressionAlgorithm; + } + + public DataOutputStream getOutputStreamForEncoder() { + return this.dataOut; + } + + @Override + public DataBlockEncoding getDataBlockEncoding() { + return this.encodingAlgo; + } + + @Override + public int getHeaderSize() { + return this.dummyHeader.length; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java new file mode 100644 index 0000000..45f2749 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression; + +/** + * An encoding context that is created by a writer's encoder, and is shared + * across the writer's whole lifetime. + * + * @see HFileBlockDecodingContext for decoding + * + */ +public interface HFileBlockEncodingContext { + + /** + * @return encoded and compressed bytes with header which are ready to write + * out to disk + */ + public byte[] getOnDiskBytesWithHeader(); + + /** + * @return encoded but not heavily compressed bytes with header which can be + * cached in block cache + */ + public byte[] getUncompressedBytesWithHeader(); + + /** + * @return the block type after encoding + */ + public BlockType getBlockType(); + + /** + * @return the compression algorithm used by this encoding context + */ + public Compression.Algorithm getCompression(); + + /** + * @return the header size used + */ + public int getHeaderSize(); + + /** + * @return the {@link DataBlockEncoding} encoding used + */ + public DataBlockEncoding getDataBlockEncoding(); + + /** + * Do any action that needs to be performed after the encoding. + * Compression is also included if {@link #getCompression()} returns non-null + * compression algorithm + * + * @param blockType + * @throws IOException + */ + public void postEncoding(BlockType blockType) throws IOException; + + /** + * Releases the resources used. 
+ */ + public void close(); + +} diff --git src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 98ea7c9..f1edcb5 100644 --- src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -75,7 +75,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public void compressKeyValues(DataOutputStream writeHere, + public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in, boolean includesMemstoreTS) throws IOException { in.rewind(); ByteBufferUtils.putInt(writeHere, in.limit()); diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java index 7a32610..78289f2 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; @@ -234,7 +235,7 @@ public final class Compression { * Creates a compression stream without any additional wrapping into * buffering streams. */ - CompressionOutputStream createPlainCompressionStream( + public CompressionOutputStream createPlainCompressionStream( OutputStream downStream, Compressor compressor) throws IOException { CompressionCodec codec = getCodec(conf); ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024); @@ -323,4 +324,52 @@ public final class Compression { return ret; } + + /** + * Decompresses data from the given stream using the configured compression + * algorithm. It will throw an exception if the dest buffer does not have + * enough space to hold the decompressed data. 
+ * + * @param dest + * the output bytes buffer + * @param destOffset + * start writing position of the output buffer + * @param bufferedBoundedStream + * a stream to read compressed data from, bounded to the exact amount + * of compressed data + * @param compressedSize + * compressed data size, header not included + * @param uncompressedSize + * uncompressed data size, header not included + * @param compressAlgo + * compression algorithm used + * @throws IOException + */ + public static void decompress(byte[] dest, int destOffset, + InputStream bufferedBoundedStream, int compressedSize, + int uncompressedSize, Compression.Algorithm compressAlgo) + throws IOException { + + if (dest.length - destOffset < uncompressedSize) { + throw new IllegalArgumentException( + "Output buffer does not have enough space to hold " + + uncompressedSize + " decompressed bytes, available: " + + (dest.length - destOffset)); + } + + Decompressor decompressor = null; + try { + decompressor = compressAlgo.getDecompressor(); + InputStream is = compressAlgo.createDecompressionStream( + bufferedBoundedStream, decompressor, 0); + + IOUtils.readFully(is, dest, destOffset, uncompressedSize); + is.close(); + } finally { + if (decompressor != null) { + compressAlgo.returnDecompressor(decompressor); + } + } + } + } diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 8a31b37..88792c1 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH; -import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -29,7 +28,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.zip.Checksum; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSDataInputStream; @@ -38,6 +36,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.MemStore; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; @@ -45,14 +47,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompoundBloomFilter; -import org.apache.hadoop.hbase.util.ChecksumFactory; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; import com.google.common.base.Preconditions; @@ -114,8 +111,8 @@ public class HFileBlock extends 
SchemaConfigured implements Cacheable { * There is a 1 byte checksum type, followed by a 4 byte bytesPerChecksum * followed by another 4 byte value to store sizeofDataOnDisk. */ - static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + Bytes.SIZEOF_BYTE + - 2 * Bytes.SIZEOF_INT; + public static final int HEADER_SIZE = HEADER_SIZE_NO_CHECKSUM + + Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT; /** * The size of block header when blockType is {@link BlockType#ENCODED_DATA}. @@ -125,7 +122,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { + DataBlockEncoding.ID_SIZE; /** Just an array of bytes of the right size. */ - static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE]; + public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE]; static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HEADER_SIZE_NO_CHECKSUM]; @@ -303,7 +300,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * @return the on-disk size of the data part of the block, header and * checksum not included. */ - int getOnDiskSizeWithoutHeader() { + public int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } @@ -342,7 +339,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * * @return the buffer with header skipped */ - ByteBuffer getBufferWithoutHeader() { + public ByteBuffer getBufferWithoutHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), buf.limit() - headerSize() - totalChecksumBytes()).slice(); } @@ -644,6 +641,11 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { /** Data block encoder used for data blocks */ private final HFileDataBlockEncoder dataBlockEncoder; + private HFileBlockEncodingContext dataBlockEncodingCtx; + + /** block encoding context for non-data blocks */ + private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; + /** * The stream we use to accumulate data in uncompressed format for each * block. We reset this stream at the end of each block and reuse it. The @@ -652,15 +654,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ private ByteArrayOutputStream baosInMemory; - /** Compressor, which is also reused between consecutive blocks. */ - private Compressor compressor; - - /** Compression output stream */ - private CompressionOutputStream compressionStream; - - /** Underlying stream to write compressed bytes to */ - private ByteArrayOutputStream compressedByteStream; - /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} @@ -682,12 +675,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { private byte[] onDiskBytesWithHeader; /** - * The size of the data on disk that does not include the checksums. - * (header + data) - */ - private int onDiskDataSizeWithHeader; - - /** * The size of the checksum data on disk. It is used only if data is * not compressed. If data is compressed, then the checksums are already * part of onDiskBytesWithHeader. If data is uncompressed, then this @@ -734,28 +721,23 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, ChecksumType checksumType, int bytesPerChecksum) { - compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm; + compressAlgo = compressionAlgorithm == null ? 
Compression.Algorithm.NONE : + compressionAlgorithm; this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; + defaultBlockEncodingCtx = + new HFileBlockDefaultEncodingContext(compressionAlgorithm, null); + dataBlockEncodingCtx = + this.dataBlockEncoder.newOnDiskDataBlockEncodingContext( + compressionAlgorithm, DUMMY_HEADER); - baosInMemory = new ByteArrayOutputStream(); - if (compressAlgo != NONE) { - compressor = compressionAlgorithm.getCompressor(); - compressedByteStream = new ByteArrayOutputStream(); - try { - compressionStream = - compressionAlgorithm.createPlainCompressionStream( - compressedByteStream, compressor); - } catch (IOException e) { - throw new RuntimeException("Could not create compression stream " + - "for algorithm " + compressionAlgorithm, e); - } - } if (bytesPerChecksum < HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HEADER_SIZE + " but the configured value is " + bytesPerChecksum); } + + baosInMemory = new ByteArrayOutputStream(); prevOffsetByType = new long[BlockType.values().length]; for (int i = 0; i < prevOffsetByType.length; ++i) @@ -828,7 +810,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ private void finishBlock() throws IOException { userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array. uncompressedBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; @@ -837,81 +818,32 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { // cache-on-write. In a way, the block is ready, but not yet encoded or // compressed. state = State.BLOCK_READY; - encodeDataBlockForDisk(); - - doCompressionAndChecksumming(); - } - - /** - * Do compression if it is enabled, or re-use the uncompressed buffer if - * it is not. Fills in the compressed block's header if doing compression. - * Also, compute the checksums. In the case of no-compression, write the - * checksums to its own seperate data structure called onDiskChecksum. In - * the case when compression is enabled, the checksums are written to the - * outputbyte stream 'baos'. - */ - private void doCompressionAndChecksumming() throws IOException { - // do the compression - if (compressAlgo != NONE) { - compressedByteStream.reset(); - compressedByteStream.write(DUMMY_HEADER); - - compressionStream.resetState(); - - compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE, - uncompressedBytesWithHeader.length - HEADER_SIZE); - - compressionStream.flush(); - compressionStream.finish(); - - // generate checksums - onDiskDataSizeWithHeader = compressedByteStream.size(); // data size - - // reserve space for checksums in the output byte stream - ChecksumUtil.reserveSpaceForChecksums(compressedByteStream, - onDiskDataSizeWithHeader, bytesPerChecksum); - - - onDiskBytesWithHeader = compressedByteStream.toByteArray(); - putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); - - // generate checksums for header and data. The checksums are - // part of onDiskBytesWithHeader itself. 
- ChecksumUtil.generateChecksums( - onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader, - onDiskBytesWithHeader, onDiskDataSizeWithHeader, - checksumType, bytesPerChecksum); + if (blockType == BlockType.DATA) { + encodeDataBlockForDisk(); + } else { + defaultBlockEncodingCtx.compressAfterEncoding( + uncompressedBytesWithHeader, blockType); + onDiskBytesWithHeader = + defaultBlockEncodingCtx.getOnDiskBytesWithHeader(); + } - // Checksums are already part of onDiskBytesWithHeader - onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY; + int numBytes = (int) ChecksumUtil.numBytes( + onDiskBytesWithHeader.length, + bytesPerChecksum); - //set the header for the uncompressed bytes (for cache-on-write) - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length + onDiskChecksum.length, - uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); + // put the header for on disk bytes + putHeader(onDiskBytesWithHeader, 0, + onDiskBytesWithHeader.length + numBytes, + uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); + //set the header for the uncompressed bytes (for cache-on-write) + putHeader(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length + numBytes, + uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); - } else { - // If we are not using any compression, then the - // checksums are written to its own array onDiskChecksum. - onDiskBytesWithHeader = uncompressedBytesWithHeader; - - onDiskDataSizeWithHeader = onDiskBytesWithHeader.length; - int numBytes = (int)ChecksumUtil.numBytes( - uncompressedBytesWithHeader.length, - bytesPerChecksum); - onDiskChecksum = new byte[numBytes]; - - //set the header for the uncompressed bytes - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length + onDiskChecksum.length, - uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader); - - ChecksumUtil.generateChecksums( - uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length, - onDiskChecksum, 0, - checksumType, bytesPerChecksum); - } + onDiskChecksum = new byte[numBytes]; + ChecksumUtil.generateChecksums( + onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, + onDiskChecksum, 0, checksumType, bytesPerChecksum); } /** @@ -919,35 +851,20 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * {@link #dataBlockEncoder}. */ private void encodeDataBlockForDisk() throws IOException { - if (blockType != BlockType.DATA) { - return; // skip any non-data block - } - // do data block encoding, if data block encoder is set - ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader, - HEADER_SIZE, uncompressedBytesWithHeader.length - - HEADER_SIZE).slice(); - Pair encodingResult = - dataBlockEncoder.beforeWriteToDisk(rawKeyValues, - includesMemstoreTS, DUMMY_HEADER); - - BlockType encodedBlockType = encodingResult.getSecond(); - if (encodedBlockType == BlockType.ENCODED_DATA) { - uncompressedBytesWithHeader = encodingResult.getFirst().array(); - blockType = BlockType.ENCODED_DATA; - } else { - // There is no encoding configured. Do some extra sanity-checking. - if (encodedBlockType != BlockType.DATA) { - throw new IOException("Unexpected block type coming out of data " + - "block encoder: " + encodedBlockType); - } - if (userDataStream.size() != - uncompressedBytesWithHeader.length - HEADER_SIZE) { - throw new IOException("Uncompressed size mismatch: " - + userDataStream.size() + " vs. 
" - + (uncompressedBytesWithHeader.length - HEADER_SIZE)); - } - } + ByteBuffer rawKeyValues = + ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE, + uncompressedBytesWithHeader.length - HEADER_SIZE).slice(); + + //do the encoding + dataBlockEncoder.beforeWriteToDisk(rawKeyValues, + includesMemstoreTS, dataBlockEncodingCtx, blockType); + + uncompressedBytesWithHeader = + dataBlockEncodingCtx.getUncompressedBytesWithHeader(); + onDiskBytesWithHeader = + dataBlockEncodingCtx.getOnDiskBytesWithHeader(); + blockType = dataBlockEncodingCtx.getBlockType(); } /** @@ -966,7 +883,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { offset = Bytes.putLong(dest, offset, prevOffset); offset = Bytes.putByte(dest, offset, checksumType.getCode()); offset = Bytes.putInt(dest, offset, bytesPerChecksum); - offset = Bytes.putInt(dest, offset, onDiskDataSizeWithHeader); + offset = Bytes.putInt(dest, offset, onDiskDataSize); } /** @@ -986,7 +903,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } startOffset = offset; - writeHeaderAndData((DataOutputStream) out); + finishBlockAndWriteHeaderAndData((DataOutputStream) out); } /** @@ -998,17 +915,11 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * @param out the output stream to write the * @throws IOException */ - private void writeHeaderAndData(DataOutputStream out) throws IOException { + private void finishBlockAndWriteHeaderAndData(DataOutputStream out) + throws IOException { ensureBlockReady(); out.write(onDiskBytesWithHeader); - if (compressAlgo == NONE) { - if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) { - throw new IOException("A " + blockType - + " without compression should have checksums " - + " stored separately."); - } - out.write(onDiskChecksum); - } + out.write(onDiskChecksum); } /** @@ -1023,34 +934,29 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { */ byte[] getHeaderAndDataForTest() throws IOException { ensureBlockReady(); - if (compressAlgo == NONE) { - if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) { - throw new IOException("A " + blockType - + " without compression should have checksums " - + " stored separately."); - } - // This is not very optimal, because we are doing an extra copy. - // But this method is used only by unit tests. - byte[] output = new byte[onDiskBytesWithHeader.length + - onDiskChecksum.length]; - System.arraycopy(onDiskBytesWithHeader, 0, - output, 0, onDiskBytesWithHeader.length); - System.arraycopy(onDiskChecksum, 0, - output, onDiskBytesWithHeader.length, - onDiskChecksum.length); - return output; - } - return onDiskBytesWithHeader; + // This is not very optimal, because we are doing an extra copy. + // But this method is used only by unit tests. + byte[] output = + new byte[onDiskBytesWithHeader.length + + onDiskChecksum.length]; + System.arraycopy(onDiskBytesWithHeader, 0, output, 0, + onDiskBytesWithHeader.length); + System.arraycopy(onDiskChecksum, 0, output, + onDiskBytesWithHeader.length, onDiskChecksum.length); + return output; } /** - * Releases the compressor this writer uses to compress blocks into the - * compressor pool. Needs to be called before the writer is discarded. + * Releases resources used by this writer. 
*/ - public void releaseCompressor() { - if (compressor != null) { - compressAlgo.returnCompressor(compressor); - compressor = null; + public void release() { + if (dataBlockEncodingCtx != null) { + dataBlockEncodingCtx.close(); + dataBlockEncodingCtx = null; + } + if (defaultBlockEncodingCtx != null) { + defaultBlockEncodingCtx.close(); + defaultBlockEncodingCtx = null; } } @@ -1252,7 +1158,7 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { private int minorVersion; /** The size of the header */ - protected int hdrSize; + protected final int hdrSize; /** The filesystem used to access data */ protected HFileSystem hfs; @@ -1377,36 +1283,6 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } /** - * Decompresses data from the given stream using the configured compression - * algorithm. - * @param dest - * @param destOffset - * @param bufferedBoundedStream - * a stream to read compressed data from, bounded to the exact - * amount of compressed data - * @param uncompressedSize - * uncompressed data size, header not included - * @throws IOException - */ - protected void decompress(byte[] dest, int destOffset, - InputStream bufferedBoundedStream, - int uncompressedSize) throws IOException { - Decompressor decompressor = null; - try { - decompressor = compressAlgo.getDecompressor(); - InputStream is = compressAlgo.createDecompressionStream( - bufferedBoundedStream, decompressor, 0); - - IOUtils.readFully(is, dest, destOffset, uncompressedSize); - is.close(); - } finally { - if (decompressor != null) { - compressAlgo.returnDecompressor(decompressor); - } - } - } - - /** * Creates a buffered stream reading a certain slice of the file system * input stream. We need this because the decompression we use seems to * expect the input stream to be bounded. @@ -1511,8 +1387,9 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } else { InputStream bufferedBoundedStream = createBufferedBoundedStream( offset, onDiskSize, pread); - decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA, - bufferedBoundedStream, uncompressedSizeWithMagic); + Compression.decompress(buf.array(), buf.arrayOffset() + + HEADER_DELTA, bufferedBoundedStream, onDiskSize, + uncompressedSizeWithMagic, this.compressAlgo); // We don't really have a good way to exclude the "magic record" size // from the compressed block's size, since it is compressed as well. 
@@ -1566,6 +1443,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { protected HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE; + private HFileBlockDecodingContext encodedBlockDecodingCtx; + + private HFileBlockDefaultDecodingContext defaultDecodingCtx; + private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @Override @@ -1598,6 +1479,10 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { useHBaseChecksum = false; } this.useHBaseChecksumConfigured = useHBaseChecksum; + defaultDecodingCtx = + new HFileBlockDefaultDecodingContext(compressAlgo); + encodedBlockDecodingCtx = + new HFileBlockDefaultDecodingContext(compressAlgo); } /** @@ -1716,9 +1601,8 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { * @return the HFileBlock or null if there is a HBase checksum mismatch */ private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, - long onDiskSizeWithHeaderL, - int uncompressedSize, boolean pread, boolean verifyChecksum) - throws IOException { + long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread, + boolean verifyChecksum) throws IOException { if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL @@ -1738,8 +1622,20 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { } int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; + // See if we can avoid reading the header. This is desirable, because + // we will not incur a backward seek operation if we have already + // read this block's header as part of the previous read's look-ahead. + // And we also want to skip reading the header again if it has already + // been read. + PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); + ByteBuffer headerBuf = prefetchedHeader.offset == offset ? + prefetchedHeader.buf : null; + + int nextBlockOnDiskSize = 0; + // Allocate enough space to fit the next block's header too. + byte[] onDiskBlock = null; - HFileBlock b; + HFileBlock b = null; if (onDiskSizeWithHeader > 0) { // We know the total on-disk size but not the uncompressed size. Read // the entire block into memory, then parse the header and decompress @@ -1749,172 +1645,117 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { // block's header (e.g. this block's header) when reading the previous // block. This is the faster and more preferable case. - int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize; - assert onDiskSizeWithoutHeader >= 0; - - // See if we can avoid reading the header. This is desirable, because - // we will not incur a seek operation to seek back if we have already - // read this block's header as part of the previous read's look-ahead. - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - byte[] header = prefetchedHeader.offset == offset - ? prefetchedHeader.header : null; - // Size that we have to skip in case we have already read the header. - int preReadHeaderSize = header == null ? 0 : hdrSize; - - if (compressAlgo == Compression.Algorithm.NONE) { - // Just read the whole thing. Allocate enough space to read the - // next block's header too. 
- - ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader - + hdrSize); - headerAndData.limit(onDiskSizeWithHeader); - - if (header != null) { - System.arraycopy(header, 0, headerAndData.array(), 0, - hdrSize); - } - - int nextBlockOnDiskSizeWithHeader = readAtOffset(is, - headerAndData.array(), headerAndData.arrayOffset() - + preReadHeaderSize, onDiskSizeWithHeader - - preReadHeaderSize, true, offset + preReadHeaderSize, - pread); - - b = new HFileBlock(headerAndData, getMinorVersion()); - b.assumeUncompressed(); - b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); - b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader; - if (verifyChecksum && - !validateBlockChecksum(b, headerAndData.array(), hdrSize)) { - return null; // checksum mismatch - } - if (b.nextBlockOnDiskSizeWithHeader > 0) - setNextBlockHeader(offset, b); + int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; + onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, + preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, + true, offset + preReadHeaderSize, pread); + if (headerBuf != null) { + // the header has been read when reading the previous block, copy + // to this block's header + System.arraycopy(headerBuf.array(), + headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); } else { - // Allocate enough space to fit the next block's header too. - byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; - - int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, - preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, - true, offset + preReadHeaderSize, pread); - - if (header == null) - header = onDiskBlock; - - try { - b = new HFileBlock(ByteBuffer.wrap(header, 0, hdrSize), - getMinorVersion()); - } catch (IOException ex) { - // Seen in load testing. Provide comprehensive debug info. - throw new IOException("Failed to read compressed block at " - + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader - + ", preReadHeaderSize=" + preReadHeaderSize - + ", header.length=" + header.length + ", header bytes: " - + Bytes.toStringBinary(header, 0, hdrSize), ex); - } - b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); - b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; - if (verifyChecksum && - !validateBlockChecksum(b, onDiskBlock, hdrSize)) { - return null; // checksum mismatch - } - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream( - onDiskBlock, hdrSize, onDiskSizeWithoutHeader)); - - // This will allocate a new buffer but keep header bytes. - b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0); - - decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis, - b.uncompressedSizeWithoutHeader); - - // Copy next block's header bytes into the new block if we have them. - if (nextBlockOnDiskSize > 0) { - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - hdrSize); - - setNextBlockHeader(offset, b); - } + headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } - + // We know the total on-disk size but not the uncompressed size. Read + // the entire block into memory, then parse the header and decompress + // from memory if using compression. Here we have already read the + // block's header + try { + b = new HFileBlock(headerBuf, getMinorVersion()); + } catch (IOException ex) { + // Seen in load testing. Provide comprehensive debug info. 
+ throw new IOException("Failed to read compressed block at " + + offset + + ", onDiskSizeWithoutHeader=" + + onDiskSizeWithHeader + + ", preReadHeaderSize=" + + hdrSize + + ", header.length=" + + prefetchedHeader.header.length + + ", header bytes: " + + Bytes.toStringBinary(prefetchedHeader.header, 0, + hdrSize), ex); + } + // If the caller specifies an onDiskSizeWithHeader, validate it. + int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize; + assert onDiskSizeWithoutHeader >= 0; + b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); } else { - // We don't know the on-disk size. Read the header first, determine the - // on-disk size from it, and read the remaining data, thereby incurring - // two read operations. This might happen when we are doing the first - // read in a series of reads or a random read, and we don't have access - // to the block index. This is costly and should happen very rarely. - - // Check if we have read this block's header as part of reading the - // previous block. If so, don't read the header again. - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - ByteBuffer headerBuf = prefetchedHeader.offset == offset ? - prefetchedHeader.buf : null; - + // Check headerBuf to see if we have read this block's header as part of + // reading the previous block. This is an optimization of peeking into + // the next block's header (e.g. this block's header) when reading the + // previous block. This is the faster and preferred case. If the + // header is already there, don't read the header again. + + // Unfortunately, we still have to do a separate read operation to + // read the header. if (headerBuf == null) { - // Unfortunately, we still have to do a separate read operation to - // read the header. + // From the header, determine the on-disk size of the given hfile + // block, and read the remaining data, thereby incurring two read + // operations. This might happen when we are doing the first read + // in a series of reads or a random read, and we don't have access + // to the block index. This is costly and should happen very rarely. headerBuf = ByteBuffer.allocate(hdrSize); - readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, - false, offset, pread); + readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), + hdrSize, false, offset, pread); } b = new HFileBlock(headerBuf, getMinorVersion()); + onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; + System.arraycopy(headerBuf.array(), + headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + nextBlockOnDiskSize = + readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() + - hdrSize, true, offset + hdrSize, pread); + onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; + } - // This will also allocate enough room for the next block's header. - b.allocateBuffer(true); - - if (compressAlgo == Compression.Algorithm.NONE) { - - // Avoid creating bounded streams and using a "codec" that does - // nothing.
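When the on-disk size is not known in advance, the reader has to pay for two reads: one for the header, and one for the body plus the next block's header. Below is a minimal standalone sketch of that access pattern; it deliberately uses java.io.RandomAccessFile rather than the patch's readAtOffset/FSDataInputStream machinery, and the header-parsing helper and 24-byte HDR_SIZE are illustrative assumptions.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;

    public class TwoPhaseReadSketch {
      static final int HDR_SIZE = 24; // assumed header size, illustration only

      /** Reads a block of unknown size plus, best effort, the next block's header. */
      static byte[] readBlockWithLookAhead(RandomAccessFile f, long offset)
          throws IOException {
        // First read: header only, to learn the block's on-disk size.
        byte[] header = new byte[HDR_SIZE];
        f.seek(offset);
        f.readFully(header);
        int onDiskSizeWithHeader = getOnDiskSizeWithHeader(header);

        // Second read: the rest of this block plus room for the next header.
        byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HDR_SIZE];
        System.arraycopy(header, 0, onDiskBlock, 0, HDR_SIZE);
        int read = readUpTo(f, onDiskBlock, HDR_SIZE, onDiskSizeWithHeader);
        if (read < onDiskSizeWithHeader - HDR_SIZE) {
          throw new IOException("Truncated block at offset " + offset);
        }
        return onDiskBlock; // may or may not end with a full look-ahead header
      }

      /** Reads up to len bytes, stopping early only at end of file. */
      static int readUpTo(RandomAccessFile f, byte[] dst, int off, int len)
          throws IOException {
        int total = 0;
        while (total < len) {
          int n = f.read(dst, off + total, len - total);
          if (n < 0) {
            break; // EOF: the look-ahead header is simply missing for the last block
          }
          total += n;
        }
        return total;
      }

      /** Stand-in for parsing the size field out of the serialized header (layout assumed). */
      static int getOnDiskSizeWithHeader(byte[] header) {
        return ByteBuffer.wrap(header, 8, 4).getInt() + HDR_SIZE;
      }
    }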
- b.assumeUncompressed(); - b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, b.buf.array(), - b.buf.arrayOffset() + hdrSize, - b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - true, offset + hdrSize, - pread); - if (verifyChecksum && - !validateBlockChecksum(b, b.buf.array(), hdrSize)) { - return null; // checksum mismatch - } + boolean isCompressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + if (!isCompressed) { + b.assumeUncompressed(); + } - if (b.nextBlockOnDiskSizeWithHeader > 0) { - setNextBlockHeader(offset, b); - } + if (verifyChecksum && + !validateBlockChecksum(b, onDiskBlock, hdrSize)) { + return null; // checksum mismatch + } + + if (isCompressed) { + // This will allocate a new buffer but keep header bytes. + b.allocateBuffer(nextBlockOnDiskSize > 0); + if (b.blockType.equals(BlockType.ENCODED_DATA)) { + encodedBlockDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize); } else { - // Allocate enough space for the block's header and compressed data. - byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader() - + hdrSize]; - - b.nextBlockOnDiskSizeWithHeader = readAtOffset(is, compressedBytes, - hdrSize, b.onDiskSizeWithoutHeader, true, offset - + hdrSize, pread); - if (verifyChecksum && - !validateBlockChecksum(b, compressedBytes, hdrSize)) { - return null; // checksum mismatch - } - DataInputStream dis = new DataInputStream(new ByteArrayInputStream( - compressedBytes, hdrSize, b.onDiskSizeWithoutHeader)); - - decompress(b.buf.array(), b.buf.arrayOffset() + hdrSize, dis, - b.uncompressedSizeWithoutHeader); - - if (b.nextBlockOnDiskSizeWithHeader > 0) { - // Copy the next block's header into the new block. - int nextHeaderOffset = b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(); - System.arraycopy(compressedBytes, - compressedBytes.length - hdrSize, - b.buf.array(), - nextHeaderOffset, - hdrSize); - - setNextBlockHeader(offset, b); - } + defaultDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize); } + if (nextBlockOnDiskSize > 0) { + // Copy next block's header bytes into the new block if we have them. + System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), + b.buf.arrayOffset() + hdrSize + + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), + hdrSize); + } + } else { + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header in it. 
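The reader no longer calls an inline decompress(); it hands the raw on-disk bytes to one of two decoding contexts, as the hunk above shows. A small hedged sketch of that dispatch follows, using only the types and calls visible in this patch; the wrapper class and method name are invented.

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.HFileBlock;

    // Invented wrapper; the two fields stand for the encodedBlockDecodingCtx and
    // defaultDecodingCtx fields this patch adds to the reader.
    class DecodeDispatchSketch {
      private final HFileBlockDecodingContext encodedBlockDecodingCtx;
      private final HFileBlockDecodingContext defaultDecodingCtx;

      DecodeDispatchSketch(HFileBlockDecodingContext encodedCtx,
          HFileBlockDecodingContext defaultCtx) {
        this.encodedBlockDecodingCtx = encodedCtx;
        this.defaultDecodingCtx = defaultCtx;
      }

      /** Encoded data blocks go to the encoder's own context, everything else to the default one. */
      void decodeInto(HFileBlock b, byte[] onDiskBlock, int hdrSize) throws IOException {
        HFileBlockDecodingContext ctx = b.getBlockType().equals(BlockType.ENCODED_DATA)
            ? encodedBlockDecodingCtx : defaultDecodingCtx;
        ctx.prepareDecoding(b, onDiskBlock, hdrSize);
      }
    }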
+ b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, + onDiskSizeWithHeader), getMinorVersion()); + } + + b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; + + // Set prefetched header + if (b.nextBlockOnDiskSizeWithHeader > 0) { + prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); + System.arraycopy(onDiskBlock, onDiskSizeWithHeader, + prefetchedHeader.header, 0, hdrSize); } b.includesMemstoreTS = includesMemstoreTS; @@ -1922,21 +1763,14 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { return b; } - private void setNextBlockHeader(long offset, HFileBlock b) { - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); - int nextHeaderOffset = b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(); - System.arraycopy(b.buf.array(), nextHeaderOffset, - prefetchedHeader.header, 0, hdrSize); - } - void setIncludesMemstoreTS(boolean enabled) { includesMemstoreTS = enabled; } void setDataBlockEncoder(HFileDataBlockEncoder encoder) { this.dataBlockEncoder = encoder; + encodedBlockDecodingCtx = encoder.newOnDiskDataBlockDecodingContext( + this.compressAlgo); } /** diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java index 55d958d..8a109c4 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Pair; /** * Controls what kind of data block encoding is used. If data block encoding is @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.util.Pair; */ @InterfaceAudience.Private public interface HFileDataBlockEncoder { + /** * Converts a block from the on-disk format to the in-cache format. Called in * the following cases: @@ -51,12 +54,14 @@ public interface HFileDataBlockEncoder { * Should be called before an encoded or unencoded data block is written to * disk. * @param in KeyValues next to each other - * @param dummyHeader A dummy header to be written as a placeholder - * @return a non-null on-heap buffer containing the contents of the - * HFileBlock with unfilled header and block type + * @param encodingResult the encoded result + * @param blockType block type + * @throws IOException */ - public Pair beforeWriteToDisk( - ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader); + public void beforeWriteToDisk( + ByteBuffer in, boolean includesMemstoreTS, + HFileBlockEncodingContext encodingResult, + BlockType blockType) throws IOException; /** * Decides whether we should use a scanner over encoded blocks. @@ -85,4 +90,27 @@ public interface HFileDataBlockEncoder { */ public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction); + /** + * Create an encoder specific encoding context object for writing. And the + * encoding context should also perform compression if compressionAlgorithm is + * valid. 
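The reworked beforeWriteToDisk contract in this hunk pushes both encoding and compression behind HFileBlockEncodingContext, so a writer creates one context up front and pulls both the on-disk bytes and the cache-on-write bytes out of it after each block. A minimal usage sketch under those assumptions; the wrapper class is invented, and only the HBase methods shown in this patch are assumed to exist.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;

    // Invented wrapper class, for illustration of the new write-path contract.
    public class BlockWriteSketch {
      private final HFileDataBlockEncoder dataBlockEncoder;
      private final HFileBlockEncodingContext encodingCtx;

      public BlockWriteSketch(HFileDataBlockEncoder encoder, Algorithm compression,
          byte[] dummyHeader) {
        this.dataBlockEncoder = encoder;
        // One reusable context per writer; it owns the compression machinery and buffers.
        this.encodingCtx =
            encoder.newOnDiskDataBlockEncodingContext(compression, dummyHeader);
      }

      /**
       * Encodes (and, if configured, compresses) one data block. rawKeyValues is
       * assumed to be prepared the way the block writer prepares it: a slice over
       * a buffer whose backing array already reserves header space.
       */
      public byte[] writeBlock(ByteBuffer rawKeyValues, boolean includesMemstoreTS)
          throws IOException {
        dataBlockEncoder.beforeWriteToDisk(rawKeyValues, includesMemstoreTS,
            encodingCtx, BlockType.DATA);
        // After the call, the context holds both representations of the block.
        byte[] uncompressed = encodingCtx.getUncompressedBytesWithHeader(); // cache-on-write copy
        BlockType resultType = encodingCtx.getBlockType();                  // DATA or ENCODED_DATA
        byte[] onDisk = encodingCtx.getOnDiskBytesWithHeader();             // bytes for the file
        return onDisk;
      }
    }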
+ * + * @param compressionAlgorithm compression algorithm + * @param headerBytes header bytes + * @return a new {@link HFileBlockEncodingContext} object + */ + public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext( + Algorithm compressionAlgorithm, byte[] headerBytes); + + /** + * Create an encoder-specific decoding context for reading. The + * decoding context should also perform decompression if compressionAlgorithm + * is valid. + * + * @param compressionAlgorithm compression algorithm + * @return a new {@link HFileBlockDecodingContext} object + */ + public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext( + Algorithm compressionAlgorithm); + } diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java index 8371bf2..9f14bcd 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java @@ -16,18 +16,20 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import com.google.common.base.Preconditions; @@ -39,6 +41,7 @@ import com.google.common.base.Preconditions; public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { private final DataBlockEncoding onDisk; private final DataBlockEncoding inCache; + private final HFileBlockEncodingContext inCacheEncodeCtx; public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) { this(encoding, encoding); @@ -54,10 +57,36 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { */ public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk, DataBlockEncoding inCache) { + this(onDisk, inCache, null); + } + + /** + * Do data block encoding with specified options. + * @param onDisk What kind of data block encoding will be used before writing + * HFileBlock to disk. This must be either the same as inCache or + * {@link DataBlockEncoding#NONE}. + * @param inCache What kind of data block encoding will be used in block + * cache. + * @param dummyHeader dummy header bytes + */ + public HFileDataBlockEncoderImpl(DataBlockEncoding onDisk, + DataBlockEncoding inCache, byte[] dummyHeader) { + dummyHeader = dummyHeader == null ? HFileBlock.DUMMY_HEADER : dummyHeader; this.onDisk = onDisk != null ? onDisk : DataBlockEncoding.NONE; this.inCache = inCache != null ?
inCache : DataBlockEncoding.NONE; + if (inCache != DataBlockEncoding.NONE) { + inCacheEncodeCtx = + this.inCache.getEncoder().newDataBlockEncodingContext( + Algorithm.NONE, this.inCache, dummyHeader); + } else { + // create a default encoding context + inCacheEncodeCtx = + new HFileBlockDefaultEncodingContext(Algorithm.NONE, + this.inCache, dummyHeader); + } + Preconditions.checkArgument(onDisk == DataBlockEncoding.NONE || onDisk == inCache, "on-disk encoding (" + onDisk + ") must be " + "either the same as in-cache encoding (" + inCache + ") or " + @@ -131,7 +160,8 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { return block; } // Encode the unencoded block with the in-cache encoding. - return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS()); + return encodeDataBlock(block, inCache, block.doesIncludeMemstoreTS(), + inCacheEncodeCtx); } if (block.getBlockType() == BlockType.ENCODED_DATA) { @@ -149,21 +179,25 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { } /** - * Precondition: a non-encoded buffer. - * Postcondition: on-disk encoding. + * Precondition: a non-encoded buffer. Postcondition: on-disk encoding. + * + * The encoded results can be stored in {@link HFileBlockEncodingContext}. + * + * @throws IOException */ @Override - public Pair beforeWriteToDisk(ByteBuffer in, - boolean includesMemstoreTS, byte[] dummyHeader) { + public void beforeWriteToDisk(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext encodeCtx, + BlockType blockType) throws IOException { if (onDisk == DataBlockEncoding.NONE) { // there is no need to encode the block before writing it to disk - return new Pair(in, BlockType.DATA); + ((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncoding( + in.array(), blockType); + return; } - - ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in, - onDisk, includesMemstoreTS, dummyHeader); - return new Pair(encodedBuffer, - BlockType.ENCODED_DATA); + encodeBufferToHFileBlockBuffer(in, onDisk, + includesMemstoreTS, encodeCtx); } @Override @@ -174,34 +208,42 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { return inCache != DataBlockEncoding.NONE; } - private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in, + /** + * Encode a block of key value pairs. 
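A short usage sketch of the three-argument HFileDataBlockEncoderImpl constructor introduced in this file may help here; DataBlockEncoding.PREFIX and DIFF are just example encodings, passing null for the dummy header falls back to HFileBlock.DUMMY_HEADER, and the Preconditions check above requires the on-disk encoding to equal the in-cache encoding or be NONE.

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
    import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;

    public class EncoderConfigSketch {
      public static void main(String[] args) {
        // Same encoding on disk and in cache; null falls back to HFileBlock.DUMMY_HEADER.
        HFileDataBlockEncoder sameEverywhere =
            new HFileDataBlockEncoderImpl(DataBlockEncoding.PREFIX,
                DataBlockEncoding.PREFIX, null);

        // Unencoded on disk, encoded only in the block cache.
        HFileDataBlockEncoder cacheOnly =
            new HFileDataBlockEncoderImpl(DataBlockEncoding.NONE,
                DataBlockEncoding.DIFF, null);

        // Any other combination (e.g. PREFIX on disk, DIFF in cache) fails the
        // Preconditions.checkArgument in the constructor.
        System.out.println(sameEverywhere + " / " + cacheOnly);
      }
    }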
+ * + * @param in input data to encode + * @param algo encoding algorithm + * @param includesMemstoreTS includes memstore timestamp or not + * @param encodeCtx where will the output data be stored + */ + private void encodeBufferToHFileBlockBuffer(ByteBuffer in, DataBlockEncoding algo, boolean includesMemstoreTS, - byte[] dummyHeader) { - ByteArrayOutputStream encodedStream = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(encodedStream); + HFileBlockEncodingContext encodeCtx) { DataBlockEncoder encoder = algo.getEncoder(); try { - encodedStream.write(dummyHeader); - algo.writeIdInBytes(dataOut); - encoder.compressKeyValues(dataOut, in, - includesMemstoreTS); + encoder.compressKeyValues(in, includesMemstoreTS, encodeCtx); } catch (IOException e) { - throw new RuntimeException(String.format("Bug in data block encoder " + - "'%s', it probably requested too much data", algo.toString()), e); + throw new RuntimeException(String.format( + "Bug in data block encoder " + + "'%s', it probably requested too much data, " + + "exception message: %s.", + algo.toString(), e.getMessage()), e); } - return ByteBuffer.wrap(encodedStream.toByteArray()); } private HFileBlock encodeDataBlock(HFileBlock block, - DataBlockEncoding algo, boolean includesMemstoreTS) { - ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer( - block.getBufferWithoutHeader(), algo, includesMemstoreTS, - HFileBlock.DUMMY_HEADER); - int sizeWithoutHeader = compressedBuffer.limit() - block.headerSize(); + DataBlockEncoding algo, boolean includesMemstoreTS, + HFileBlockEncodingContext encodingCtx) { + encodeBufferToHFileBlockBuffer( + block.getBufferWithoutHeader(), algo, includesMemstoreTS, encodingCtx); + byte[] encodedUncompressedBytes = + encodingCtx.getUncompressedBytesWithHeader(); + ByteBuffer bufferWrapper = ByteBuffer.wrap(encodedUncompressedBytes); + int sizeWithoutHeader = bufferWrapper.limit() - encodingCtx.getHeaderSize(); HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA, block.getOnDiskSizeWithoutHeader(), sizeWithoutHeader, block.getPrevBlockOffset(), - compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(), + bufferWrapper, HFileBlock.FILL_HEADER, block.getOffset(), includesMemstoreTS, block.getMinorVersion(), block.getBytesPerChecksum(), block.getChecksumType(), block.getOnDiskDataSizeWithHeader()); @@ -215,4 +257,31 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { inCache + ")"; } + @Override + public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext( + Algorithm compressionAlgorithm, byte[] dummyHeader) { + if (onDisk != null) { + DataBlockEncoder encoder = onDisk.getEncoder(); + if (encoder != null) { + return encoder.newDataBlockEncodingContext( + compressionAlgorithm, onDisk, dummyHeader); + } + } + return new HFileBlockDefaultEncodingContext(compressionAlgorithm, + null, dummyHeader); + } + + @Override + public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + if (onDisk != null) { + DataBlockEncoder encoder = onDisk.getEncoder(); + if (encoder != null) { + return encoder.newDataBlockDecodingContext( + compressionAlgorithm); + } + } + return new HFileBlockDefaultDecodingContext(compressionAlgorithm); + } + } diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index 3392400..76b77c5 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ 
src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -432,7 +432,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { finishClose(trailer); - fsBlockWriter.releaseCompressor(); + fsBlockWriter.release(); } @Override diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java index 8a6351f..80b62a7 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java @@ -16,12 +16,17 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Pair; /** * Does not perform any kind of encoding/decoding. @@ -45,9 +50,19 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder { } @Override - public Pair beforeWriteToDisk( - ByteBuffer in, boolean includesMemstoreTS, byte[] dummyHeader) { - return new Pair(in, BlockType.DATA); + public void beforeWriteToDisk(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext encodeCtx, BlockType blockType) + throws IOException { + if (!(encodeCtx.getClass().getName().equals( + HFileBlockDefaultEncodingContext.class.getName()))) { + throw new IOException (this.getClass().getName() + " only accepts " + + HFileBlockDefaultEncodingContext.class.getName() + "."); + } + + HFileBlockDefaultEncodingContext defaultContext = + (HFileBlockDefaultEncodingContext) encodeCtx; + defaultContext.compressAfterEncoding(in.array(), blockType); } @Override @@ -79,4 +94,17 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder { return getClass().getSimpleName(); } + @Override + public HFileBlockEncodingContext newOnDiskDataBlockEncodingContext( + Algorithm compressionAlgorithm, byte[] dummyHeader) { + return new HFileBlockDefaultEncodingContext(compressionAlgorithm, + null, dummyHeader); + } + + @Override + public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultDecodingContext(compressionAlgorithm); + } + } diff --git src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index f7b7397..b35772d 100644 --- src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -20,9 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -34,6 +32,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import 
org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +51,9 @@ public class TestDataBlockEncoders { static int NUMBER_OF_KV = 10000; static int NUM_RANDOM_SEEKS = 10000; + private static int ENCODED_DATA_OFFSET = + HFileBlock.HEADER_SIZE + DataBlockEncoding.ID_SIZE; + private RedundantKVGenerator generator = new RedundantKVGenerator(); private Random randomizer = new Random(42l); @@ -65,17 +68,44 @@ public class TestDataBlockEncoders { this.includesMemstoreTS = includesMemstoreTS; } - private void testAlgorithm(ByteBuffer dataset, DataBlockEncoder encoder) + private HFileBlockEncodingContext getEncodingContext( + Compression.Algorithm algo, DataBlockEncoding encoding) { + DataBlockEncoder encoder = encoding.getEncoder(); + if (encoder != null) { + return encoder.newDataBlockEncodingContext(algo, encoding, + HFileBlock.DUMMY_HEADER); + } else { + return new HFileBlockDefaultEncodingContext(algo, encoding); + } + } + + private byte[] encodeBytes(DataBlockEncoding encoding, + ByteBuffer dataset) throws IOException { + DataBlockEncoder encoder = encoding.getEncoder(); + HFileBlockEncodingContext encodingCtx = + getEncodingContext(Compression.Algorithm.NONE, encoding); + + encoder.compressKeyValues(dataset, includesMemstoreTS, + encodingCtx); + + byte[] encodedBytesWithHeader = + encodingCtx.getUncompressedBytesWithHeader(); + byte[] encodedData = + new byte[encodedBytesWithHeader.length - ENCODED_DATA_OFFSET]; + System.arraycopy(encodedBytesWithHeader, ENCODED_DATA_OFFSET, encodedData, + 0, encodedData.length); + return encodedData; + } + + private void testAlgorithm(ByteBuffer dataset, DataBlockEncoding encoding) throws IOException { // encode - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(baos); - encoder.compressKeyValues(dataOut, dataset, includesMemstoreTS); - - // decode - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + byte[] encodedBytes = encodeBytes(encoding, dataset); + //decode + ByteArrayInputStream bais = new ByteArrayInputStream(encodedBytes); DataInputStream dis = new DataInputStream(bais); ByteBuffer actualDataset; + DataBlockEncoder encoder = encoding.getEncoder(); actualDataset = encoder.uncompressKeyValues(dis, includesMemstoreTS); dataset.rewind(); @@ -142,17 +172,17 @@ public class TestDataBlockEncoders { ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, includesMemstoreTS); - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); // create all seekers List encodedSeekers = new ArrayList(); - for (DataBlockEncoder encoder : dataBlockEncoders) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(baos); - encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); - ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray()); + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + if (encoding.getEncoder() == null) { + continue; + } + ByteBuffer encodedBuffer = + ByteBuffer.wrap(encodeBytes(encoding, originalBuffer)); + DataBlockEncoder encoder = encoding.getEncoder(); DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS); seeker.setCurrentBuffer(encodedBuffer); @@ -195,20 +225,19 @@ public class 
TestDataBlockEncoders { ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, includesMemstoreTS); - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); - for (DataBlockEncoder encoder : dataBlockEncoders) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(baos); + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + if (encoding.getEncoder() == null) { + continue; + } + DataBlockEncoder encoder = encoding.getEncoder(); + ByteBuffer encodedBuffer = null; try { - encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); + encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer)); } catch (IOException e) { throw new RuntimeException(String.format( "Bug while encoding using '%s'", encoder.toString()), e); } - - ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray()); DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS); seeker.setCurrentBuffer(encodedBuffer); @@ -255,20 +284,19 @@ public class TestDataBlockEncoders { ByteBuffer originalBuffer = RedundantKVGenerator.convertKvToByteBuffer(sampleKv, includesMemstoreTS); - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); - for (DataBlockEncoder encoder : dataBlockEncoders) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(baos); + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + if (encoding.getEncoder() == null) { + continue; + } + DataBlockEncoder encoder = encoding.getEncoder(); + ByteBuffer encodedBuffer = null; try { - encoder.compressKeyValues(dataOut, originalBuffer, includesMemstoreTS); + encodedBuffer = ByteBuffer.wrap(encodeBytes(encoding, originalBuffer)); } catch (IOException e) { throw new RuntimeException(String.format( "Bug while encoding using '%s'", encoder.toString()), e); } - - ByteBuffer encodedBuffer = ByteBuffer.wrap(baos.toByteArray()); ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer); KeyValue firstKv = sampleKv.get(0); if (0 != Bytes.compareTo( @@ -327,16 +355,17 @@ public class TestDataBlockEncoders { private void testEncodersOnDataset(ByteBuffer onDataset) throws IOException{ - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); ByteBuffer dataset = ByteBuffer.allocate(onDataset.capacity()); onDataset.rewind(); dataset.put(onDataset); onDataset.rewind(); dataset.flip(); - for (DataBlockEncoder encoder : dataBlockEncoders) { - testAlgorithm(dataset, encoder); + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + if (encoding.getEncoder() == null) { + continue; + } + testAlgorithm(dataset, encoding); // ensure that dataset is unchanged dataset.rewind(); diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 6456ccb..ac9be76 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -50,12 +50,15 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.DoubleOutputStream; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import 
org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; @@ -203,7 +206,7 @@ public class TestHFileBlock { writeTestBlockContents(dos); byte[] headerAndData = hbw.getHeaderAndDataForTest(); assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); - hbw.releaseCompressor(); + hbw.release(); return hbw; } @@ -371,9 +374,8 @@ public class TestHFileBlock { final List encodedBlocks = new ArrayList(); for (int blockId = 0; blockId < numBlocks; ++blockId) { DataOutputStream dos = hbw.startWriting(BlockType.DATA); - writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks, - blockId, includesMemstoreTS); - + writeEncodedBlock(algo, encoding, dos, encodedSizes, encodedBlocks, + blockId, includesMemstoreTS, HFileBlock.DUMMY_HEADER); hbw.writeHeaderAndData(os); totalSize += hbw.getOnDiskSizeWithHeader(); } @@ -392,7 +394,6 @@ public class TestHFileBlock { assertEquals(0, HFile.getChecksumFailuresCount()); b.sanityCheck(); pos += b.getOnDiskSizeWithHeader(); - assertEquals((int) encodedSizes.get(blockId), b.getUncompressedSizeWithoutHeader()); ByteBuffer actualBuffer = b.getBufferWithoutHeader(); @@ -417,35 +418,52 @@ public class TestHFileBlock { } } - static void writeEncodedBlock(DataBlockEncoding encoding, - DataOutputStream dos, final List encodedSizes, + static void writeEncodedBlock(Algorithm algo, DataBlockEncoding encoding, + DataOutputStream dos, final List encodedSizes, final List encodedBlocks, int blockId, - boolean includesMemstoreTS) throws IOException { + boolean includesMemstoreTS, byte[] dummyHeader) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DoubleOutputStream doubleOutputStream = new DoubleOutputStream(dos, baos); - - final int rawBlockSize = writeTestKeyValues(doubleOutputStream, - blockId, includesMemstoreTS); - + writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS); ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray()); rawBuf.rewind(); - final int encodedSize; - final ByteBuffer encodedBuf; - if (encoding == DataBlockEncoding.NONE) { - encodedSize = rawBlockSize; - encodedBuf = rawBuf; + DataBlockEncoder encoder = encoding.getEncoder(); + int headerLen = dummyHeader.length; + byte[] encodedResultWithHeader = null; + if (encoder != null) { + HFileBlockEncodingContext encodingCtx = + encoder.newDataBlockEncodingContext(algo, encoding, dummyHeader); + encoder.compressKeyValues(rawBuf, includesMemstoreTS, + encodingCtx); + encodedResultWithHeader = + encodingCtx.getUncompressedBytesWithHeader(); } else { - ByteArrayOutputStream encodedOut = new ByteArrayOutputStream(); - encoding.getEncoder().compressKeyValues( - new DataOutputStream(encodedOut), - rawBuf.duplicate(), includesMemstoreTS); + HFileBlockDefaultEncodingContext defaultEncodingCtx = + new HFileBlockDefaultEncodingContext(algo, encoding, dummyHeader); + byte[] rawBufWithHeader = + new byte[rawBuf.array().length + headerLen]; + System.arraycopy(rawBuf.array(), 0, rawBufWithHeader, + headerLen, rawBuf.array().length); + defaultEncodingCtx.compressAfterEncoding(rawBufWithHeader, + BlockType.DATA); + encodedResultWithHeader = + 
defaultEncodingCtx.getUncompressedBytesWithHeader(); + } + final int encodedSize = + encodedResultWithHeader.length - headerLen; + if (encoder != null) { // We need to account for the two-byte encoding algorithm ID that // comes after the 24-byte block header but before encoded KVs. - encodedSize = encodedOut.size() + DataBlockEncoding.ID_SIZE; - encodedBuf = ByteBuffer.wrap(encodedOut.toByteArray()); + headerLen += DataBlockEncoding.ID_SIZE; } + byte[] encodedDataSection = + new byte[encodedResultWithHeader.length - headerLen]; + System.arraycopy(encodedResultWithHeader, headerLen, + encodedDataSection, 0, encodedDataSection.length); + final ByteBuffer encodedBuf = + ByteBuffer.wrap(encodedDataSection); encodedSizes.add(encodedSize); encodedBlocks.add(encodedBuf); } diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 4d9b158..8609425 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -19,7 +19,11 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.junit.Assert.*; +import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.GZ; +import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -28,40 +32,23 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.DoubleOutputStream; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.hbase.util.Pair; -import com.google.common.base.Preconditions; - -import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; +import org.apache.hadoop.io.compress.Compressor; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -69,6 +56,8 @@ import org.junit.runner.RunWith; 
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import com.google.common.base.Preconditions; + /** * This class has unit tests to prove that older versions of * HFiles (without checksums) are compatible with current readers. @@ -129,7 +118,8 @@ public class TestHFileBlockCompatibility { includesMemstoreTS); DataOutputStream dos = hbw.startWriting(blockType); TestHFileBlock.writeTestBlockContents(dos); - byte[] headerAndData = hbw.getHeaderAndData(); + // make sure the block is ready by calling hbw.getHeaderAndData() + hbw.getHeaderAndData(); assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); hbw.releaseCompressor(); return hbw; @@ -145,7 +135,7 @@ public class TestHFileBlockCompatibility { // variations across operating systems. // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. testV2Block[osOffset] = 3; - } + } return Bytes.toStringBinary(testV2Block); } @@ -173,8 +163,9 @@ public class TestHFileBlockCompatibility { + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\s\\xA0\\x0F\\x00\\x00"; final int correctGzipBlockLength = 82; - assertEquals(correctTestBlockStr, createTestBlockStr(GZ, - correctGzipBlockLength)); + + String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength); + assertEquals(correctTestBlockStr, returnedStr); } @Test @@ -288,16 +279,19 @@ public class TestHFileBlockCompatibility { + algo + "_" + encoding.toString()); FSDataOutputStream os = fs.create(path); HFileDataBlockEncoder dataBlockEncoder = - new HFileDataBlockEncoderImpl(encoding); - Writer hbw = new Writer(algo, dataBlockEncoder, - includesMemstoreTS); + new HFileDataBlockEncoderImpl(encoding, encoding, + TestHFileBlockCompatibility.Writer.DUMMY_HEADER); + TestHFileBlockCompatibility.Writer hbw = + new TestHFileBlockCompatibility.Writer(algo, + dataBlockEncoder, includesMemstoreTS); long totalSize = 0; final List encodedSizes = new ArrayList(); final List encodedBlocks = new ArrayList(); for (int blockId = 0; blockId < numBlocks; ++blockId) { DataOutputStream dos = hbw.startWriting(BlockType.DATA); - TestHFileBlock.writeEncodedBlock(encoding, dos, encodedSizes, encodedBlocks, - blockId, includesMemstoreTS); + TestHFileBlock.writeEncodedBlock(algo, encoding, dos, encodedSizes, + encodedBlocks, blockId, includesMemstoreTS, + TestHFileBlockCompatibility.Writer.DUMMY_HEADER); hbw.writeHeaderAndData(os); totalSize += hbw.getOnDiskSizeWithHeader(); @@ -332,8 +326,8 @@ public class TestHFileBlockCompatibility { expectedBuffer.rewind(); // test if content matches, produce nice message - TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, - pread); + TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer, + algo, encoding, pread); } is.close(); } @@ -378,6 +372,10 @@ public class TestHFileBlockCompatibility { /** Data block encoder used for data blocks */ private final HFileDataBlockEncoder dataBlockEncoder; + private HFileBlockEncodingContext dataBlockEncodingCtx; + /** block encoding context for non-data blocks */ + private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; + /** * The stream we use to accumulate data in uncompressed format for each * block. We reset this stream at the end of each block and reuse it. The @@ -389,12 +387,6 @@ public class TestHFileBlockCompatibility { /** Compressor, which is also reused between consecutive blocks. 
*/ private Compressor compressor; - /** Compression output stream */ - private CompressionOutputStream compressionStream; - - /** Underlying stream to write compressed bytes to */ - private ByteArrayOutputStream compressedByteStream; - /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} @@ -449,19 +441,14 @@ public class TestHFileBlockCompatibility { this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; + defaultBlockEncodingCtx = + new HFileBlockDefaultEncodingContext(compressionAlgorithm, + null, DUMMY_HEADER); + dataBlockEncodingCtx = + this.dataBlockEncoder.newOnDiskDataBlockEncodingContext( + compressionAlgorithm, DUMMY_HEADER); + baosInMemory = new ByteArrayOutputStream(); - if (compressAlgo != NONE) { - compressor = compressionAlgorithm.getCompressor(); - compressedByteStream = new ByteArrayOutputStream(); - try { - compressionStream = - compressionAlgorithm.createPlainCompressionStream( - compressedByteStream, compressor); - } catch (IOException e) { - throw new RuntimeException("Could not create compression stream " + - "for algorithm " + compressionAlgorithm, e); - } - } prevOffsetByType = new long[BlockType.values().length]; for (int i = 0; i < prevOffsetByType.length; ++i) @@ -532,48 +519,31 @@ public class TestHFileBlockCompatibility { */ private void finishBlock() throws IOException { userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array. uncompressedBytesWithHeader = baosInMemory.toByteArray(); - LOG.warn("Writer.finishBlock user data size with header before compression " + - uncompressedBytesWithHeader.length); prevOffset = prevOffsetByType[blockType.getId()]; // We need to set state before we can package the block up for // cache-on-write. In a way, the block is ready, but not yet encoded or // compressed. state = State.BLOCK_READY; - encodeDataBlockForDisk(); - - doCompression(); - putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length); - } - - /** - * Do compression if it is enabled, or re-use the uncompressed buffer if - * it is not. Fills in the compressed block's header if doing compression. 
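With the manual Compressor/CompressionOutputStream plumbing removed above, compression for blocks that are not data blocks is delegated to a default encoding context. A hedged sketch of that path, using only the constructor and methods that appear in this hunk; the wrapper class and parameter names are invented.

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;

    public class NonDataBlockCompressSketch {
      /**
       * Compresses an already serialized block (dummy header + payload) the way
       * the rewritten finishBlock() handles non-data blocks.
       */
      static byte[] compressBlock(Algorithm compressionAlgorithm,
          byte[] uncompressedBytesWithHeader, BlockType blockType,
          byte[] dummyHeader) throws IOException {
        HFileBlockDefaultEncodingContext ctx =
            new HFileBlockDefaultEncodingContext(compressionAlgorithm, null, dummyHeader);
        // No encoding for non-data blocks: just compress (or pass through) the bytes.
        ctx.compressAfterEncoding(uncompressedBytesWithHeader, blockType);
        // The caller still fills in the real header fields afterwards (putHeader in the patch).
        return ctx.getOnDiskBytesWithHeader();
      }
    }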
- */ - private void doCompression() throws IOException { - // do the compression - if (compressAlgo != NONE) { - compressedByteStream.reset(); - compressedByteStream.write(DUMMY_HEADER); - - compressionStream.resetState(); - - compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE, - uncompressedBytesWithHeader.length - HEADER_SIZE); - - compressionStream.flush(); - compressionStream.finish(); - - onDiskBytesWithHeader = compressedByteStream.toByteArray(); - putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length); + if (blockType == BlockType.DATA) { + encodeDataBlockForDisk(); } else { - onDiskBytesWithHeader = uncompressedBytesWithHeader; + defaultBlockEncodingCtx.compressAfterEncoding( + uncompressedBytesWithHeader, blockType); + onDiskBytesWithHeader = + defaultBlockEncodingCtx.getOnDiskBytesWithHeader(); } + + // put the header for on disk bytes + putHeader(onDiskBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); + //set the header for the uncompressed bytes (for cache-on-write) + putHeader(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); } /** @@ -581,35 +551,20 @@ public class TestHFileBlockCompatibility { * {@link #dataBlockEncoder}. */ private void encodeDataBlockForDisk() throws IOException { - if (blockType != BlockType.DATA) { - return; // skip any non-data block - } - // do data block encoding, if data block encoder is set - ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader, - HEADER_SIZE, uncompressedBytesWithHeader.length - - HEADER_SIZE).slice(); - Pair encodingResult = - dataBlockEncoder.beforeWriteToDisk(rawKeyValues, - includesMemstoreTS, DUMMY_HEADER); - - BlockType encodedBlockType = encodingResult.getSecond(); - if (encodedBlockType == BlockType.ENCODED_DATA) { - uncompressedBytesWithHeader = encodingResult.getFirst().array(); - blockType = BlockType.ENCODED_DATA; - } else { - // There is no encoding configured. Do some extra sanity-checking. - if (encodedBlockType != BlockType.DATA) { - throw new IOException("Unexpected block type coming out of data " + - "block encoder: " + encodedBlockType); - } - if (userDataStream.size() != - uncompressedBytesWithHeader.length - HEADER_SIZE) { - throw new IOException("Uncompressed size mismatch: " - + userDataStream.size() + " vs. 
" - + (uncompressedBytesWithHeader.length - HEADER_SIZE)); - } - } + ByteBuffer rawKeyValues = + ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE, + uncompressedBytesWithHeader.length - HEADER_SIZE).slice(); + + //do the encoding + dataBlockEncoder.beforeWriteToDisk(rawKeyValues, + includesMemstoreTS, dataBlockEncodingCtx, blockType); + + uncompressedBytesWithHeader = + dataBlockEncodingCtx.getUncompressedBytesWithHeader(); + onDiskBytesWithHeader = + dataBlockEncodingCtx.getOnDiskBytesWithHeader(); + blockType = dataBlockEncodingCtx.getBlockType(); } /** @@ -802,5 +757,6 @@ public class TestHFileBlockCompatibility { getOnDiskSizeWithoutHeader()); } } + } diff --git src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 613ad7d..24974e5 100644 --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -30,11 +30,12 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -117,18 +118,23 @@ public class TestHFileDataBlockEncoder { /** * Test writing to disk. 
+ * @throws IOException */ @Test - public void testEncodingWritePath() { + public void testEncodingWritePath() throws IOException { // usually we have just block without headers, but don't complicate that HFileBlock block = getSampleHFileBlock(); - Pair result = - blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), - includesMemstoreTS, HFileBlock.DUMMY_HEADER); - - int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE; - HFileBlock blockOnDisk = new HFileBlock(result.getSecond(), - size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0, + HFileBlockEncodingContext context = + new HFileBlockDefaultEncodingContext( + Compression.Algorithm.NONE, blockEncoder.getEncodingOnDisk()); + blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), + includesMemstoreTS, context, block.getBlockType()); + + byte[] encodedBytes = context.getUncompressedBytesWithHeader(); + int size = encodedBytes.length - HFileBlock.HEADER_SIZE; + HFileBlock blockOnDisk = + new HFileBlock(context.getBlockType(), size, size, -1, + ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0, includesMemstoreTS, block.getMinorVersion(), block.getBytesPerChecksum(), block.getChecksumType(), block.getOnDiskDataSizeWithHeader()); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java index effa74c..470dd70 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java @@ -115,11 +115,10 @@ public class DataBlockEncodingTool { byte[] previousKey = null; byte[] currentKey; - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); - - for (DataBlockEncoder d : dataBlockEncoders) { - codecs.add(new EncodedDataBlock(d, includesMemstoreTS)); + DataBlockEncoding[] encodings = DataBlockEncoding.values(); + for(DataBlockEncoding encoding : encodings) { + DataBlockEncoder d = encoding.getEncoder(); + codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding)); } int j = 0; @@ -280,7 +279,7 @@ public class DataBlockEncodingTool { List compressDurations = new ArrayList(); for (int itTime = 0; itTime < BENCHMARK_N_TIMES; ++itTime) { final long startTime = System.nanoTime(); - codec.doCompressData(); + codec.encodeData(); final long finishTime = System.nanoTime(); if (itTime >= BENCHMARK_N_OMIT) { compressDurations.add(finishTime - startTime); -- 1.7.4.4
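Finally, since DataBlockEncoding.getAllEncoders() disappears from these call sites, the recurring pattern is to iterate DataBlockEncoding.values(), skip entries without an encoder, and drive EncodedDataBlock through its new three-argument constructor and encodeData(). A small sketch of that pattern; the class and method names outside the HBase types are invented, and the codec is assumed to have been filled with KeyValues before it is timed, as the tool does.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;

    public class EncoderIterationSketch {
      /** Builds one EncodedDataBlock per concrete encoding, skipping NONE. */
      static List<EncodedDataBlock> createCodecs(boolean includesMemstoreTS) {
        List<EncodedDataBlock> codecs = new ArrayList<EncodedDataBlock>();
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          DataBlockEncoder encoder = encoding.getEncoder();
          if (encoder == null) {
            continue; // DataBlockEncoding.NONE has no encoder
          }
          codecs.add(new EncodedDataBlock(encoder, includesMemstoreTS, encoding));
        }
        return codecs;
      }

      /** Times one encodeData() pass; the codec is assumed to already hold KeyValues. */
      static long timeEncode(EncodedDataBlock codec) {
        long start = System.nanoTime();
        codec.encodeData(); // replaces the old doCompressData()
        return System.nanoTime() - start;
      }
    }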