Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (working copy)
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.io.RawComparator;
-
-/**
- * Encoding of KeyValue. It aims to be fast and efficient using assumptions:
- *
- * - the KeyValues are stored sorted by key
- * - we know the structure of KeyValue
- * - the values are always iterated forward from beginning of block
- * - knowledge of Key Value format
- *
- * It is designed to work fast enough to be feasible as in memory compression.
- *
- * After encoding, it also optionally compresses the encoded data if a
- * compression algorithm is specified in HFileBlockEncodingContext argument of
- * {@link #encodeKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
- */
-@InterfaceAudience.Private
-public interface DataBlockEncoder {
-
- /**
- * Encodes KeyValues. It will first encode key value pairs, and then
- * optionally do the compression for the encoded data.
- *
- * @param in
- * Source of KeyValue for compression.
- * @param includesMemstoreTS
- * true if including memstore timestamp after every key-value pair
- * @param encodingContext
- * the encoding context which will contain encoded uncompressed bytes
- * as well as compressed encoded bytes if compression is enabled, and
- * also it will reuse resources across multiple calls.
- * @throws IOException
- * If there is an error writing to output stream.
- */
- public void encodeKeyValues(
- ByteBuffer in, boolean includesMemstoreTS,
- HFileBlockEncodingContext encodingContext) throws IOException;
-
- /**
- * Decode.
- * @param source Compressed stream of KeyValues.
- * @param includesMemstoreTS true if including memstore timestamp after every
- * key-value pair
- * @return Uncompressed block of KeyValues.
- * @throws IOException If there is an error in source.
- */
- public ByteBuffer decodeKeyValues(DataInputStream source,
- boolean includesMemstoreTS) throws IOException;
-
- /**
- * Uncompress.
- * @param source encoded stream of KeyValues.
- * @param allocateHeaderLength allocate this many bytes for the header.
- * @param skipLastBytes Do not copy n last bytes.
- * @param includesMemstoreTS true if including memstore timestamp after every
- * key-value pair
- * @return Uncompressed block of KeyValues.
- * @throws IOException If there is an error in source.
- */
- public ByteBuffer decodeKeyValues(DataInputStream source,
- int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
- throws IOException;
-
- /**
- * Return first key in block. Useful for indexing. Typically does not make
- * a deep copy but returns a buffer wrapping a segment of the actual block's
- * byte array. This is because the first key in block is usually stored
- * unencoded.
- * @param block encoded block we want index, the position will not change
- * @return First key in block.
- */
- public ByteBuffer getFirstKeyInBlock(ByteBuffer block);
-
- /**
- * Create a HFileBlock seeker which find KeyValues within a block.
- * @param comparator what kind of comparison should be used
- * @param includesMemstoreTS true if including memstore timestamp after every
- * key-value pair
- * @return A newly created seeker.
- */
- public EncodedSeeker createSeeker(RawComparator comparator,
- boolean includesMemstoreTS);
-
- /**
- * Creates a encoder specific encoding context
- *
- * @param compressionAlgorithm
- * compression algorithm used if the final data needs to be
- * compressed
- * @param encoding
- * encoding strategy used
- * @param headerBytes
- * header bytes to be written, put a dummy header here if the header
- * is unknown
- * @return a newly created encoding context
- */
- public HFileBlockEncodingContext newDataBlockEncodingContext(
- Algorithm compressionAlgorithm, DataBlockEncoding encoding,
- byte[] headerBytes);
-
- /**
- * Creates an encoder specific decoding context, which will prepare the data
- * before actual decoding
- *
- * @param compressionAlgorithm
- * compression algorithm used if the data needs to be decompressed
- * @return a newly created decoding context
- */
- public HFileBlockDecodingContext newDataBlockDecodingContext(
- Algorithm compressionAlgorithm);
-
- /**
- * An interface which enable to seek while underlying data is encoded.
- *
- * It works on one HFileBlock, but it is reusable. See
- * {@link #setCurrentBuffer(ByteBuffer)}.
- */
- public static interface EncodedSeeker {
- /**
- * Set on which buffer there will be done seeking.
- * @param buffer Used for seeking.
- */
- public void setCurrentBuffer(ByteBuffer buffer);
-
- /**
- * Does a deep copy of the key at the current position. A deep copy is
- * necessary because buffers are reused in the decoder.
- * @return key at current position
- */
- public ByteBuffer getKeyDeepCopy();
-
- /**
- * Does a shallow copy of the value at the current position. A shallow
- * copy is possible because the returned buffer refers to the backing array
- * of the original encoded buffer.
- * @return value at current position
- */
- public ByteBuffer getValueShallowCopy();
-
- /** @return key value at current position. */
- public ByteBuffer getKeyValueBuffer();
-
- /**
- * @return the KeyValue object at the current position. Includes memstore
- * timestamp.
- */
- public KeyValue getKeyValue();
-
- /** Set position to beginning of given block */
- public void rewind();
-
- /**
- * Move to next position
- * @return true on success, false if there is no more positions.
- */
- public boolean next();
-
- /**
- * Moves the seeker position within the current block to:
- *
- * - the last key that is less than or equal to the given key if
- *   seekBefore is false
- * - the last key that is strictly less than the given key if
- * seekBefore is true. The caller is responsible for loading the
- * previous block if the requested key turns out to be the first key of the
- * current block.
- *
- * @param key byte array containing the key
- * @param offset key position the array
- * @param length key length in bytes
- * @param seekBefore find the key strictly less than the given key in case
- * of an exact match. Does not matter in case of an inexact match.
- * @return 0 on exact match, 1 on inexact match.
- */
- public int seekToKeyInBlock(byte[] key, int offset, int length,
- boolean seekBefore);
- }
-}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (working copy)
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock;
-
-/**
- * A decoding context that is created by a reader's encoder, and is shared
- * across the reader's all read operations.
- *
- * @see HFileBlockEncodingContext for encoding
- */
-public interface HFileBlockDecodingContext {
-
- /**
- * @return the compression algorithm used by this decoding context
- */
- public Compression.Algorithm getCompression();
-
- /**
- * Perform all actions that need to be done before the encoder's real
- * decoding process. Decompression needs to be done if
- * {@link #getCompression()} returns a valid compression algorithm.
- *
- * @param block HFile block object
- * @param onDiskBlock on disk bytes to be decoded
- * @param offset data start offset in onDiskBlock
- * @throws IOException
- */
- public void prepareDecoding(HFileBlock block, byte[] onDiskBlock,
- int offset) throws IOException;
-
-}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java (working copy)
@@ -44,17 +44,14 @@
}
@Override
- public void prepareDecoding(HFileBlock block,
- byte[] onDiskBlock, int offset) throws IOException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(
- onDiskBlock, offset,
- block.getOnDiskSizeWithoutHeader()));
+ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
+ ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException {
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(onDiskBlock, offset,
+ onDiskSizeWithoutHeader));
- ByteBuffer buffer = block.getBufferWithoutHeader();
- Compression.decompress(buffer.array(), buffer.arrayOffset(),
- (InputStream) dis, block.getOnDiskSizeWithoutHeader(),
- block.getUncompressedSizeWithoutHeader(), compressAlgo);
+ Compression.decompress(blockBufferWithoutHeader.array(),
+ blockBufferWithoutHeader.arrayOffset(), (InputStream) dis, onDiskSizeWithoutHeader,
+ uncompressedSizeWithoutHeader, compressAlgo);
}
@Override
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (working copy)
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Provide access to all data block encoding algorithms. All of the algorithms
- * are required to have unique id which should NEVER be changed. If you
- * want to add a new algorithm/version, assign it a new id. Announce the new id
- * in the HBase mailing list to prevent collisions.
- */
-@InterfaceAudience.Private
-public enum DataBlockEncoding {
-
- /** Disable data block encoding. */
- NONE(0, null),
- // id 1 is reserved for the BITSET algorithm to be added later
- PREFIX(2, new PrefixKeyDeltaEncoder()),
- DIFF(3, new DiffKeyDeltaEncoder()),
- FAST_DIFF(4, new FastDiffDeltaEncoder());
-
- private final short id;
- private final byte[] idInBytes;
- private final DataBlockEncoder encoder;
-
- public static final int ID_SIZE = Bytes.SIZEOF_SHORT;
-
- /** Maps data block encoding ids to enum instances. */
- private static Map<Short, DataBlockEncoding> idToEncoding =
- new HashMap<Short, DataBlockEncoding>();
-
- static {
- for (DataBlockEncoding algo : values()) {
- if (idToEncoding.containsKey(algo.id)) {
- throw new RuntimeException(String.format(
- "Two data block encoder algorithms '%s' and '%s' have " +
- "the same id %d",
- idToEncoding.get(algo.id).toString(), algo.toString(),
- (int) algo.id));
- }
- idToEncoding.put(algo.id, algo);
- }
- }
-
- private DataBlockEncoding(int id, DataBlockEncoder encoder) {
- if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) {
- throw new AssertionError(
- "Data block encoding algorithm id is out of range: " + id);
- }
- this.id = (short) id;
- this.idInBytes = Bytes.toBytes(this.id);
- if (idInBytes.length != ID_SIZE) {
- // While this may seem redundant, if we accidentally serialize
- // the id as e.g. an int instead of a short, all encoders will break.
- throw new RuntimeException("Unexpected length of encoder ID byte " +
- "representation: " + Bytes.toStringBinary(idInBytes));
- }
- this.encoder = encoder;
- }
-
- /**
- * @return name converted to bytes.
- */
- public byte[] getNameInBytes() {
- return Bytes.toBytes(toString());
- }
-
- /**
- * @return The id of a data block encoder.
- */
- public short getId() {
- return id;
- }
-
- /**
- * Writes id in bytes.
- * @param stream where the id should be written.
- */
- public void writeIdInBytes(OutputStream stream) throws IOException {
- stream.write(idInBytes);
- }
-
-
- /**
- * Writes id bytes to the given array starting from offset.
- *
- * @param dest output array
- * @param offset starting offset of the output array
- * @throws IOException
- */
- public void writeIdInBytes(byte[] dest, int offset) throws IOException {
- System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE);
- }
-
- /**
- * Return new data block encoder for given algorithm type.
- * @return data block encoder if algorithm is specified, null if none is
- * selected.
- */
- public DataBlockEncoder getEncoder() {
- return encoder;
- }
-
- /**
- * Find and create data block encoder for given id;
- * @param encoderId id of data block encoder.
- * @return Newly created data block encoder.
- */
- public static DataBlockEncoder getDataBlockEncoderById(short encoderId) {
- if (!idToEncoding.containsKey(encoderId)) {
- throw new IllegalArgumentException(String.format(
- "There is no data block encoder for given id '%d'",
- (int) encoderId));
- }
-
- return idToEncoding.get(encoderId).getEncoder();
- }
-
- /**
- * Find and return the name of data block encoder for the given id.
- * @param encoderId id of data block encoder
- * @return name, same as used in options in column family
- */
- public static String getNameFromId(short encoderId) {
- return idToEncoding.get(encoderId).toString();
- }
-
- /**
- * Check if given encoder has this id.
- * @param encoder encoder which id will be checked
- * @param encoderId id which we expect
- * @return true if id is right for given encoder, false otherwise
- * @exception IllegalArgumentException
- * thrown when there is no matching data block encoder
- */
- public static boolean isCorrectEncoder(DataBlockEncoder encoder,
- short encoderId) {
- if (!idToEncoding.containsKey(encoderId)) {
- throw new IllegalArgumentException(String.format(
- "There is no data block encoder for given id '%d'",
- (int) encoderId));
- }
-
- DataBlockEncoding algorithm = idToEncoding.get(encoderId);
- return algorithm.getClass().equals(encoder.getClass());
- }
-
- public static DataBlockEncoding getEncodingById(short dataBlockEncodingId) {
- return idToEncoding.get(dataBlockEncodingId);
- }
-
-}
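To make the id contract above concrete, here is a minimal usage sketch (the class name and main() wrapper are illustrative, not part of the patch), showing the round trip between an encoding, its persisted two-byte id, and its encoder:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class DataBlockEncodingIdRoundTrip {
  public static void main(String[] args) throws IOException {
    DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;

    // Only the two-byte id is persisted; the enum and its encoder are recovered from it.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encoding.writeIdInBytes(out);                 // writes ID_SIZE == 2 bytes

    short id = encoding.getId();
    DataBlockEncoder encoder = DataBlockEncoding.getDataBlockEncoderById(id);
    System.out.println(DataBlockEncoding.getNameFromId(id) + " -> " + encoder);
  }
}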
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (working copy)
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.hfile.BlockType;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-
-/**
- * An encoding context that is created by a writer's encoder, and is shared
- * across the writer's whole lifetime.
- *
- * @see HFileBlockDecodingContext for decoding
- *
- */
-public interface HFileBlockEncodingContext {
-
- /**
- * @return encoded and compressed bytes with header which are ready to write
- * out to disk
- */
- public byte[] getOnDiskBytesWithHeader();
-
- /**
- * @return encoded but not heavily compressed bytes with header which can be
- * cached in block cache
- */
- public byte[] getUncompressedBytesWithHeader();
-
- /**
- * @return the block type after encoding
- */
- public BlockType getBlockType();
-
- /**
- * @return the compression algorithm used by this encoding context
- */
- public Compression.Algorithm getCompression();
-
- /**
- * @return the header size used
- */
- public int getHeaderSize();
-
- /**
- * @return the {@link DataBlockEncoding} encoding used
- */
- public DataBlockEncoding getDataBlockEncoding();
-
- /**
- * Do any action that needs to be performed after the encoding.
- * Compression is also included if {@link #getCompression()} returns non-null
- * compression algorithm
- *
- * @param blockType
- * @throws IOException
- */
- public void postEncoding(BlockType blockType) throws IOException;
-
- /**
- * Releases the resources used.
- */
- public void close();
-
-}
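For orientation, a rough sketch of how a writer might drive this context, assuming an already-constructed DataBlockEncoder and an unencoded KeyValue buffer (the helper class and method names are illustrative only):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class EncodingContextSketch {
  /** Encodes one block and returns the bytes destined for disk. */
  static byte[] encodeBlock(DataBlockEncoder encoder, ByteBuffer unencodedKeyValues,
      boolean includesMemstoreTS, byte[] dummyHeader) throws IOException {
    HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(
        Compression.Algorithm.GZ, DataBlockEncoding.PREFIX, dummyHeader);
    try {
      encoder.encodeKeyValues(unencodedKeyValues, includesMemstoreTS, ctx);
      // The context now holds both forms of the block: one for the block cache,
      // one (compressed) for disk.
      byte[] forBlockCache = ctx.getUncompressedBytesWithHeader();
      return ctx.getOnDiskBytesWithHeader();
    } finally {
      ctx.close();  // release pooled/reused resources
    }
  }
}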
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy)
@@ -1728,9 +1728,13 @@
// This will allocate a new buffer but keep header bytes.
b.allocateBuffer(nextBlockOnDiskSize > 0);
if (b.blockType.equals(BlockType.ENCODED_DATA)) {
- encodedBlockDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
+ encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
+ b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
+ hdrSize);
} else {
- defaultDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize);
+ defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
+ b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
+ hdrSize);
}
if (nextBlockOnDiskSize > 0) {
// Copy next block's header bytes into the new block if we have them.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (working copy)
@@ -1,220 +0,0 @@
-/*
- * Copyright 2011 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Various types of {@link HFile} blocks. Ordinal values of these enum constants
- * must not be relied upon. The values in the enum appear in the order they
- * appear in a version 2 {@link HFile}.
- */
-@InterfaceAudience.Private
-public enum BlockType {
-
- // Scanned block section
-
- /** Data block, both versions */
- DATA("DATABLK*", BlockCategory.DATA),
-
- /** An encoded data block (e.g. with prefix compression), version 2 */
- ENCODED_DATA("DATABLKE", BlockCategory.DATA) {
- @Override
- public int getId() {
- return DATA.ordinal();
- }
- },
-
- /** Version 2 leaf index block. Appears in the data block section */
- LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
-
- /** Bloom filter block, version 2 */
- BLOOM_CHUNK("BLMFBLK2", BlockCategory.BLOOM),
-
- // Non-scanned block section
-
- /** Meta blocks */
- META("METABLKc", BlockCategory.META),
-
- /** Intermediate-level version 2 index in the non-data block section */
- INTERMEDIATE_INDEX("IDXINTE2", BlockCategory.INDEX),
-
- // Load-on-open section.
-
- /** Root index block, also used for the single-level meta index, version 2 */
- ROOT_INDEX("IDXROOT2", BlockCategory.INDEX),
-
- /** File info, version 2 */
- FILE_INFO("FILEINF2", BlockCategory.META),
-
- /** General Bloom filter metadata, version 2 */
- GENERAL_BLOOM_META("BLMFMET2", BlockCategory.BLOOM),
-
- /** Delete Family Bloom filter metadata, version 2 */
- DELETE_FAMILY_BLOOM_META("DFBLMET2", BlockCategory.BLOOM),
-
- // Trailer
-
- /** Fixed file trailer, both versions (always just a magic string) */
- TRAILER("TRABLK\"$", BlockCategory.META),
-
- // Legacy blocks
-
- /** Block index magic string in version 1 */
- INDEX_V1("IDXBLK)+", BlockCategory.INDEX);
-
- public enum BlockCategory {
- DATA, META, INDEX, BLOOM, ALL_CATEGORIES, UNKNOWN;
-
- /**
- * Throws an exception if the block category passed is the special category
- * meaning "all categories".
- */
- public void expectSpecific() {
- if (this == ALL_CATEGORIES) {
- throw new IllegalArgumentException("Expected a specific block " +
- "category but got " + this);
- }
- }
- }
-
- public static final int MAGIC_LENGTH = 8;
-
- private final byte[] magic;
- private final BlockCategory metricCat;
-
- private BlockType(String magicStr, BlockCategory metricCat) {
- magic = Bytes.toBytes(magicStr);
- this.metricCat = metricCat;
- assert magic.length == MAGIC_LENGTH;
- }
-
- /**
- * Use this instead of {@link #ordinal()}. They work exactly the same, except
- * DATA and ENCODED_DATA get the same id using this method (overridden for
- * {@link #ENCODED_DATA}).
- * @return block type id from 0 to the number of block types - 1
- */
- public int getId() {
- // Default implementation, can be overridden for individual enum members.
- return ordinal();
- }
-
- public void writeToStream(OutputStream out) throws IOException {
- out.write(magic);
- }
-
- public void write(DataOutput out) throws IOException {
- out.write(magic);
- }
-
- public void write(ByteBuffer buf) {
- buf.put(magic);
- }
-
- public BlockCategory getCategory() {
- return metricCat;
- }
-
- public static BlockType parse(byte[] buf, int offset, int length)
- throws IOException {
- if (length != MAGIC_LENGTH) {
- throw new IOException("Magic record of invalid length: "
- + Bytes.toStringBinary(buf, offset, length));
- }
-
- for (BlockType blockType : values())
- if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset,
- MAGIC_LENGTH) == 0)
- return blockType;
-
- throw new IOException("Invalid HFile block magic: "
- + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH));
- }
-
- public static BlockType read(DataInputStream in) throws IOException {
- byte[] buf = new byte[MAGIC_LENGTH];
- in.readFully(buf);
- return parse(buf, 0, buf.length);
- }
-
- public static BlockType read(ByteBuffer buf) throws IOException {
- BlockType blockType = parse(buf.array(),
- buf.arrayOffset() + buf.position(),
- Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
-
- // If we got here, we have read exactly MAGIC_LENGTH bytes.
- buf.position(buf.position() + MAGIC_LENGTH);
- return blockType;
- }
-
- /**
- * Put the magic record out to the specified byte array position.
- *
- * @param bytes the byte array
- * @param offset position in the array
- * @return incremented offset
- */
- public int put(byte[] bytes, int offset) {
- System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH);
- return offset + MAGIC_LENGTH;
- }
-
- /**
- * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
- * stream and expects it to match this block type.
- */
- public void readAndCheck(DataInputStream in) throws IOException {
- byte[] buf = new byte[MAGIC_LENGTH];
- in.readFully(buf);
- if (Bytes.compareTo(buf, magic) != 0) {
- throw new IOException("Invalid magic: expected "
- + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
- }
- }
-
- /**
- * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
- * byte buffer and expects it to match this block type.
- */
- public void readAndCheck(ByteBuffer in) throws IOException {
- byte[] buf = new byte[MAGIC_LENGTH];
- in.get(buf);
- if (Bytes.compareTo(buf, magic) != 0) {
- throw new IOException("Invalid magic: expected "
- + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
- }
- }
-
- /**
- * @return whether this block type is encoded or unencoded data block
- */
- public final boolean isData() {
- return this == DATA || this == ENCODED_DATA;
- }
-
-}
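As a small illustration of the magic-record handling above (the wrapper class is ours, for illustration only), a reader might classify a block like this:

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.BlockType;

public class BlockTypeSketch {
  /** Reads the 8-byte magic at the current stream position and classifies the block. */
  static BlockType classify(DataInputStream in) throws IOException {
    BlockType type = BlockType.read(in);  // consumes MAGIC_LENGTH bytes
    // DATA and ENCODED_DATA intentionally share one id (see getId() above), so index
    // structures can treat encoded and unencoded data blocks alike.
    System.out.println(type + " id=" + type.getId() + " category=" + type.getCategory()
        + " isData=" + type.isData());
    return type;
  }
}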
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (working copy)
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * Compression related stuff.
- * Copied from hadoop-3315 tfile.
- */
-@InterfaceAudience.Private
-public final class Compression {
- static final Log LOG = LogFactory.getLog(Compression.class);
-
- /**
- * Prevent the instantiation of class.
- */
- private Compression() {
- super();
- }
-
- static class FinishOnFlushCompressionStream extends FilterOutputStream {
- public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
- super(cout);
- }
-
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- }
-
- @Override
- public void flush() throws IOException {
- CompressionOutputStream cout = (CompressionOutputStream) out;
- cout.finish();
- cout.flush();
- cout.resetState();
- }
- }
-
- /**
- * Returns the classloader to load the Codec class from.
- * @return
- */
- private static ClassLoader getClassLoaderForCodec() {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- if (cl == null) {
- cl = Compression.class.getClassLoader();
- }
- if (cl == null) {
- cl = ClassLoader.getSystemClassLoader();
- }
- if (cl == null) {
- throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
- }
- return cl;
- }
-
- /**
- * Compression algorithms. The ordinal of these cannot change or else you
- * risk breaking all existing HFiles out there. Even the ones that are
- * not compressed! (They use the NONE algorithm)
- */
- public static enum Algorithm {
- LZO("lzo") {
- // Use base type to avoid compile-time dependencies.
- private transient CompressionCodec lzoCodec;
-
- @Override
- CompressionCodec getCodec(Configuration conf) {
- if (lzoCodec == null) {
- try {
- Class<?> externalCodec =
- getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
- lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
- new Configuration(conf));
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- return lzoCodec;
- }
- },
- GZ("gz") {
- private transient GzipCodec codec;
-
- @Override
- DefaultCodec getCodec(Configuration conf) {
- if (codec == null) {
- codec = new ReusableStreamGzipCodec();
- codec.setConf(new Configuration(conf));
- }
-
- return codec;
- }
- },
-
- NONE("none") {
- @Override
- DefaultCodec getCodec(Configuration conf) {
- return null;
- }
-
- @Override
- public synchronized InputStream createDecompressionStream(
- InputStream downStream, Decompressor decompressor,
- int downStreamBufferSize) throws IOException {
- if (downStreamBufferSize > 0) {
- return new BufferedInputStream(downStream, downStreamBufferSize);
- }
- // else {
- // Make sure we bypass FSInputChecker buffer.
- // return new BufferedInputStream(downStream, 1024);
- // }
- // }
- return downStream;
- }
-
- @Override
- public synchronized OutputStream createCompressionStream(
- OutputStream downStream, Compressor compressor,
- int downStreamBufferSize) throws IOException {
- if (downStreamBufferSize > 0) {
- return new BufferedOutputStream(downStream, downStreamBufferSize);
- }
-
- return downStream;
- }
- },
- SNAPPY("snappy") {
- // Use base type to avoid compile-time dependencies.
- private transient CompressionCodec snappyCodec;
-
- @Override
- CompressionCodec getCodec(Configuration conf) {
- if (snappyCodec == null) {
- try {
- Class<?> externalCodec =
- getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
- snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
- conf);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- return snappyCodec;
- }
- },
- LZ4("lz4") {
- // Use base type to avoid compile-time dependencies.
- private transient CompressionCodec lz4Codec;
-
- @Override
- CompressionCodec getCodec(Configuration conf) {
- if (lz4Codec == null) {
- try {
- Class<?> externalCodec =
- getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
- lz4Codec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
- conf);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- return lz4Codec;
- }
- };
-
- private final Configuration conf;
- private final String compressName;
- // data input buffer size to absorb small reads from application.
- private static final int DATA_IBUF_SIZE = 1 * 1024;
- // data output buffer size to absorb small writes from application.
- private static final int DATA_OBUF_SIZE = 4 * 1024;
-
- Algorithm(String name) {
- this.conf = new Configuration();
- this.conf.setBoolean("hadoop.native.lib", true);
- this.compressName = name;
- }
-
- abstract CompressionCodec getCodec(Configuration conf);
-
- public InputStream createDecompressionStream(
- InputStream downStream, Decompressor decompressor,
- int downStreamBufferSize) throws IOException {
- CompressionCodec codec = getCodec(conf);
- // Set the internal buffer size to read from down stream.
- if (downStreamBufferSize > 0) {
- ((Configurable)codec).getConf().setInt("io.file.buffer.size",
- downStreamBufferSize);
- }
- CompressionInputStream cis =
- codec.createInputStream(downStream, decompressor);
- BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
- return bis2;
-
- }
-
- public OutputStream createCompressionStream(
- OutputStream downStream, Compressor compressor, int downStreamBufferSize)
- throws IOException {
- OutputStream bos1 = null;
- if (downStreamBufferSize > 0) {
- bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
- }
- else {
- bos1 = downStream;
- }
- CompressionOutputStream cos =
- createPlainCompressionStream(bos1, compressor);
- BufferedOutputStream bos2 =
- new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
- DATA_OBUF_SIZE);
- return bos2;
- }
-
- /**
- * Creates a compression stream without any additional wrapping into
- * buffering streams.
- */
- public CompressionOutputStream createPlainCompressionStream(
- OutputStream downStream, Compressor compressor) throws IOException {
- CompressionCodec codec = getCodec(conf);
- ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
- return codec.createOutputStream(downStream, compressor);
- }
-
- public Compressor getCompressor() {
- CompressionCodec codec = getCodec(conf);
- if (codec != null) {
- Compressor compressor = CodecPool.getCompressor(codec);
- if (compressor != null) {
- if (compressor.finished()) {
- // Somebody returns the compressor to CodecPool but is still using
- // it.
- LOG
- .warn("Compressor obtained from CodecPool is already finished()");
- // throw new AssertionError(
- // "Compressor obtained from CodecPool is already finished()");
- }
- compressor.reset();
- }
- return compressor;
- }
- return null;
- }
-
- public void returnCompressor(Compressor compressor) {
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- }
- }
-
- public Decompressor getDecompressor() {
- CompressionCodec codec = getCodec(conf);
- if (codec != null) {
- Decompressor decompressor = CodecPool.getDecompressor(codec);
- if (decompressor != null) {
- if (decompressor.finished()) {
- // Somebody returns the decompressor to CodecPool but is still using
- // it.
- LOG
- .warn("Deompressor obtained from CodecPool is already finished()");
- // throw new AssertionError(
- // "Decompressor obtained from CodecPool is already finished()");
- }
- decompressor.reset();
- }
- return decompressor;
- }
-
- return null;
- }
-
- public void returnDecompressor(Decompressor decompressor) {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
-
- public String getName() {
- return compressName;
- }
- }
-
- public static Algorithm getCompressionAlgorithmByName(String compressName) {
- Algorithm[] algos = Algorithm.class.getEnumConstants();
-
- for (Algorithm a : algos) {
- if (a.getName().equals(compressName)) {
- return a;
- }
- }
-
- throw new IllegalArgumentException(
- "Unsupported compression algorithm name: " + compressName);
- }
-
- static String[] getSupportedAlgorithms() {
- Algorithm[] algos = Algorithm.class.getEnumConstants();
-
- String[] ret = new String[algos.length];
- int i = 0;
- for (Algorithm a : algos) {
- ret[i++] = a.getName();
- }
-
- return ret;
- }
-
- /**
- * Decompresses data from the given stream using the configured compression
- * algorithm. It will throw an exception if the dest buffer does not have
- * enough space to hold the decompressed data.
- *
- * @param dest
- * the output bytes buffer
- * @param destOffset
- * start writing position of the output buffer
- * @param bufferedBoundedStream
- * a stream to read compressed data from, bounded to the exact amount
- * of compressed data
- * @param compressedSize
- * compressed data size, header not included
- * @param uncompressedSize
- * uncompressed data size, header not included
- * @param compressAlgo
- * compression algorithm used
- * @throws IOException
- */
- public static void decompress(byte[] dest, int destOffset,
- InputStream bufferedBoundedStream, int compressedSize,
- int uncompressedSize, Compression.Algorithm compressAlgo)
- throws IOException {
-
- if (dest.length - destOffset < uncompressedSize) {
- throw new IllegalArgumentException(
- "Output buffer does not have enough space to hold "
- + uncompressedSize + " decompressed bytes, available: "
- + (dest.length - destOffset));
- }
-
- Decompressor decompressor = null;
- try {
- decompressor = compressAlgo.getDecompressor();
- InputStream is = compressAlgo.createDecompressionStream(
- bufferedBoundedStream, decompressor, 0);
-
- IOUtils.readFully(is, dest, destOffset, uncompressedSize);
- is.close();
- } finally {
- if (decompressor != null) {
- compressAlgo.returnDecompressor(decompressor);
- }
- }
- }
-
-}
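A compact usage sketch of the Algorithm API above, assuming the gzip codec is available on the classpath (class and method names here are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.io.compress.Compressor;

public class CompressionRoundTripSketch {
  static byte[] compress(byte[] raw) throws IOException {
    Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      OutputStream os = algo.createCompressionStream(baos, compressor, 0);
      os.write(raw);
      os.close();  // closing flushes, which finishes the wrapped compression stream
    } finally {
      algo.returnCompressor(compressor);  // always hand pooled compressors back
    }
    return baos.toByteArray();
  }

  static byte[] decompress(byte[] compressed, int uncompressedSize) throws IOException {
    byte[] dest = new byte[uncompressedSize];
    InputStream bounded = new ByteArrayInputStream(compressed);
    Compression.decompress(dest, 0, bounded, compressed.length, uncompressedSize,
        Compression.Algorithm.GZ);
    return dest;
  }
}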
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (revision 1355582)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (working copy)
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.CompressorStream;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-
-/**
- * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression
- * streams.
- */
-@InterfaceAudience.Private
-public class ReusableStreamGzipCodec extends GzipCodec {
-
- private static final Log LOG = LogFactory.getLog(Compression.class);
-
- /**
- * A bridge that wraps around a DeflaterOutputStream to make it a
- * CompressionOutputStream.
- */
- protected static class ReusableGzipOutputStream extends CompressorStream {
-
- private static final int GZIP_HEADER_LENGTH = 10;
-
- /**
- * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
- * details.
- */
- private static final byte[] GZIP_HEADER;
-
- static {
- // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] header = null;
- GZIPOutputStream gzipStream = null;
- try {
- gzipStream = new GZIPOutputStream(baos);
- gzipStream.finish();
- header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
- } catch (IOException e) {
- throw new RuntimeException("Could not create gzip stream", e);
- } finally {
- if (gzipStream != null) {
- try {
- gzipStream.close();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
- }
- GZIP_HEADER = header;
- }
-
- private static class ResetableGZIPOutputStream extends GZIPOutputStream {
- public ResetableGZIPOutputStream(OutputStream out) throws IOException {
- super(out);
- }
-
- public void resetState() throws IOException {
- def.reset();
- crc.reset();
- out.write(GZIP_HEADER);
- }
- }
-
- public ReusableGzipOutputStream(OutputStream out) throws IOException {
- super(new ResetableGZIPOutputStream(out));
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void write(byte[] data, int offset, int length) throws IOException {
- out.write(data, offset, length);
- }
-
- @Override
- public void finish() throws IOException {
- ((GZIPOutputStream) out).finish();
- }
-
- @Override
- public void resetState() throws IOException {
- ((ResetableGZIPOutputStream) out).resetState();
- }
- }
-
- @Override
- public CompressionOutputStream createOutputStream(OutputStream out)
- throws IOException {
- if (ZlibFactory.isNativeZlibLoaded(getConf())) {
- return super.createOutputStream(out);
- }
- return new ReusableGzipOutputStream(out);
- }
-
-}
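If it helps review, a hedged sketch of the stream reuse this codec enables on the pure-Java path (when native zlib is not loaded); the wrapper class and method are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.ReusableStreamGzipCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class ReusableGzipSketch {
  /** Writes two gzip members back to back without recreating the stream. */
  static byte[] writeTwoMembers(byte[] first, byte[] second) throws IOException {
    ReusableStreamGzipCodec codec = new ReusableStreamGzipCodec();
    codec.setConf(new Configuration());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(baos);
    out.write(first);
    out.finish();      // ends the current gzip member
    out.resetState();  // resets deflater/CRC and re-emits the fixed gzip header
    out.write(second);
    out.finish();
    out.close();
    return baos.toByteArray();
  }
}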
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (revision 0)
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Encoding of KeyValue. It aims to be fast and efficient using assumptions:
+ *
+ * - the KeyValues are stored sorted by key
+ * - we know the structure of KeyValue
+ * - the values are always iterated forward from beginning of block
+ * - knowledge of Key Value format
+ *
+ * It is designed to work fast enough to be feasible as in memory compression.
+ *
+ * After encoding, it also optionally compresses the encoded data if a
+ * compression algorithm is specified in HFileBlockEncodingContext argument of
+ * {@link #encodeKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}.
+ */
+@InterfaceAudience.Private
+public interface DataBlockEncoder {
+
+ /**
+ * Encodes KeyValues. It will first encode key value pairs, and then
+ * optionally do the compression for the encoded data.
+ *
+ * @param in
+ * Source of KeyValue for compression.
+ * @param includesMemstoreTS
+ * true if including memstore timestamp after every key-value pair
+ * @param encodingContext
+ * the encoding context which will contain encoded uncompressed bytes
+ * as well as compressed encoded bytes if compression is enabled, and
+ * also it will reuse resources across multiple calls.
+ * @throws IOException
+ * If there is an error writing to output stream.
+ */
+ public void encodeKeyValues(
+ ByteBuffer in, boolean includesMemstoreTS,
+ HFileBlockEncodingContext encodingContext) throws IOException;
+
+ /**
+ * Decode.
+ * @param source Compressed stream of KeyValues.
+ * @param includesMemstoreTS true if including memstore timestamp after every
+ * key-value pair
+ * @return Uncompressed block of KeyValues.
+ * @throws IOException If there is an error in source.
+ */
+ public ByteBuffer decodeKeyValues(DataInputStream source,
+ boolean includesMemstoreTS) throws IOException;
+
+ /**
+ * Uncompress.
+ * @param source encoded stream of KeyValues.
+ * @param allocateHeaderLength allocate this many bytes for the header.
+ * @param skipLastBytes Do not copy n last bytes.
+ * @param includesMemstoreTS true if including memstore timestamp after every
+ * key-value pair
+ * @return Uncompressed block of KeyValues.
+ * @throws IOException If there is an error in source.
+ */
+ public ByteBuffer decodeKeyValues(DataInputStream source,
+ int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+ throws IOException;
+
+ /**
+ * Return first key in block. Useful for indexing. Typically does not make
+ * a deep copy but returns a buffer wrapping a segment of the actual block's
+ * byte array. This is because the first key in block is usually stored
+ * unencoded.
+ * @param block encoded block we want to index; the position will not change
+ * @return First key in block.
+ */
+ public ByteBuffer getFirstKeyInBlock(ByteBuffer block);
+
+ /**
+ * Create a HFileBlock seeker which finds KeyValues within a block.
+ * @param comparator what kind of comparison should be used
+ * @param includesMemstoreTS true if including memstore timestamp after every
+ * key-value pair
+ * @return A newly created seeker.
+ */
+ public EncodedSeeker createSeeker(RawComparator comparator,
+ boolean includesMemstoreTS);
+
+ /**
+ * Creates an encoder-specific encoding context
+ *
+ * @param compressionAlgorithm
+ * compression algorithm used if the final data needs to be
+ * compressed
+ * @param encoding
+ * encoding strategy used
+ * @param headerBytes
+ * header bytes to be written, put a dummy header here if the header
+ * is unknown
+ * @return a newly created encoding context
+ */
+ public HFileBlockEncodingContext newDataBlockEncodingContext(
+ Algorithm compressionAlgorithm, DataBlockEncoding encoding,
+ byte[] headerBytes);
+
+ /**
+ * Creates an encoder specific decoding context, which will prepare the data
+ * before actual decoding
+ *
+ * @param compressionAlgorithm
+ * compression algorithm used if the data needs to be decompressed
+ * @return a newly created decoding context
+ */
+ public HFileBlockDecodingContext newDataBlockDecodingContext(
+ Algorithm compressionAlgorithm);
+
+ /**
+ * An interface that enables seeking while the underlying data is encoded.
+ *
+ * It works on one HFileBlock, but it is reusable. See
+ * {@link #setCurrentBuffer(ByteBuffer)}.
+ */
+ public static interface EncodedSeeker {
+ /**
+ * Set the buffer on which seeking will be done.
+ * @param buffer Used for seeking.
+ */
+ public void setCurrentBuffer(ByteBuffer buffer);
+
+ /**
+ * Does a deep copy of the key at the current position. A deep copy is
+ * necessary because buffers are reused in the decoder.
+ * @return key at current position
+ */
+ public ByteBuffer getKeyDeepCopy();
+
+ /**
+ * Does a shallow copy of the value at the current position. A shallow
+ * copy is possible because the returned buffer refers to the backing array
+ * of the original encoded buffer.
+ * @return value at current position
+ */
+ public ByteBuffer getValueShallowCopy();
+
+ /** @return key value at current position. */
+ public ByteBuffer getKeyValueBuffer();
+
+ /**
+ * @return the KeyValue object at the current position. Includes memstore
+ * timestamp.
+ */
+ public KeyValue getKeyValue();
+
+ /** Set position to beginning of given block */
+ public void rewind();
+
+ /**
+ * Move to next position
+ * @return true on success, false if there are no more positions.
+ */
+ public boolean next();
+
+ /**
+ * Moves the seeker position within the current block to:
+ *
+ * - the last key that is less than or equal to the given key if
+ * seekBefore is false
+ * - the last key that is strictly less than the given key if
+ * seekBefore is true. The caller is responsible for loading the
+ * previous block if the requested key turns out to be the first key of the
+ * current block.
+ *
+ * @param key byte array containing the key
+ * @param offset key position in the array
+ * @param length key length in bytes
+ * @param seekBefore find the key strictly less than the given key in case
+ * of an exact match. Does not matter in case of an inexact match.
+ * @return 0 on exact match, 1 on inexact match.
+ */
+ public int seekToKeyInBlock(byte[] key, int offset, int length,
+ boolean seekBefore);
+ }
+}
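To show how the seeker contract above is meant to be consumed, here is a minimal sketch; it assumes the raw key comparator constant on KeyValue and an already-encoded block buffer, and the wrapper class is illustrative only:

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class EncodedSeekerSketch {
  /** Positions a seeker inside one encoded block and returns the KeyValue it lands on. */
  static KeyValue seek(ByteBuffer encodedBlock, byte[] key, boolean includesMemstoreTS) {
    DataBlockEncoder encoder = DataBlockEncoding.FAST_DIFF.getEncoder();
    EncodedSeeker seeker =
        encoder.createSeeker(KeyValue.KEY_COMPARATOR, includesMemstoreTS);
    seeker.setCurrentBuffer(encodedBlock);  // the seeker itself is reusable across blocks
    int result = seeker.seekToKeyInBlock(key, 0, key.length, false /* seekBefore */);
    // Per the contract above, result is 0 on an exact match and 1 on an inexact one;
    // either way the seeker now points at the last key <= the requested key.
    assert result == 0 || result == 1;
    return seeker.getKeyValue();
  }
}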
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java (revision 0)
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.Compression;
+
+/**
+ * A decoding context that is created by a reader's encoder, and is shared
+ * across all of the reader's read operations.
+ *
+ * @see HFileBlockEncodingContext for encoding
+ */
+public interface HFileBlockDecodingContext {
+
+ /**
+ * @return the compression algorithm used by this decoding context
+ */
+ public Compression.Algorithm getCompression();
+
+ /**
+ * Perform all actions that need to be done before the encoder's real decoding process.
+ * Decompression needs to be done if {@link #getCompression()} returns a valid compression
+ * algorithm.
+ *
+ * @param onDiskSizeWithoutHeader numBytes after block and encoding headers
+ * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
+ * decompressing (not decoding)
+ * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
+ * @param onDiskBlock on disk bytes to be decoded
+ * @param offset data start offset in onDiskBlock
+ * @throws IOException
+ */
+ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
+ ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException;
+
+}
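For context on the signature change (it mirrors the HFileBlock.java hunk earlier in this patch), a hedged caller-side sketch; the wrapper class and parameter names are illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;

public class PrepareDecodingCallerSketch {
  /**
   * With the new signature the reader passes sizes and buffers explicitly instead of an
   * HFileBlock, so this interface no longer depends on the hbase-server block class.
   */
  static void decodeBlockBody(HFileBlockDecodingContext ctx, byte[] onDiskBlock, int headerSize,
      int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
      ByteBuffer blockBufferWithoutHeader) throws IOException {
    ctx.prepareDecoding(onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
        blockBufferWithoutHeader, onDiskBlock, headerSize);
    // blockBufferWithoutHeader now holds the decompressed block body, ready for decoding.
  }
}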
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (revision 0)
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Provide access to all data block encoding algorithms. All of the algorithms
+ * are required to have unique id which should NEVER be changed. If you
+ * want to add a new algorithm/version, assign it a new id. Announce the new id
+ * in the HBase mailing list to prevent collisions.
+ */
+@InterfaceAudience.Private
+public enum DataBlockEncoding {
+
+ /** Disable data block encoding. */
+ NONE(0, null),
+ // id 1 is reserved for the BITSET algorithm to be added later
+ PREFIX(2, "org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder"),
+ DIFF(3, "org.apache.hadoop.hbase.io.encoding.DiffKeyDeltaEncoder"),
+ FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder");
+
+ private final short id;
+ private final byte[] idInBytes;
+ private DataBlockEncoder encoder;
+ private final String encoderClassName;
+
+ public static final int ID_SIZE = Bytes.SIZEOF_SHORT;
+
+ /** Maps data block encoding ids to enum instances. */
+ private static Map<Short, DataBlockEncoding> idToEncoding =
+ new HashMap<Short, DataBlockEncoding>();
+
+ static {
+ for (DataBlockEncoding algo : values()) {
+ if (idToEncoding.containsKey(algo.id)) {
+ throw new RuntimeException(String.format(
+ "Two data block encoder algorithms '%s' and '%s' have " +
+ "the same id %d",
+ idToEncoding.get(algo.id).toString(), algo.toString(),
+ (int) algo.id));
+ }
+ idToEncoding.put(algo.id, algo);
+ }
+ }
+
+ private DataBlockEncoding(int id, String encoderClsName) {
+ if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) {
+ throw new AssertionError(
+ "Data block encoding algorithm id is out of range: " + id);
+ }
+ this.id = (short) id;
+ this.idInBytes = Bytes.toBytes(this.id);
+ if (idInBytes.length != ID_SIZE) {
+ // While this may seem redundant, if we accidentally serialize
+ // the id as e.g. an int instead of a short, all encoders will break.
+ throw new RuntimeException("Unexpected length of encoder ID byte " +
+ "representation: " + Bytes.toStringBinary(idInBytes));
+ }
+ this.encoderClassName = encoderClsName;
+ }
+
+ /**
+ * @return name converted to bytes.
+ */
+ public byte[] getNameInBytes() {
+ return Bytes.toBytes(toString());
+ }
+
+ /**
+ * @return The id of a data block encoder.
+ */
+ public short getId() {
+ return id;
+ }
+
+ /**
+ * Writes the id in byte form to the given stream.
+ * @param stream the stream to write the id to
+ */
+ public void writeIdInBytes(OutputStream stream) throws IOException {
+ stream.write(idInBytes);
+ }
+
+
+ /**
+ * Writes id bytes to the given array starting from offset.
+ *
+ * @param dest output array
+ * @param offset starting offset of the output array
+ * @throws IOException
+ */
+ public void writeIdInBytes(byte[] dest, int offset) throws IOException {
+ System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE);
+ }
+
+ /**
+ * Return the data block encoder for this encoding type, creating it lazily
+ * on first use.
+ * @return the data block encoder, or null if no encoder is configured (NONE).
+ */
+ public DataBlockEncoder getEncoder() {
+ if (encoder == null && encoderClassName != null) {
+ encoder = createEncoder(encoderClassName);
+ }
+ return encoder;
+ }
+
+ /**
+ * Find and create the data block encoder for the given id.
+ * @param encoderId id of the data block encoder.
+ * @return the data block encoder for the given id.
+ */
+ public static DataBlockEncoder getDataBlockEncoderById(short encoderId) {
+ if (!idToEncoding.containsKey(encoderId)) {
+ throw new IllegalArgumentException(String.format(
+ "There is no data block encoder for given id '%d'",
+ (int) encoderId));
+ }
+
+ return idToEncoding.get(encoderId).getEncoder();
+ }
+
+ /**
+ * Find and return the name of data block encoder for the given id.
+ * @param encoderId id of data block encoder
+ * @return name, same as used in options in column family
+ */
+ public static String getNameFromId(short encoderId) {
+ return idToEncoding.get(encoderId).toString();
+ }
+
+ /**
+ * Check if the given encoder matches the given id.
+ * @param encoder encoder whose id will be checked
+ * @param encoderId the id we expect
+ * @return true if the id matches the given encoder, false otherwise
+ * @exception IllegalArgumentException
+ * thrown when there is no matching data block encoder
+ */
+ public static boolean isCorrectEncoder(DataBlockEncoder encoder,
+ short encoderId) {
+ if (!idToEncoding.containsKey(encoderId)) {
+ throw new IllegalArgumentException(String.format(
+ "There is no data block encoder for given id '%d'",
+ (int) encoderId));
+ }
+
+ DataBlockEncoding algorithm = idToEncoding.get(encoderId);
+ return algorithm.getClass().equals(encoder.getClass());
+ }
+
+ public static DataBlockEncoding getEncodingById(short dataBlockEncodingId) {
+ return idToEncoding.get(dataBlockEncodingId);
+ }
+
+ protected static DataBlockEncoder createEncoder(String fullyQualifiedClassName){
+ try {
+ return (DataBlockEncoder)Class.forName(fullyQualifiedClassName).newInstance();
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+}
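A hypothetical lookup sketch (the encodedBlock array and the out stream are placeholders,
not part of this patch):

    // Resolve the encoding and its encoder from the two-byte id stored with the block.
    short id = Bytes.toShort(encodedBlock, 0);
    DataBlockEncoding encoding = DataBlockEncoding.getEncodingById(id);
    DataBlockEncoder encoder = DataBlockEncoding.getDataBlockEncoderById(id);
    // On the write path the same id is emitted ahead of the encoded data.
    encoding.writeIdInBytes(out);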
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java (revision 0)
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+
+/**
+ * An encoding context that is created by a writer's encoder, and is shared
+ * across the writer's whole lifetime.
+ *
+ * @see HFileBlockDecodingContext for decoding
+ *
+ */
+public interface HFileBlockEncodingContext {
+
+ /**
+ * @return encoded and compressed bytes with header which are ready to write
+ * out to disk
+ */
+ public byte[] getOnDiskBytesWithHeader();
+
+ /**
+ * @return encoded but not heavily compressed bytes, with header, which can
+ * be cached in the block cache
+ */
+ public byte[] getUncompressedBytesWithHeader();
+
+ /**
+ * @return the block type after encoding
+ */
+ public BlockType getBlockType();
+
+ /**
+ * @return the compression algorithm used by this encoding context
+ */
+ public Compression.Algorithm getCompression();
+
+ /**
+ * @return the header size used
+ */
+ public int getHeaderSize();
+
+ /**
+ * @return the {@link DataBlockEncoding} encoding used
+ */
+ public DataBlockEncoding getDataBlockEncoding();
+
+ /**
+ * Do any action that needs to be performed after the encoding.
+ * Compression is also performed here if {@link #getCompression()} returns a
+ * compression algorithm other than NONE.
+ *
+ * @param blockType the type of the block that was encoded
+ * @throws IOException
+ */
+ public void postEncoding(BlockType blockType) throws IOException;
+
+ /**
+ * Releases the resources used.
+ */
+ public void close();
+
+}
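A writer-side sketch of the intended call order (assuming encodingCtx was created by the
block encoder and already holds the encoded key/values; the local names are hypothetical):

    encodingCtx.postEncoding(BlockType.ENCODED_DATA);  // compresses if a codec is configured
    byte[] onDisk = encodingCtx.getOnDiskBytesWithHeader();          // bytes written to the HFile
    byte[] forCache = encodingCtx.getUncompressedBytesWithHeader();  // bytes kept in the block cache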
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (revision 0)
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Compression-related utilities.
+ * Copied from hadoop-3315 tfile.
+ */
+@InterfaceAudience.Private
+public final class Compression {
+ static final Log LOG = LogFactory.getLog(Compression.class);
+
+ /**
+ * Prevent the instantiation of class.
+ */
+ private Compression() {
+ super();
+ }
+
+ static class FinishOnFlushCompressionStream extends FilterOutputStream {
+ public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+ super(cout);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ CompressionOutputStream cout = (CompressionOutputStream) out;
+ cout.finish();
+ cout.flush();
+ cout.resetState();
+ }
+ }
+
+ /**
+ * Returns the classloader to load the Codec class from.
+ * @return the ClassLoader to use when loading codec classes
+ */
+ private static ClassLoader getClassLoaderForCodec() {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ cl = Compression.class.getClassLoader();
+ }
+ if (cl == null) {
+ cl = ClassLoader.getSystemClassLoader();
+ }
+ if (cl == null) {
+ throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
+ }
+ return cl;
+ }
+
+ /**
+ * Compression algorithms. The ordinal of these cannot change or else you
+ * risk breaking all existing HFiles out there. Even the ones that are
+ * not compressed! (They use the NONE algorithm)
+ */
+ public static enum Algorithm {
+ LZO("lzo") {
+ // Use base type to avoid compile-time dependencies.
+ private transient CompressionCodec lzoCodec;
+
+ @Override
+ CompressionCodec getCodec(Configuration conf) {
+ if (lzoCodec == null) {
+ try {
+ Class<?> externalCodec =
+ getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
+ lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
+ new Configuration(conf));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return lzoCodec;
+ }
+ },
+ GZ("gz") {
+ private transient GzipCodec codec;
+
+ @Override
+ DefaultCodec getCodec(Configuration conf) {
+ if (codec == null) {
+ codec = new ReusableStreamGzipCodec();
+ codec.setConf(new Configuration(conf));
+ }
+
+ return codec;
+ }
+ },
+
+ NONE("none") {
+ @Override
+ DefaultCodec getCodec(Configuration conf) {
+ return null;
+ }
+
+ @Override
+ public synchronized InputStream createDecompressionStream(
+ InputStream downStream, Decompressor decompressor,
+ int downStreamBufferSize) throws IOException {
+ if (downStreamBufferSize > 0) {
+ return new BufferedInputStream(downStream, downStreamBufferSize);
+ }
+ return downStream;
+ }
+
+ @Override
+ public synchronized OutputStream createCompressionStream(
+ OutputStream downStream, Compressor compressor,
+ int downStreamBufferSize) throws IOException {
+ if (downStreamBufferSize > 0) {
+ return new BufferedOutputStream(downStream, downStreamBufferSize);
+ }
+
+ return downStream;
+ }
+ },
+ SNAPPY("snappy") {
+ // Use base type to avoid compile-time dependencies.
+ private transient CompressionCodec snappyCodec;
+
+ @Override
+ CompressionCodec getCodec(Configuration conf) {
+ if (snappyCodec == null) {
+ try {
+ Class<?> externalCodec =
+ getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
+ snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return snappyCodec;
+ }
+ },
+ LZ4("lz4") {
+ // Use base type to avoid compile-time dependencies.
+ private transient CompressionCodec lz4Codec;
+
+ @Override
+ CompressionCodec getCodec(Configuration conf) {
+ if (lz4Codec == null) {
+ try {
+ Class<?> externalCodec =
+ getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
+ lz4Codec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return lz4Codec;
+ }
+ };
+
+ private final Configuration conf;
+ private final String compressName;
+ // data input buffer size to absorb small reads from application.
+ private static final int DATA_IBUF_SIZE = 1 * 1024;
+ // data output buffer size to absorb small writes from application.
+ private static final int DATA_OBUF_SIZE = 4 * 1024;
+
+ Algorithm(String name) {
+ this.conf = new Configuration();
+ this.conf.setBoolean("hadoop.native.lib", true);
+ this.compressName = name;
+ }
+
+ abstract CompressionCodec getCodec(Configuration conf);
+
+ public InputStream createDecompressionStream(
+ InputStream downStream, Decompressor decompressor,
+ int downStreamBufferSize) throws IOException {
+ CompressionCodec codec = getCodec(conf);
+ // Set the internal buffer size to read from down stream.
+ if (downStreamBufferSize > 0) {
+ ((Configurable)codec).getConf().setInt("io.file.buffer.size",
+ downStreamBufferSize);
+ }
+ CompressionInputStream cis =
+ codec.createInputStream(downStream, decompressor);
+ BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+ return bis2;
+
+ }
+
+ public OutputStream createCompressionStream(
+ OutputStream downStream, Compressor compressor, int downStreamBufferSize)
+ throws IOException {
+ OutputStream bos1 = null;
+ if (downStreamBufferSize > 0) {
+ bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+ }
+ else {
+ bos1 = downStream;
+ }
+ CompressionOutputStream cos =
+ createPlainCompressionStream(bos1, compressor);
+ BufferedOutputStream bos2 =
+ new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
+ DATA_OBUF_SIZE);
+ return bos2;
+ }
+
+ /**
+ * Creates a compression stream without any additional wrapping into
+ * buffering streams.
+ */
+ public CompressionOutputStream createPlainCompressionStream(
+ OutputStream downStream, Compressor compressor) throws IOException {
+ CompressionCodec codec = getCodec(conf);
+ ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+ return codec.createOutputStream(downStream, compressor);
+ }
+
+ public Compressor getCompressor() {
+ CompressionCodec codec = getCodec(conf);
+ if (codec != null) {
+ Compressor compressor = CodecPool.getCompressor(codec);
+ if (compressor != null) {
+ if (compressor.finished()) {
+ // Somebody returned the compressor to the CodecPool but is still using it.
+ LOG.warn("Compressor obtained from CodecPool is already finished()");
+ // throw new AssertionError(
+ // "Compressor obtained from CodecPool is already finished()");
+ }
+ compressor.reset();
+ }
+ return compressor;
+ }
+ return null;
+ }
+
+ public void returnCompressor(Compressor compressor) {
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
+ }
+
+ public Decompressor getDecompressor() {
+ CompressionCodec codec = getCodec(conf);
+ if (codec != null) {
+ Decompressor decompressor = CodecPool.getDecompressor(codec);
+ if (decompressor != null) {
+ if (decompressor.finished()) {
+ // Somebody returned the decompressor to the CodecPool but is still using it.
+ LOG.warn("Decompressor obtained from CodecPool is already finished()");
+ // throw new AssertionError(
+ // "Decompressor obtained from CodecPool is already finished()");
+ }
+ decompressor.reset();
+ }
+ return decompressor;
+ }
+
+ return null;
+ }
+
+ public void returnDecompressor(Decompressor decompressor) {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+
+ public String getName() {
+ return compressName;
+ }
+ }
+
+ public static Algorithm getCompressionAlgorithmByName(String compressName) {
+ Algorithm[] algos = Algorithm.class.getEnumConstants();
+
+ for (Algorithm a : algos) {
+ if (a.getName().equals(compressName)) {
+ return a;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "Unsupported compression algorithm name: " + compressName);
+ }
+
+ static String[] getSupportedAlgorithms() {
+ Algorithm[] algos = Algorithm.class.getEnumConstants();
+
+ String[] ret = new String[algos.length];
+ int i = 0;
+ for (Algorithm a : algos) {
+ ret[i++] = a.getName();
+ }
+
+ return ret;
+ }
+
+ /**
+ * Decompresses data from the given stream using the given compression
+ * algorithm. It will throw an exception if the destination buffer does not
+ * have enough space to hold the decompressed data.
+ *
+ * @param dest
+ * the output bytes buffer
+ * @param destOffset
+ * start writing position of the output buffer
+ * @param bufferedBoundedStream
+ * a stream to read compressed data from, bounded to the exact amount
+ * of compressed data
+ * @param compressedSize
+ * compressed data size, header not included
+ * @param uncompressedSize
+ * uncompressed data size, header not included
+ * @param compressAlgo
+ * compression algorithm used
+ * @throws IOException
+ */
+ public static void decompress(byte[] dest, int destOffset,
+ InputStream bufferedBoundedStream, int compressedSize,
+ int uncompressedSize, Compression.Algorithm compressAlgo)
+ throws IOException {
+
+ if (dest.length - destOffset < uncompressedSize) {
+ throw new IllegalArgumentException(
+ "Output buffer does not have enough space to hold "
+ + uncompressedSize + " decompressed bytes, available: "
+ + (dest.length - destOffset));
+ }
+
+ Decompressor decompressor = null;
+ try {
+ decompressor = compressAlgo.getDecompressor();
+ InputStream is = compressAlgo.createDecompressionStream(
+ bufferedBoundedStream, decompressor, 0);
+
+ IOUtils.readFully(is, dest, destOffset, uncompressedSize);
+ is.close();
+ } finally {
+ if (decompressor != null) {
+ compressAlgo.returnDecompressor(decompressor);
+ }
+ }
+ }
+
+}
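An illustrative round trip through the Algorithm API (not part of the patch; rawBytes is an
arbitrary input, the GZ codec is assumed to be available, and the usual java.io / Hadoop
compression imports are omitted):

    public static byte[] gzRoundTrip(byte[] rawBytes) throws IOException {
      Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");

      // Compress into an in-memory buffer, returning the pooled compressor afterwards.
      Compressor compressor = algo.getCompressor();
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try {
        OutputStream os = algo.createCompressionStream(baos, compressor, 0);
        os.write(rawBytes);
        os.flush();  // FinishOnFlushCompressionStream finishes the compressed block on flush()
      } finally {
        algo.returnCompressor(compressor);
      }
      byte[] compressed = baos.toByteArray();

      // Decompress back into a buffer of the known uncompressed size.
      byte[] uncompressed = new byte[rawBytes.length];
      Compression.decompress(uncompressed, 0, new ByteArrayInputStream(compressed),
          compressed.length, rawBytes.length, algo);
      return uncompressed;
    }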
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java (revision 0)
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+
+/**
+ * Fixes an inefficiency in Hadoop's Gzip codec, allowing compression streams
+ * to be reused.
+ */
+@InterfaceAudience.Private
+public class ReusableStreamGzipCodec extends GzipCodec {
+
+ private static final Log LOG = LogFactory.getLog(Compression.class);
+
+ /**
+ * A bridge that wraps around a DeflaterOutputStream to make it a
+ * CompressionOutputStream.
+ */
+ protected static class ReusableGzipOutputStream extends CompressorStream {
+
+ private static final int GZIP_HEADER_LENGTH = 10;
+
+ /**
+ * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+ * details.
+ */
+ private static final byte[] GZIP_HEADER;
+
+ static {
+ // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] header = null;
+ GZIPOutputStream gzipStream = null;
+ try {
+ gzipStream = new GZIPOutputStream(baos);
+ gzipStream.finish();
+ header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create gzip stream", e);
+ } finally {
+ if (gzipStream != null) {
+ try {
+ gzipStream.close();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ GZIP_HEADER = header;
+ }
+
+ private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+ public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+
+ public void resetState() throws IOException {
+ def.reset();
+ crc.reset();
+ out.write(GZIP_HEADER);
+ }
+ }
+
+ public ReusableGzipOutputStream(OutputStream out) throws IOException {
+ super(new ResetableGZIPOutputStream(out));
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] data, int offset, int length) throws IOException {
+ out.write(data, offset, length);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ ((GZIPOutputStream) out).finish();
+ }
+
+ @Override
+ public void resetState() throws IOException {
+ ((ResetableGZIPOutputStream) out).resetState();
+ }
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ if (ZlibFactory.isNativeZlibLoaded(getConf())) {
+ return super.createOutputStream(out);
+ }
+ return new ReusableGzipOutputStream(out);
+ }
+
+}
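A hypothetical sketch of the reuse this codec enables: one gzip stream compresses several
blocks, with resetState() re-arming it between blocks (the underlyingStream, firstBlock and
secondBlock names are placeholders, not part of this patch):

    ReusableStreamGzipCodec codec = new ReusableStreamGzipCodec();
    codec.setConf(new Configuration());
    CompressionOutputStream cos = codec.createOutputStream(underlyingStream);
    cos.write(firstBlock, 0, firstBlock.length);
    cos.finish();      // completes the first gzip member
    cos.resetState();  // re-arms the stream so it can compress the next block
    cos.write(secondBlock, 0, secondBlock.length);
    cos.finish();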
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (revision 0)
@@ -0,0 +1,220 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Various types of {@link HFile} blocks. Ordinal values of these enum constants
+ * must not be relied upon. The values in the enum appear in the order they
+ * appear in a version 2 {@link HFile}.
+ */
+@InterfaceAudience.Private
+public enum BlockType {
+
+ // Scanned block section
+
+ /** Data block, both versions */
+ DATA("DATABLK*", BlockCategory.DATA),
+
+ /** An encoded data block (e.g. with prefix compression), version 2 */
+ ENCODED_DATA("DATABLKE", BlockCategory.DATA) {
+ @Override
+ public int getId() {
+ return DATA.ordinal();
+ }
+ },
+
+ /** Version 2 leaf index block. Appears in the data block section */
+ LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
+
+ /** Bloom filter block, version 2 */
+ BLOOM_CHUNK("BLMFBLK2", BlockCategory.BLOOM),
+
+ // Non-scanned block section
+
+ /** Meta blocks */
+ META("METABLKc", BlockCategory.META),
+
+ /** Intermediate-level version 2 index in the non-data block section */
+ INTERMEDIATE_INDEX("IDXINTE2", BlockCategory.INDEX),
+
+ // Load-on-open section.
+
+ /** Root index block, also used for the single-level meta index, version 2 */
+ ROOT_INDEX("IDXROOT2", BlockCategory.INDEX),
+
+ /** File info, version 2 */
+ FILE_INFO("FILEINF2", BlockCategory.META),
+
+ /** General Bloom filter metadata, version 2 */
+ GENERAL_BLOOM_META("BLMFMET2", BlockCategory.BLOOM),
+
+ /** Delete Family Bloom filter metadata, version 2 */
+ DELETE_FAMILY_BLOOM_META("DFBLMET2", BlockCategory.BLOOM),
+
+ // Trailer
+
+ /** Fixed file trailer, both versions (always just a magic string) */
+ TRAILER("TRABLK\"$", BlockCategory.META),
+
+ // Legacy blocks
+
+ /** Block index magic string in version 1 */
+ INDEX_V1("IDXBLK)+", BlockCategory.INDEX);
+
+ public enum BlockCategory {
+ DATA, META, INDEX, BLOOM, ALL_CATEGORIES, UNKNOWN;
+
+ /**
+ * Throws an exception if the block category passed is the special category
+ * meaning "all categories".
+ */
+ public void expectSpecific() {
+ if (this == ALL_CATEGORIES) {
+ throw new IllegalArgumentException("Expected a specific block " +
+ "category but got " + this);
+ }
+ }
+ }
+
+ public static final int MAGIC_LENGTH = 8;
+
+ private final byte[] magic;
+ private final BlockCategory metricCat;
+
+ private BlockType(String magicStr, BlockCategory metricCat) {
+ magic = Bytes.toBytes(magicStr);
+ this.metricCat = metricCat;
+ assert magic.length == MAGIC_LENGTH;
+ }
+
+ /**
+ * Use this instead of {@link #ordinal()}. They work exactly the same, except
+ * DATA and ENCODED_DATA get the same id using this method (overridden for
+ * {@link #ENCODED_DATA}).
+ * @return block type id from 0 to the number of block types - 1
+ */
+ public int getId() {
+ // Default implementation, can be overridden for individual enum members.
+ return ordinal();
+ }
+
+ public void writeToStream(OutputStream out) throws IOException {
+ out.write(magic);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.write(magic);
+ }
+
+ public void write(ByteBuffer buf) {
+ buf.put(magic);
+ }
+
+ public BlockCategory getCategory() {
+ return metricCat;
+ }
+
+ public static BlockType parse(byte[] buf, int offset, int length)
+ throws IOException {
+ if (length != MAGIC_LENGTH) {
+ throw new IOException("Magic record of invalid length: "
+ + Bytes.toStringBinary(buf, offset, length));
+ }
+
+ for (BlockType blockType : values())
+ if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset,
+ MAGIC_LENGTH) == 0)
+ return blockType;
+
+ throw new IOException("Invalid HFile block magic: "
+ + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH));
+ }
+
+ public static BlockType read(DataInputStream in) throws IOException {
+ byte[] buf = new byte[MAGIC_LENGTH];
+ in.readFully(buf);
+ return parse(buf, 0, buf.length);
+ }
+
+ public static BlockType read(ByteBuffer buf) throws IOException {
+ BlockType blockType = parse(buf.array(),
+ buf.arrayOffset() + buf.position(),
+ Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
+
+ // If we got here, we have read exactly MAGIC_LENGTH bytes.
+ buf.position(buf.position() + MAGIC_LENGTH);
+ return blockType;
+ }
+
+ /**
+ * Put the magic record out to the specified byte array position.
+ *
+ * @param bytes the byte array
+ * @param offset position in the array
+ * @return incremented offset
+ */
+ public int put(byte[] bytes, int offset) {
+ System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH);
+ return offset + MAGIC_LENGTH;
+ }
+
+ /**
+ * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
+ * stream and expects it to match this block type.
+ */
+ public void readAndCheck(DataInputStream in) throws IOException {
+ byte[] buf = new byte[MAGIC_LENGTH];
+ in.readFully(buf);
+ if (Bytes.compareTo(buf, magic) != 0) {
+ throw new IOException("Invalid magic: expected "
+ + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
+ }
+ }
+
+ /**
+ * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
+ * byte buffer and expects it to match this block type.
+ */
+ public void readAndCheck(ByteBuffer in) throws IOException {
+ byte[] buf = new byte[MAGIC_LENGTH];
+ in.get(buf);
+ if (Bytes.compareTo(buf, magic) != 0) {
+ throw new IOException("Invalid magic: expected "
+ + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
+ }
+ }
+
+ /**
+ * @return whether this block type is an encoded or unencoded data block
+ */
+ public final boolean isData() {
+ return this == DATA || this == ENCODED_DATA;
+ }
+
+}
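A small sketch of the magic-record round trip (illustrative only, not part of the patch):

    byte[] buf = new byte[BlockType.MAGIC_LENGTH];
    BlockType.DATA.put(buf, 0);                              // writes "DATABLK*"
    BlockType parsed = BlockType.parse(buf, 0, BlockType.MAGIC_LENGTH);
    assert parsed == BlockType.DATA;
    // DATA and ENCODED_DATA deliberately share an id even though their ordinals differ.
    assert BlockType.ENCODED_DATA.getId() == BlockType.DATA.getId();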