Index: src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java (revision 1296006) +++ src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java (working copy) @@ -115,11 +115,10 @@ byte[] previousKey = null; byte[] currentKey; - List dataBlockEncoders = - DataBlockEncoding.getAllEncoders(); - - for (DataBlockEncoder d : dataBlockEncoders) { - codecs.add(new EncodedDataBlock(d, includesMemstoreTS)); + DataBlockEncoding[] encodings = DataBlockEncoding.values(); + for(DataBlockEncoding encoding : encodings) { + DataBlockEncoder d = encoding.getEncoder(); + codecs.add(new EncodedDataBlock(d, includesMemstoreTS, encoding)); } int j = 0; Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (revision 1296006) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (working copy) @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.*; @@ -200,7 +199,7 @@ writeTestBlockContents(dos); byte[] headerAndData = hbw.getHeaderAndData(); assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); - hbw.releaseCompressor(); + hbw.release(); return headerAndData; } Index: src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java (revision 1296006) +++ src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java (working copy) @@ -30,10 +30,11 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.io.encoding.RedundantKVGenerator; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; -import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -116,18 +117,22 @@ /** * Test writing to disk. 
+ * @throws IOException */ @Test - public void testEncodingWritePath() { + public void testEncodingWritePath() throws IOException { // usually we have just block without headers, but don't complicate that HFileBlock block = getSampleHFileBlock(); - Pair result = - blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), - includesMemstoreTS); + HFileBlockEncodingContext context = + new HFileBlockDefaultEncodingContext(Compression.Algorithm.NONE); + blockEncoder.beforeWriteToDisk(block.getBufferWithoutHeader(), + includesMemstoreTS, context, block.getBlockType()); - int size = result.getFirst().limit() - HFileBlock.HEADER_SIZE; - HFileBlock blockOnDisk = new HFileBlock(result.getSecond(), - size, size, -1, result.getFirst(), HFileBlock.FILL_HEADER, 0, + byte[] encodedBytes = context.getUncompressedBytesWithHeader(); + int size = encodedBytes.length - HFileBlock.HEADER_SIZE; + HFileBlock blockOnDisk = + new HFileBlock(context.getBlockType(), size, size, -1, + ByteBuffer.wrap(encodedBytes), HFileBlock.FILL_HEADER, 0, includesMemstoreTS); if (blockEncoder.getEncodingOnDisk() != Index: src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java (revision 0) @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; + +/** + * A default implementation of {@link HFileBlockEncodingContext}. It will + * compress the data section as a whole bytes buffer. + * + * @see HFileBlockDefaultDecodingContext for the decompression part + * + */ +public class HFileBlockDefaultEncodingContext implements + HFileBlockEncodingContext { + + private byte[] onDiskBytesWithHeader; + private byte[] uncompressedBytesWithHeader; + private BlockType blockType; + + /** Compressor, which is also reused between consecutive blocks. 
*/ + private Compressor compressor; + + /** Compression output stream */ + private CompressionOutputStream compressionStream; + + /** Underlying stream to write compressed bytes to */ + private ByteArrayOutputStream compressedByteStream; + + /** Compression algorithm for all blocks this instance writes. */ + private final Compression.Algorithm compressionAlgorithm; + + ByteArrayOutputStream encodedStream = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(encodedStream); + + public HFileBlockDefaultEncodingContext( + Compression.Algorithm compressionAlgorithm) { + this.compressionAlgorithm = + compressionAlgorithm == null ? NONE : compressionAlgorithm; + if (this.compressionAlgorithm != NONE) { + compressor = compressionAlgorithm.getCompressor(); + compressedByteStream = new ByteArrayOutputStream(); + try { + compressionStream = + compressionAlgorithm.createPlainCompressionStream( + compressedByteStream, compressor); + } catch (IOException e) { + throw new RuntimeException( + "Could not create compression stream " + "for algorithm " + + compressionAlgorithm, e); + } + } + } + + @Override + public void prepareEncoding(BlockType blockType) + throws IOException { + encodedStream.reset(); + this.blockType = blockType; + } + + @Override + public void postEncoding(DataBlockEncoding algo, BlockType blockType) + throws IOException { + encodedStream.write(HFileBlock.DUMMY_HEADER); + algo.writeIdInBytes(dataOut); + postEncoding(encodedStream.toByteArray(), blockType); + } + + /** + * @param uncompressedBytesWithHeader + * @param blockType + * @throws IOException + */ + public void postEncoding(byte[] uncompressedBytesWithHeader, + BlockType blockType) throws IOException { + this.uncompressedBytesWithHeader = uncompressedBytesWithHeader; + if (compressionAlgorithm != NONE) { + compressedByteStream.reset(); + compressedByteStream.write(HFileBlock.DUMMY_HEADER); + compressionStream.resetState(); + compressionStream.write(uncompressedBytesWithHeader, + HFileBlock.HEADER_SIZE, uncompressedBytesWithHeader.length + - HFileBlock.HEADER_SIZE); + + compressionStream.flush(); + compressionStream.finish(); + onDiskBytesWithHeader = compressedByteStream.toByteArray(); + } else { + onDiskBytesWithHeader = uncompressedBytesWithHeader; + } + this.blockType = blockType; + } + + public byte[] getOnDiskBytesWithHeader() { + return onDiskBytesWithHeader; + } + + public byte[] getUncompressedBytesWithHeader() { + return uncompressedBytesWithHeader; + } + + public BlockType getBlockType() { + return blockType; + } + + public void setBlockType(BlockType blockType) { + this.blockType = blockType; + } + + /** + * Releases the compressor this writer uses to compress blocks into the + * compressor pool. 
+ */ + public void close() { + if (compressor != null) { + compressionAlgorithm.returnCompressor(compressor); + compressor = null; + } + } + + @Override + public Algorithm getCompression() { + return this.compressionAlgorithm; + } + + public DataOutputStream getOutputStreamForEncoder() { + return this.dataOut; + } + +} Index: src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.io.RawComparator; /** @@ -37,6 +38,7 @@ */ @InterfaceAudience.Private public interface DataBlockEncoder { + /** * Compress KeyValues and write them to output buffer. * @param out Where to write compressed data. @@ -49,6 +51,20 @@ ByteBuffer in, boolean includesMemstoreTS) throws IOException; /** + * Compress KeyValues. + * @param in Source of KeyValue for compression. + * @param includesMemstoreTS true if including memstore timestamp after every + * key-value pair + * @param postEncoding + * @param algo + * @throws IOException If there is an error writing to output stream. + */ + public void compressKeyValues( + ByteBuffer in, boolean includesMemstoreTS, + HFileBlockEncodingContext postEncoding, + DataBlockEncoding algo) throws IOException; + + /** * Uncompress. * @param source Compressed stream of KeyValues. * @param includesMemstoreTS true if including memstore timestamp after every @@ -159,4 +175,10 @@ public int seekToKeyInBlock(byte[] key, int offset, int length, boolean seekBefore); } + + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm); + + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm); } Index: src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (revision 0) @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; + +/** + * A decoding context that is created by one reader's encoder, and is shared + * across the reader's all read operations. + * + * @see HFileBlockEncodingContext for encoding + */ +public interface HFileBlockDecodingContext { + + /** + * @return the compression algorithm used by this decoding context + */ + public Compression.Algorithm getCompression(); + + /** + * Perform all actions that need to done before the encoder's real decoding + * process. Decompression needs to be done if {@link #getCompression()} + * returns a valid compression algorithm. + * + * @param block + * @param onDiskBlock + * @param offset + * @throws IOException + */ + public void prepareDecoding(HFileBlock block, byte[] onDiskBlock, + int offset) throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (working copy) @@ -19,7 +19,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; @@ -27,6 +26,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.io.compress.Compressor; /** @@ -40,17 +40,23 @@ ByteArrayOutputStream uncompressedOutputStream; ByteBuffer uncompressedBuffer; private byte[] cacheCompressData; - private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); private boolean includesMemstoreTS; + + private final HFileBlockEncodingContext postEncoding; + private final DataBlockEncoding encoding; /** * Create a buffer which will be encoded using dataBlockEncoder. * @param dataBlockEncoder Algorithm used for compression. + * @param encoding */ public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, - boolean includesMemstoreTS) { + boolean includesMemstoreTS, DataBlockEncoding encoding) { this.dataBlockEncoder = dataBlockEncoder; uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE); + Compression.Algorithm nonCompression = Compression.Algorithm.NONE; + postEncoding = dataBlockEncoder.newDataBlockEncodingContext(nonCompression); + this.encoding = encoding; } /** @@ -194,18 +200,17 @@ * @return Compressed byte buffer. */ public byte[] doCompressData() { - compressedStream.reset(); - DataOutputStream dataOut = new DataOutputStream(compressedStream); try { this.dataBlockEncoder.compressKeyValues( - dataOut, getUncompressedBuffer(), includesMemstoreTS); + getUncompressedBuffer(), includesMemstoreTS, postEncoding, + encoding); } catch (IOException e) { throw new RuntimeException(String.format( "Bug in decoding part of algorithm %s. 
" + "Probably it requested more bytes than are available.", toString()), e); } - return compressedStream.toByteArray(); + return postEncoding.getUncompressedBytesWithHeader(); } @Override Index: src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (working copy) @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.RawComparator; @@ -301,4 +303,35 @@ } } + @Override + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultEncodingContext(compressionAlgorithm); + } + + @Override + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultDecodingContext(compressionAlgorithm); + } + + @Override + public void compressKeyValues(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext encodingCxt, + DataBlockEncoding algo) throws IOException { + if (!(encodingCxt instanceof HFileBlockDefaultEncodingContext)) { + throw new IOException (this.getClass().getName() + " only accepts " + + HFileBlockDefaultEncodingContext.class.getName() + " as the " + + "encoding context."); + } + + DataOutputStream dataOut = + ((HFileBlockDefaultEncodingContext) encodingCxt) + .getOutputStreamForEncoder(); + encodingCxt.prepareEncoding(BlockType.DATA); + compressKeyValues(dataOut, in, includesMemstoreTS); + encodingCxt.postEncoding(algo, BlockType.ENCODED_DATA); + } + } Index: src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (working copy) @@ -94,4 +94,5 @@ } }; } + } Index: src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (revision 0) @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; + +/** + * A default implementation of {@link HFileBlockDecodingContext}. It assumes the + * block data section is compressed as a whole. + * + * @see HFileBlockDefaultEncodingContext for the default compression context + * + */ +public class HFileBlockDefaultDecodingContext implements + HFileBlockDecodingContext { + + private Compression.Algorithm compressAlgo; + + public HFileBlockDefaultDecodingContext( + Compression.Algorithm compressAlgo) { + this.compressAlgo = compressAlgo; + } + + @Override + public void prepareDecoding(HFileBlock block, + byte[] onDiskBlock, int offset) throws IOException { + DataInputStream dis = + new DataInputStream(new ByteArrayInputStream( + onDiskBlock, offset, + block.getOnDiskSizeWithoutHeader())); + + ByteBuffer buffer = block.getBufferReadOnly(); + Compression.decompress(buffer.array(), buffer.arrayOffset() + + HFileBlock.HEADER_SIZE, (InputStream) dis, block + .getOnDiskSizeWithoutHeader(), block + .getUncompressedSizeWithoutHeader(), compressAlgo); + } + + @Override + public Algorithm getCompression() { + return compressAlgo; + } + +} Index: src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (working copy) @@ -104,6 +104,16 @@ } /** + * Writes id bytes to the given array starting from offset. + * + * @param stream + * where the id should be written. + */ + public void writeIdInBytes(byte[] dest, int offset) throws IOException { + System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE); + } + + /** * Return new data block encoder for given algorithm type. * @return data block encoder if algorithm is specified, null if none is * selected. Index: src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (revision 0) @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression; + +/** + * An encoding context that is created by one writer's encoder, and is shared + * across the write's whole lifetime. + * + * @see HFileBlockDecodingContext for decoding + * + */ +public interface HFileBlockEncodingContext { + + /** + * @return on disk bytes which are ready to write out after encoding + */ + public byte[] getOnDiskBytesWithHeader(); + + /** + * @return uncompressed bytes with header which can be cached after encoding + */ + public byte[] getUncompressedBytesWithHeader(); + + /** + * @return the block type after encoding + */ + public BlockType getBlockType(); + + /** + * @return the compression algorithm used by this encoding context + */ + public Compression.Algorithm getCompression(); + + /** + * Prepare an encoding + * + * @param blockType the block type to be encoded + * @throws IOException + */ + public void prepareEncoding(BlockType blockType) throws IOException; + + /** + * Do any action that need to be performed after the encoding. + * Compression is also included if {@link #getCompression()} returns a valid + * compression algorithm + * + * @param algo + * @param blockType + * @throws IOException + */ + public void postEncoding(DataBlockEncoding algo, BlockType blockType) + throws IOException; + + /** + * Releases the resources used. + */ + public void close(); + +} Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH; -import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -34,19 +33,19 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.MemStore; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompoundBloomFilter; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; import com.google.common.base.Preconditions; @@ -532,11 +531,12 @@ /** Writer state. Used to ensure the correct usage protocol. */ private State state = State.INIT; - /** Compression algorithm for all blocks this instance writes. 
*/ - private final Compression.Algorithm compressAlgo; - /** Data block encoder used for data blocks */ private final HFileDataBlockEncoder dataBlockEncoder; + + private final HFileBlockEncodingContext dataBlockEncodingCtx; + + private final HFileBlockDefaultEncodingContext nonDataBlockEncodingCtx; /** * The stream we use to accumulate data in uncompressed format for each @@ -546,15 +546,6 @@ */ private ByteArrayOutputStream baosInMemory; - /** Compressor, which is also reused between consecutive blocks. */ - private Compressor compressor; - - /** Compression output stream */ - private CompressionOutputStream compressionStream; - - /** Underlying stream to write compressed bytes to */ - private ByteArrayOutputStream compressedByteStream; - /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} @@ -605,23 +596,15 @@ */ public Writer(Compression.Algorithm compressionAlgorithm, HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) { - compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm; this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; - + dataBlockEncodingCtx = + dataBlockEncoder.newDataBlockEncodingContext(compressionAlgorithm); + + this.nonDataBlockEncodingCtx = + new HFileBlockDefaultEncodingContext(compressionAlgorithm); + baosInMemory = new ByteArrayOutputStream(); - if (compressAlgo != NONE) { - compressor = compressionAlgorithm.getCompressor(); - compressedByteStream = new ByteArrayOutputStream(); - try { - compressionStream = - compressionAlgorithm.createPlainCompressionStream( - compressedByteStream, compressor); - } catch (IOException e) { - throw new RuntimeException("Could not create compression stream " + - "for algorithm " + compressionAlgorithm, e); - } - } prevOffsetByType = new long[BlockType.values().length]; for (int i = 0; i < prevOffsetByType.length; ++i) @@ -692,7 +675,6 @@ */ private void finishBlock() throws IOException { userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array. uncompressedBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; @@ -701,37 +683,20 @@ // cache-on-write. In a way, the block is ready, but not yet encoded or // compressed. state = State.BLOCK_READY; - encodeDataBlockForDisk(); - - doCompression(); - putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length); - } - - /** - * Do compression if it is enabled, or re-use the uncompressed buffer if - * it is not. Fills in the compressed block's header if doing compression. 
- */ - private void doCompression() throws IOException { - // do the compression - if (compressAlgo != NONE) { - compressedByteStream.reset(); - compressedByteStream.write(DUMMY_HEADER); - - compressionStream.resetState(); - - compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE, - uncompressedBytesWithHeader.length - HEADER_SIZE); - - compressionStream.flush(); - compressionStream.finish(); - - onDiskBytesWithHeader = compressedByteStream.toByteArray(); + if (blockType == BlockType.DATA) { + encodeDataBlockForDisk(); + } else { + nonDataBlockEncodingCtx.prepareEncoding(blockType); + nonDataBlockEncodingCtx.postEncoding(uncompressedBytesWithHeader, + blockType); + onDiskBytesWithHeader = + dataBlockEncodingCtx.getOnDiskBytesWithHeader(); putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, uncompressedBytesWithHeader.length); - } else { - onDiskBytesWithHeader = uncompressedBytesWithHeader; } + putHeader(uncompressedBytesWithHeader, 0, + onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); } /** @@ -739,22 +704,25 @@ * {@link #dataBlockEncoder}. */ private void encodeDataBlockForDisk() throws IOException { - if (blockType != BlockType.DATA) { - return; // skip any non-data block - } - // do data block encoding, if data block encoder is set ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE, uncompressedBytesWithHeader.length - HEADER_SIZE).slice(); - Pair encodingResult = - dataBlockEncoder.beforeWriteToDisk(rawKeyValues, - includesMemstoreTS); - BlockType encodedBlockType = encodingResult.getSecond(); + dataBlockEncoder.beforeWriteToDisk(rawKeyValues, + includesMemstoreTS, dataBlockEncodingCtx, + blockType); + + BlockType encodedBlockType = dataBlockEncodingCtx.getBlockType(); if (encodedBlockType == BlockType.ENCODED_DATA) { - uncompressedBytesWithHeader = encodingResult.getFirst().array(); + uncompressedBytesWithHeader = + dataBlockEncodingCtx.getUncompressedBytesWithHeader(); + onDiskBytesWithHeader = + dataBlockEncodingCtx.getOnDiskBytesWithHeader(); blockType = BlockType.ENCODED_DATA; + + putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, + uncompressedBytesWithHeader.length); } else { // There is no encoding configured. Do some extra sanity-checking. if (encodedBlockType != BlockType.DATA) { @@ -767,6 +735,8 @@ + userDataStream.size() + " vs. " + (uncompressedBytesWithHeader.length - HEADER_SIZE)); } + + onDiskBytesWithHeader = uncompressedBytesWithHeader; } } @@ -833,13 +803,11 @@ } /** - * Releases the compressor this writer uses to compress blocks into the - * compressor pool. Needs to be called before the writer is discarded. + * Releases resources used by this writer. */ - public void releaseCompressor() { - if (compressor != null) { - compressAlgo.returnCompressor(compressor); - compressor = null; + public void release() { + if (dataBlockEncodingCtx != null) { + dataBlockEncodingCtx.close(); } } @@ -1140,40 +1108,8 @@ return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + HEADER_SIZE; } - + /** - * Decompresses data from the given stream using the configured compression - * algorithm. 
- * @param dest - * @param destOffset - * @param bufferedBoundedStream - * a stream to read compressed data from, bounded to the exact - * amount of compressed data - * @param compressedSize - * compressed data size, header not included - * @param uncompressedSize - * uncompressed data size, header not included - * @throws IOException - */ - protected void decompress(byte[] dest, int destOffset, - InputStream bufferedBoundedStream, int compressedSize, - int uncompressedSize) throws IOException { - Decompressor decompressor = null; - try { - decompressor = compressAlgo.getDecompressor(); - InputStream is = compressAlgo.createDecompressionStream( - bufferedBoundedStream, decompressor, 0); - - IOUtils.readFully(is, dest, destOffset, uncompressedSize); - is.close(); - } finally { - if (decompressor != null) { - compressAlgo.returnDecompressor(decompressor); - } - } - } - - /** * Creates a buffered stream reading a certain slice of the file system * input stream. We need this because the decompression we use seems to * expect the input stream to be bounded. @@ -1271,8 +1207,9 @@ } else { InputStream bufferedBoundedStream = createBufferedBoundedStream( offset, onDiskSize, pread); - decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA, - bufferedBoundedStream, onDiskSize, uncompressedSizeWithMagic); + Compression.decompress(buf.array(), buf.arrayOffset() + + HEADER_DELTA, bufferedBoundedStream, onDiskSize, + uncompressedSizeWithMagic, this.compressAlgo); // We don't really have a good way to exclude the "magic record" size // from the compressed block's size, since it is compressed as well. @@ -1312,6 +1249,9 @@ protected HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE; + HFileBlockDecodingContext encodedBlockDecodingCxt; + HFileBlockDefaultDecodingContext defaultDecodingCxt; + private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @Override @@ -1323,6 +1263,8 @@ public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo, long fileSize) { super(istream, compressAlgo, fileSize); + defaultDecodingCxt = + new HFileBlockDefaultDecodingContext(compressAlgo); } /** @@ -1358,187 +1300,130 @@ } int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; + + PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); + ByteBuffer headerBuf = prefetchedHeader.offset == offset ? + prefetchedHeader.buf : null; - HFileBlock b; - if (onDiskSizeWithHeader > 0) { - // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. This code path is used when - // doing a random read operation relying on the block index, as well as - // when the client knows the on-disk size from peeking into the next - // block's header (e.g. this block's header) when reading the previous - // block. This is the faster and more preferable case. - - int onDiskSizeWithoutHeader = onDiskSizeWithHeader - HEADER_SIZE; - assert onDiskSizeWithoutHeader >= 0; - - // See if we can avoid reading the header. This is desirable, because - // we will not incur a seek operation to seek back if we have already - // read this block's header as part of the previous read's look-ahead. - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - byte[] header = prefetchedHeader.offset == offset - ? prefetchedHeader.header : null; - - // Size that we have to skip in case we have already read the header. - int preReadHeaderSize = header == null ? 
0 : HEADER_SIZE; - - if (compressAlgo == Compression.Algorithm.NONE) { - // Just read the whole thing. Allocate enough space to read the - // next block's header too. - - ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader - + HEADER_SIZE); - headerAndData.limit(onDiskSizeWithHeader); - - if (header != null) { - System.arraycopy(header, 0, headerAndData.array(), 0, - HEADER_SIZE); - } - - int nextBlockOnDiskSizeWithHeader = readAtOffset( - headerAndData.array(), headerAndData.arrayOffset() - + preReadHeaderSize, onDiskSizeWithHeader - - preReadHeaderSize, true, offset + preReadHeaderSize, - pread); - - b = new HFileBlock(headerAndData); - b.assumeUncompressed(); - b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); - b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader; - - if (b.nextBlockOnDiskSizeWithHeader > 0) - setNextBlockHeader(offset, b); - } else { - // Allocate enough space to fit the next block's header too. - byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HEADER_SIZE]; - - int nextBlockOnDiskSize = readAtOffset(onDiskBlock, - preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, - true, offset + preReadHeaderSize, pread); - - if (header == null) - header = onDiskBlock; - - try { - b = new HFileBlock(ByteBuffer.wrap(header, 0, HEADER_SIZE)); - } catch (IOException ex) { - // Seen in load testing. Provide comprehensive debug info. - throw new IOException("Failed to read compressed block at " - + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader - + ", preReadHeaderSize=" + preReadHeaderSize - + ", header.length=" + header.length + ", header bytes: " - + Bytes.toStringBinary(header, 0, HEADER_SIZE), ex); - } - b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); - b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream( - onDiskBlock, HEADER_SIZE, onDiskSizeWithoutHeader)); - - // This will allocate a new buffer but keep header bytes. - b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0); - - decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis, - onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader); - - // Copy next block's header bytes into the new block if we have them. - if (nextBlockOnDiskSize > 0) { - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + HEADER_SIZE - + b.uncompressedSizeWithoutHeader, HEADER_SIZE); - - setNextBlockHeader(offset, b); - } - } - - } else { + HFileBlock b = null; + if (headerBuf == null) { // We don't know the on-disk size. Read the header first, determine the // on-disk size from it, and read the remaining data, thereby incurring // two read operations. This might happen when we are doing the first // read in a series of reads or a random read, and we don't have access // to the block index. This is costly and should happen very rarely. - // Check if we have read this block's header as part of reading the - // previous block. If so, don't read the header again. - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - ByteBuffer headerBuf = prefetchedHeader.offset == offset ? - prefetchedHeader.buf : null; + // Check headerBuf to see if we have read this block's header as part of + // reading the previous block. If so, don't read the header again. + // Unfortunately, we still have to do a separate read operation to + // read the header. 
+ headerBuf = ByteBuffer.allocate(HEADER_SIZE); + readAtOffset(headerBuf.array(), headerBuf.arrayOffset(), + HEADER_SIZE, false, offset, pread); + } + + // We know the total on-disk size but not the uncompressed size. Read + // the entire block into memory, then parse the header and decompress + // from memory if using compression. Here we already read the block's + // header + int onDiskSizeWithoutHeader = + onDiskSizeWithHeader - HEADER_SIZE; + assert onDiskSizeWithoutHeader >= 0; - if (headerBuf == null) { - // Unfortunately, we still have to do a separate read operation to - // read the header. - headerBuf = ByteBuffer.allocate(HEADER_SIZE);; - readAtOffset(headerBuf.array(), headerBuf.arrayOffset(), HEADER_SIZE, - false, offset, pread); - } - + // See if we can avoid reading the header. This is desirable, because + // we will not incur a seek operation to seek back if we have already + // read this block's header as part of the previous read's look-ahead. + try { b = new HFileBlock(headerBuf); + } catch (IOException ex) { + // Seen in load testing. Provide comprehensive debug info. + throw new IOException("Failed to read compressed block at " + + offset + + ", onDiskSizeWithoutHeader=" + + onDiskSizeWithHeader + + ", preReadHeaderSize=" + + HEADER_SIZE + + ", header.length=" + + prefetchedHeader.header.length + + ", header bytes: " + + Bytes.toStringBinary(prefetchedHeader.header, 0, + HEADER_SIZE), ex); + } + onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + HEADER_SIZE; - // This will also allocate enough room for the next block's header. - b.allocateBuffer(true); + boolean isCompressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + if(!isCompressed) { + b.assumeUncompressed(); + } - if (compressAlgo == Compression.Algorithm.NONE) { + // Allocate enough space to fit the next block's header too. + // If the data is compressed, the onDiskBlock will be discarded + // after decompressing the data, so no need to copy the current + // block's header to it. But if the data is uncompressed, + // the onDisk will become the backing buffer for the block, + // so need to fill in the header to it. + byte[] onDiskBlock = + new byte[onDiskSizeWithHeader + HEADER_SIZE]; + + // read the data, and leave the header section (the beginning HEAD_SIZE + // bytes) uninitialized. + int nextBlockOnDiskSize = + readAtOffset(onDiskBlock, HEADER_SIZE, onDiskSizeWithHeader + - HEADER_SIZE, true, offset + HEADER_SIZE, pread); + + b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); + b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; - // Avoid creating bounded streams and using a "codec" that does - // nothing. - b.assumeUncompressed(); - b.nextBlockOnDiskSizeWithHeader = readAtOffset(b.buf.array(), - b.buf.arrayOffset() + HEADER_SIZE, - b.uncompressedSizeWithoutHeader, true, offset + HEADER_SIZE, - pread); - - if (b.nextBlockOnDiskSizeWithHeader > 0) { - setNextBlockHeader(offset, b); - } + if (isCompressed) { + // This will allocate a new buffer but keep header bytes. + b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0); + if(b.blockType.equals(BlockType.ENCODED_DATA)) { + encodedBlockDecodingCxt.prepareDecoding(b, onDiskBlock, HEADER_SIZE); } else { - // Allocate enough space for the block's header and compressed data. 
- byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader() - + HEADER_SIZE]; - - b.nextBlockOnDiskSizeWithHeader = readAtOffset(compressedBytes, - HEADER_SIZE, b.onDiskSizeWithoutHeader, true, offset - + HEADER_SIZE, pread); - DataInputStream dis = new DataInputStream(new ByteArrayInputStream( - compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader)); - - decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis, - b.onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader); - + defaultDecodingCxt.prepareDecoding(b, onDiskBlock, HEADER_SIZE); + } + if (b.nextBlockOnDiskSizeWithHeader > 0) { + // Copy next block's header bytes into the new block if we have them. if (b.nextBlockOnDiskSizeWithHeader > 0) { - // Copy the next block's header into the new block. - int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE - + b.uncompressedSizeWithoutHeader; - System.arraycopy(compressedBytes, - compressedBytes.length - HEADER_SIZE, - b.buf.array(), - nextHeaderOffset, - HEADER_SIZE); - - setNextBlockHeader(offset, b); + System.arraycopy(onDiskBlock, onDiskSizeWithHeader, + b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE + + b.uncompressedSizeWithoutHeader, HEADER_SIZE); } } + } else { + //copy the current block's header to onDiskBlock as the header has + //already been read + System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), + onDiskBlock, 0, HEADER_SIZE); + // the onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header to it + b = new HFileBlock(ByteBuffer.wrap(onDiskBlock)); } + //set prefetched header + if (b.nextBlockOnDiskSizeWithHeader > 0) { + prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); + System.arraycopy(onDiskBlock, onDiskSizeWithHeader, + prefetchedHeader.header, 0, HFileBlock.HEADER_SIZE); + } + b.includesMemstoreTS = includesMemstoreTS; b.offset = offset; return b; } + - private void setNextBlockHeader(long offset, HFileBlock b) { - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); - int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE - + b.uncompressedSizeWithoutHeader; - System.arraycopy(b.buf.array(), nextHeaderOffset, - prefetchedHeader.header, 0, HEADER_SIZE); - } - void setIncludesMemstoreTS(boolean enabled) { includesMemstoreTS = enabled; } void setDataBlockEncoder(HFileDataBlockEncoder encoder) { this.dataBlockEncoder = encoder; + encodedBlockDecodingCxt = encoder.newDataBlockDecodingContext(this.compressAlgo); } } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (working copy) @@ -16,18 +16,18 @@ */ package org.apache.hadoop.hbase.io.hfile; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import 
org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import com.google.common.base.Preconditions; @@ -39,11 +39,11 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder { private final DataBlockEncoding onDisk; private final DataBlockEncoding inCache; - + public HFileDataBlockEncoderImpl(DataBlockEncoding encoding) { this(encoding, encoding); } - + /** * Do data block encoding with specified options. * @param onDisk What kind of data block encoding will be used before writing @@ -151,19 +151,20 @@ /** * Precondition: a non-encoded buffer. * Postcondition: on-disk encoding. + * @throws IOException */ @Override - public Pair beforeWriteToDisk(ByteBuffer in, - boolean includesMemstoreTS) { + public void beforeWriteToDisk(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext postEncoding, + BlockType blockType) throws IOException { + postEncoding.prepareEncoding(blockType); if (onDisk == DataBlockEncoding.NONE) { // there is no need to encode the block before writing it to disk - return new Pair(in, BlockType.DATA); + return; } - - ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in, - onDisk, includesMemstoreTS); - return new Pair(encodedBuffer, - BlockType.ENCODED_DATA); + encodeBufferToHFileBlockBuffer(in, onDisk, + includesMemstoreTS, postEncoding); } @Override @@ -174,32 +175,34 @@ return inCache != DataBlockEncoding.NONE; } - private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in, - DataBlockEncoding algo, boolean includesMemstoreTS) { - ByteArrayOutputStream encodedStream = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(encodedStream); + private void encodeBufferToHFileBlockBuffer(ByteBuffer in, + DataBlockEncoding algo, boolean includesMemstoreTS, + HFileBlockEncodingContext encodeCxt) { DataBlockEncoder encoder = algo.getEncoder(); try { - encodedStream.write(HFileBlock.DUMMY_HEADER); - algo.writeIdInBytes(dataOut); - encoder.compressKeyValues(dataOut, in, - includesMemstoreTS); + encoder.compressKeyValues(in, includesMemstoreTS, encodeCxt, algo); } catch (IOException e) { - throw new RuntimeException(String.format("Bug in data block encoder " + - "'%s', it probably requested too much data", algo.toString()), e); + throw new RuntimeException(String.format( + "Bug in data block encoder " + + "'%s', it probably requested too much data", + algo.toString()), e); } - return ByteBuffer.wrap(encodedStream.toByteArray()); } private HFileBlock encodeDataBlock(HFileBlock block, DataBlockEncoding algo, boolean includesMemstoreTS) { - ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer( - block.getBufferWithoutHeader(), algo, includesMemstoreTS); - int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE; + HFileBlockEncodingContext encodedCxt = + newDataBlockEncodingContext(Algorithm.NONE); + encodeBufferToHFileBlockBuffer( + block.getBufferWithoutHeader(), algo, includesMemstoreTS, encodedCxt); + byte[] encodedUncompressedBytes = + encodedCxt.getUncompressedBytesWithHeader(); + ByteBuffer bufferWrapper = ByteBuffer.wrap(encodedUncompressedBytes); + int sizeWithoutHeader = bufferWrapper.limit() - HFileBlock.HEADER_SIZE; HFileBlock encodedBlock = new HFileBlock(BlockType.ENCODED_DATA, block.getOnDiskSizeWithoutHeader(), sizeWithoutHeader, block.getPrevBlockOffset(), - compressedBuffer, 
HFileBlock.FILL_HEADER, block.getOffset(), + bufferWrapper, HFileBlock.FILL_HEADER, block.getOffset(), includesMemstoreTS); block.passSchemaMetricsTo(encodedBlock); return encodedBlock; @@ -211,4 +214,18 @@ inCache + ")"; } + @Override + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm) { + DataBlockEncoder encoder = onDisk.getEncoder(); + return encoder.newDataBlockEncodingContext(compressionAlgorithm); + } + + @Override + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + DataBlockEncoder encoder = onDisk.getEncoder(); + return encoder.newDataBlockDecodingContext(compressionAlgorithm); + } + } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy) @@ -422,7 +422,7 @@ finishClose(trailer); - fsBlockWriter.releaseCompressor(); + fsBlockWriter.release(); } @Override Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (working copy) @@ -21,8 +21,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Pair; /** * Controls what kind of data block encoding is used. If data block encoding is @@ -31,6 +33,7 @@ */ @InterfaceAudience.Private public interface HFileDataBlockEncoder { + /** * Converts a block from the on-disk format to the in-cache format. Called in * the following cases: @@ -51,11 +54,14 @@ * Should be called before an encoded or unencoded data block is written to * disk. * @param in KeyValues next to each other - * @return a non-null on-heap buffer containing the contents of the - * HFileBlock with unfilled header and block type + * @param encodingResult the encoded result + * @param blockType block type + * @throws IOException */ - public Pair beforeWriteToDisk( - ByteBuffer in, boolean includesMemstoreTS); + public void beforeWriteToDisk( + ByteBuffer in, boolean includesMemstoreTS, + HFileBlockEncodingContext encodingResult, + BlockType blockType) throws IOException; /** * Decides whether we should use a scanner over encoded blocks. @@ -84,4 +90,25 @@ */ public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction); + /** + * create a encoder specific encoding context object for writing. And the + * encoding context should also perform compression if compressionAlgorithm is + * valid. + * + * @param compressionAlgorithm compression algorithm + * @return + */ + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm); + + /** + * create a encoder specific decoding context for reading. And the decoding context + * should also do decompression if compressionAlgorithm is valid. 
+ * + * @param compressionAlgorithm + * @return + */ + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm); + } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; @@ -234,7 +235,7 @@ * Creates a compression stream without any additional wrapping into * buffering streams. */ - CompressionOutputStream createPlainCompressionStream( + public CompressionOutputStream createPlainCompressionStream( OutputStream downStream, Compressor compressor) throws IOException { CompressionCodec codec = getCodec(conf); ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024); @@ -323,4 +324,41 @@ return ret; } + + + /** + * Decompresses data from the given stream using the configured compression + * algorithm. + * @param dest + * @param destOffset + * @param bufferedBoundedStream + * a stream to read compressed data from, bounded to the exact + * amount of compressed data + * @param compressedSize + * compressed data size, header not included + * @param uncompressedSize + * uncompressed data size, header not included + * @param compressAlgo + * compression algorithm used + * @throws IOException + */ + public static void decompress(byte[] dest, int destOffset, + InputStream bufferedBoundedStream, int compressedSize, + int uncompressedSize, Compression.Algorithm compressAlgo) + throws IOException { + Decompressor decompressor = null; + try { + decompressor = compressAlgo.getDecompressor(); + InputStream is = compressAlgo.createDecompressionStream( + bufferedBoundedStream, decompressor, 0); + + IOUtils.readFully(is, dest, destOffset, uncompressedSize); + is.close(); + } finally { + if (decompressor != null) { + compressAlgo.returnDecompressor(decompressor); + } + } + } + } Index: src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (revision 1296006) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (working copy) @@ -16,12 +16,17 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Pair; /** * Does not perform any kind of encoding/decoding. 
@@ -45,9 +50,18 @@ } @Override - public Pair beforeWriteToDisk( - ByteBuffer in, boolean includesMemstoreTS) { - return new Pair(in, BlockType.DATA); + public void beforeWriteToDisk(ByteBuffer in, + boolean includesMemstoreTS, + HFileBlockEncodingContext encodingCxt, BlockType blockType) + throws IOException { + if (!(encodingCxt instanceof HFileBlockDefaultEncodingContext)) { + throw new IOException (this.getClass().getName() + " only accepts " + + HFileBlockDefaultEncodingContext.class.getName() + "."); + } + + HFileBlockDefaultEncodingContext defaultContext = + (HFileBlockDefaultEncodingContext) encodingCxt; + defaultContext.postEncoding(in.array(), blockType); } @Override @@ -79,4 +93,16 @@ return getClass().getSimpleName(); } + @Override + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultEncodingContext(compressionAlgorithm); + } + + @Override + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm) { + return new HFileBlockDefaultDecodingContext(compressionAlgorithm); + } + }
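
For reference, a minimal write-path sketch (not part of the patch) showing how the new encoding-context API introduced above fits together. The class and method names come from the patch itself; the choice of PREFIX encoding, GZ compression, and an input buffer of already-serialized KeyValues are illustrative assumptions.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class EncodingContextWriteSketch {
  /**
   * Encodes and compresses one block's worth of serialized KeyValues.
   * @param keyValues the unencoded data section of a block (no header)
   */
  public static byte[] encodeBlock(ByteBuffer keyValues,
      boolean includesMemstoreTS) throws IOException {
    DataBlockEncoding encoding = DataBlockEncoding.PREFIX; // illustrative choice
    DataBlockEncoder encoder = encoding.getEncoder();

    // One context per writer: it owns the reusable streams and the pooled
    // compressor that this patch removes from HFileBlock.Writer.
    HFileBlockEncodingContext ctx =
        encoder.newDataBlockEncodingContext(Compression.Algorithm.GZ);
    try {
      // Encodes the KeyValues and, because the context carries GZ, also
      // compresses the encoded bytes in the same call.
      encoder.compressKeyValues(keyValues, includesMemstoreTS, ctx, encoding);

      // Bytes ready to write out; the leading header bytes are still the
      // dummy header, which HFileBlock.Writer later fills in via putHeader().
      return ctx.getOnDiskBytesWithHeader();
    } finally {
      ctx.close(); // returns the compressor to the pool
    }
  }
}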
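
On the read side, a reader creates one decoding context per data block encoder and reuses it across all reads, mirroring what FSReaderV2.setDataBlockEncoder() does in this patch. A small sketch, assuming the caller already has the HFileBlock (with its buffer allocated) and the raw on-disk bytes from readAtOffset():

import java.io.IOException;

import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;

public class DecodingContextReadSketch {
  private final HFileBlockDecodingContext decodingCtx;

  public DecodingContextReadSketch(HFileDataBlockEncoder encoder,
      Compression.Algorithm compressAlgo) {
    // Shared across all of this reader's read operations.
    this.decodingCtx = encoder.newDataBlockDecodingContext(compressAlgo);
  }

  /**
   * Decompresses the data section of onDiskBlock (which starts at headerSize)
   * into the block's own buffer.
   */
  public void decode(HFileBlock block, byte[] onDiskBlock, int headerSize)
      throws IOException {
    decodingCtx.prepareDecoding(block, onDiskBlock, headerSize);
  }
}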
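
The patch also moves the decompress() helper out of HFileBlock's reader code into a static Compression.decompress(...) so the reader and the default decoding context can share it. A hedged usage sketch; the header size and block sizes are assumed to come from the block header, as in the reader code above:

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.Compression;

public class DecompressSketch {
  /**
   * Inflates the compressed data section of an on-disk block into a buffer
   * laid out as [header][uncompressed data].
   */
  public static byte[] inflate(byte[] onDiskBlock, int headerSize,
      int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
      Compression.Algorithm algo) throws IOException {
    byte[] dest = new byte[headerSize + uncompressedSizeWithoutHeader];
    // The header is stored uncompressed, so copy it through as-is.
    System.arraycopy(onDiskBlock, 0, dest, 0, headerSize);
    // Decompress the bounded data section into the destination buffer.
    Compression.decompress(dest, headerSize,
        new ByteArrayInputStream(onDiskBlock, headerSize,
            onDiskSizeWithoutHeader),
        onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, algo);
    return dest;
  }
}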