diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java new file mode 100644 index 0000000..c659fb8 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; +import org.apache.hadoop.io.RawComparator; + +/** + * Encoding of KeyValue. It aims to be fast and efficient using assumptions: + * + * It is designed to work fast enough to be feasible as in memory compression. + * + * After encoding, it also optionally compresses the encoded data if a + * compression algorithm is specified in HFileBlockEncodingContext argument of + * {@link #encodeKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}. + */ +@InterfaceAudience.Private +public interface DataBlockEncoder { + + /** + * Encodes KeyValues. It will first encode key value pairs, and then + * optionally do the compression for the encoded data. + * + * @param in + * Source of KeyValue for compression. + * @param includesMemstoreTS + * true if including memstore timestamp after every key-value pair + * @param encodingContext + * the encoding context which will contain encoded uncompressed bytes + * as well as compressed encoded bytes if compression is enabled, and + * also it will reuse resources across multiple calls. + * @throws IOException + * If there is an error writing to output stream. + */ + public void encodeKeyValues( + ByteBuffer in, boolean includesMemstoreTS, + HFileBlockEncodingContext encodingContext) throws IOException; + + /** + * Decode. + * @param source Compressed stream of KeyValues. + * @param includesMemstoreTS true if including memstore timestamp after every + * key-value pair + * @return Uncompressed block of KeyValues. + * @throws IOException If there is an error in source. + */ + public ByteBuffer decodeKeyValues(DataInputStream source, + boolean includesMemstoreTS) throws IOException; + + /** + * Uncompress. + * @param source encoded stream of KeyValues. + * @param allocateHeaderLength allocate this many bytes for the header. + * @param skipLastBytes Do not copy n last bytes. + * @param includesMemstoreTS true if including memstore timestamp after every + * key-value pair + * @return Uncompressed block of KeyValues. + * @throws IOException If there is an error in source. 
+ */ + public ByteBuffer decodeKeyValues(DataInputStream source, + int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + throws IOException; + + /** + * Return first key in block. Useful for indexing. Typically does not make + * a deep copy but returns a buffer wrapping a segment of the actual block's + * byte array. This is because the first key in block is usually stored + * unencoded. + * @param block encoded block we want index, the position will not change + * @return First key in block. + */ + public ByteBuffer getFirstKeyInBlock(ByteBuffer block); + + /** + * Create a HFileBlock seeker which find KeyValues within a block. + * @param comparator what kind of comparison should be used + * @param includesMemstoreTS true if including memstore timestamp after every + * key-value pair + * @return A newly created seeker. + */ + public EncodedSeeker createSeeker(RawComparator comparator, + boolean includesMemstoreTS); + + /** + * Creates a encoder specific encoding context + * + * @param compressionAlgorithm + * compression algorithm used if the final data needs to be + * compressed + * @param encoding + * encoding strategy used + * @param headerBytes + * header bytes to be written, put a dummy header here if the header + * is unknown + * @return a newly created encoding context + */ + public HFileBlockEncodingContext newDataBlockEncodingContext( + Algorithm compressionAlgorithm, DataBlockEncoding encoding, + byte[] headerBytes); + + /** + * Creates an encoder specific decoding context, which will prepare the data + * before actual decoding + * + * @param compressionAlgorithm + * compression algorithm used if the data needs to be decompressed + * @return a newly created decoding context + */ + public HFileBlockDecodingContext newDataBlockDecodingContext( + Algorithm compressionAlgorithm); + + /** + * An interface which enable to seek while underlying data is encoded. + * + * It works on one HFileBlock, but it is reusable. See + * {@link #setCurrentBuffer(ByteBuffer)}. + */ + public static interface EncodedSeeker { + /** + * Set on which buffer there will be done seeking. + * @param buffer Used for seeking. + */ + public void setCurrentBuffer(ByteBuffer buffer); + + /** + * Does a deep copy of the key at the current position. A deep copy is + * necessary because buffers are reused in the decoder. + * @return key at current position + */ + public ByteBuffer getKeyDeepCopy(); + + /** + * Does a shallow copy of the value at the current position. A shallow + * copy is possible because the returned buffer refers to the backing array + * of the original encoded buffer. + * @return value at current position + */ + public ByteBuffer getValueShallowCopy(); + + /** @return key value at current position. */ + public ByteBuffer getKeyValueBuffer(); + + /** + * @return the KeyValue object at the current position. Includes memstore + * timestamp. + */ + public KeyValue getKeyValue(); + + /** Set position to beginning of given block */ + public void rewind(); + + /** + * Move to next position + * @return true on success, false if there is no more positions. + */ + public boolean next(); + + /** + * Moves the seeker position within the current block to: + * + * @param key byte array containing the key + * @param offset key position the array + * @param length key length in bytes + * @param seekBefore find the key strictly less than the given key in case + * of an exact match. Does not matter in case of an inexact match. + * @return 0 on exact match, 1 on inexact match. 
+ */ + public int seekToKeyInBlock(byte[] key, int offset, int length, + boolean seekBefore); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java new file mode 100644 index 0000000..cba3d36 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Provide access to all data block encoding algorithms. All of the algorithms + * are required to have unique id which should NEVER be changed. If you + * want to add a new algorithm/version, assign it a new id. Announce the new id + * in the HBase mailing list to prevent collisions. + */ +@InterfaceAudience.Private +public enum DataBlockEncoding { + + /** Disable data block encoding. */ + NONE(0, null), + // id 1 is reserved for the BITSET algorithm to be added later + PREFIX(2, createEncoder("org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder")), + DIFF(3, createEncoder("org.apache.hadoop.hbase.io.encoding.DiffKeyDeltaEncoder")), + FAST_DIFF(4, createEncoder("org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder")); + + private final short id; + private final byte[] idInBytes; + private final DataBlockEncoder encoder; + + public static final int ID_SIZE = Bytes.SIZEOF_SHORT; + + /** Maps data block encoding ids to enum instances. */ + private static Map idToEncoding = + new HashMap(); + + static { + for (DataBlockEncoding algo : values()) { + if (idToEncoding.containsKey(algo.id)) { + throw new RuntimeException(String.format( + "Two data block encoder algorithms '%s' and '%s' have " + + "the same id %d", + idToEncoding.get(algo.id).toString(), algo.toString(), + (int) algo.id)); + } + idToEncoding.put(algo.id, algo); + } + } + + private DataBlockEncoding(int id, DataBlockEncoder encoder) { + if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) { + throw new AssertionError( + "Data block encoding algorithm id is out of range: " + id); + } + this.id = (short) id; + this.idInBytes = Bytes.toBytes(this.id); + if (idInBytes.length != ID_SIZE) { + // White this may seem redundant, if we accidentally serialize + // the id as e.g. an int instead of a short, all encoders will break. 
+ throw new RuntimeException("Unexpected length of encoder ID byte " + + "representation: " + Bytes.toStringBinary(idInBytes)); + } + this.encoder = encoder; + } + + /** + * @return name converted to bytes. + */ + public byte[] getNameInBytes() { + return Bytes.toBytes(toString()); + } + + /** + * @return The id of a data block encoder. + */ + public short getId() { + return id; + } + + /** + * Writes id in bytes. + * @param stream where the id should be written. + */ + public void writeIdInBytes(OutputStream stream) throws IOException { + stream.write(idInBytes); + } + + + /** + * Writes id bytes to the given array starting from offset. + * + * @param dest output array + * @param offset starting offset of the output array + * @throws IOException + */ + public void writeIdInBytes(byte[] dest, int offset) throws IOException { + System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE); + } + + /** + * Return new data block encoder for given algorithm type. + * @return data block encoder if algorithm is specified, null if none is + * selected. + */ + public DataBlockEncoder getEncoder() { + return encoder; + } + + /** + * Find and create data block encoder for given id; + * @param encoderId id of data block encoder. + * @return Newly created data block encoder. + */ + public static DataBlockEncoder getDataBlockEncoderById(short encoderId) { + if (!idToEncoding.containsKey(encoderId)) { + throw new IllegalArgumentException(String.format( + "There is no data block encoder for given id '%d'", + (int) encoderId)); + } + + return idToEncoding.get(encoderId).getEncoder(); + } + + /** + * Find and return the name of data block encoder for the given id. + * @param encoderId id of data block encoder + * @return name, same as used in options in column family + */ + public static String getNameFromId(short encoderId) { + return idToEncoding.get(encoderId).toString(); + } + + /** + * Check if given encoder has this id. 
+ * @param encoder encoder which id will be checked + * @param encoderId id which we except + * @return true if id is right for given encoder, false otherwise + * @exception IllegalArgumentException + * thrown when there is no matching data block encoder + */ + public static boolean isCorrectEncoder(DataBlockEncoder encoder, + short encoderId) { + if (!idToEncoding.containsKey(encoderId)) { + throw new IllegalArgumentException(String.format( + "There is no data block encoder for given id '%d'", + (int) encoderId)); + } + + DataBlockEncoding algorithm = idToEncoding.get(encoderId); + return algorithm.getClass().equals(encoder.getClass()); + } + + public static DataBlockEncoding getEncodingById(short dataBlockEncodingId) { + return idToEncoding.get(dataBlockEncodingId); + } + + protected static DataBlockEncoder createEncoder(String fullyQualifiedClassName){ + try { + return (DataBlockEncoder)Class.forName(fullyQualifiedClassName).newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java new file mode 100644 index 0000000..de6256f --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.io.hfile.Compression; + +/** + * A decoding context that is created by a reader's encoder, and is shared + * across the reader's all read operations. + * + * @see HFileBlockEncodingContext for encoding + */ +public interface HFileBlockDecodingContext { + + /** + * @return the compression algorithm used by this decoding context + */ + public Compression.Algorithm getCompression(); + + /** + * Perform all actions that need to be done before the encoder's real decoding process. + * Decompression needs to be done if {@link #getCompression()} returns a valid compression + * algorithm. 
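+ * <p>
+ * A hypothetical sketch of how a reader might obtain and invoke a decoding
+ * context (all variable names are illustrative):
+ * <pre>
+ * HFileBlockDecodingContext ctx =
+ *     dataBlockEncoder.newDataBlockDecodingContext(Compression.Algorithm.GZ);
+ * ctx.prepareDecoding(onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
+ *     blockBufferWithoutHeader, onDiskBytes, headerOffset);
+ * </pre>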
+ * + * @param onDiskSizeWithoutHeader numBytes after block and encoding headers + * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after + * decompressing (not decoding) + * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data + * @param onDiskBlock on disk bytes to be decoded + * @param offset data start offset in onDiskBlock + * @throws IOException + */ + public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException; + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java new file mode 100644 index 0000000..45f2749 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.Compression; + +/** + * An encoding context that is created by a writer's encoder, and is shared + * across the writer's whole lifetime. + * + * @see HFileBlockDecodingContext for decoding + * + */ +public interface HFileBlockEncodingContext { + + /** + * @return encoded and compressed bytes with header which are ready to write + * out to disk + */ + public byte[] getOnDiskBytesWithHeader(); + + /** + * @return encoded but not heavily compressed bytes with header which can be + * cached in block cache + */ + public byte[] getUncompressedBytesWithHeader(); + + /** + * @return the block type after encoding + */ + public BlockType getBlockType(); + + /** + * @return the compression algorithm used by this encoding context + */ + public Compression.Algorithm getCompression(); + + /** + * @return the header size used + */ + public int getHeaderSize(); + + /** + * @return the {@link DataBlockEncoding} encoding used + */ + public DataBlockEncoding getDataBlockEncoding(); + + /** + * Do any action that needs to be performed after the encoding. + * Compression is also included if {@link #getCompression()} returns non-null + * compression algorithm + * + * @param blockType + * @throws IOException + */ + public void postEncoding(BlockType blockType) throws IOException; + + /** + * Releases the resources used. 
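+ * <p>
+ * A hypothetical lifetime of an encoding context, assuming a
+ * <code>DataBlockEncoder</code> named <code>encoder</code>, a buffer of
+ * serialized KeyValues named <code>keyValues</code>, and a placeholder
+ * header <code>dummyHeader</code> (all illustrative names):
+ * <pre>
+ * HFileBlockEncodingContext ctx = encoder.newDataBlockEncodingContext(
+ *     Compression.Algorithm.NONE, DataBlockEncoding.PREFIX, dummyHeader);
+ * encoder.encodeKeyValues(keyValues, false, ctx);
+ * byte[] onDisk = ctx.getOnDiskBytesWithHeader();
+ * byte[] forCache = ctx.getUncompressedBytesWithHeader();
+ * ctx.close();
+ * </pre>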
+ */ + public void close(); + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java new file mode 100644 index 0000000..423db17 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java @@ -0,0 +1,220 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Various types of {@link HFile} blocks. Ordinal values of these enum constants + * must not be relied upon. The values in the enum appear in the order they + * appear in a version 2 {@link HFile}. + */ +@InterfaceAudience.Private +public enum BlockType { + + // Scanned block section + + /** Data block, both versions */ + DATA("DATABLK*", BlockCategory.DATA), + + /** An encoded data block (e.g. with prefix compression), version 2 */ + ENCODED_DATA("DATABLKE", BlockCategory.DATA) { + @Override + public int getId() { + return DATA.ordinal(); + } + }, + + /** Version 2 leaf index block. Appears in the data block section */ + LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX), + + /** Bloom filter block, version 2 */ + BLOOM_CHUNK("BLMFBLK2", BlockCategory.BLOOM), + + // Non-scanned block section + + /** Meta blocks */ + META("METABLKc", BlockCategory.META), + + /** Intermediate-level version 2 index in the non-data block section */ + INTERMEDIATE_INDEX("IDXINTE2", BlockCategory.INDEX), + + // Load-on-open section. + + /** Root index block, also used for the single-level meta index, version 2 */ + ROOT_INDEX("IDXROOT2", BlockCategory.INDEX), + + /** File info, version 2 */ + FILE_INFO("FILEINF2", BlockCategory.META), + + /** General Bloom filter metadata, version 2 */ + GENERAL_BLOOM_META("BLMFMET2", BlockCategory.BLOOM), + + /** Delete Family Bloom filter metadata, version 2 */ + DELETE_FAMILY_BLOOM_META("DFBLMET2", BlockCategory.BLOOM), + + // Trailer + + /** Fixed file trailer, both versions (always just a magic string) */ + TRAILER("TRABLK\"$", BlockCategory.META), + + // Legacy blocks + + /** Block index magic string in version 1 */ + INDEX_V1("IDXBLK)+", BlockCategory.INDEX); + + public enum BlockCategory { + DATA, META, INDEX, BLOOM, ALL_CATEGORIES, UNKNOWN; + + /** + * Throws an exception if the block category passed is the special category + * meaning "all categories". 
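+ * <p>
+ * For example, <code>BlockCategory.DATA.expectSpecific()</code> returns
+ * normally, while <code>BlockCategory.ALL_CATEGORIES.expectSpecific()</code>
+ * throws an <code>IllegalArgumentException</code>.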
+ */ + public void expectSpecific() { + if (this == ALL_CATEGORIES) { + throw new IllegalArgumentException("Expected a specific block " + + "category but got " + this); + } + } + } + + public static final int MAGIC_LENGTH = 8; + + private final byte[] magic; + private final BlockCategory metricCat; + + private BlockType(String magicStr, BlockCategory metricCat) { + magic = Bytes.toBytes(magicStr); + this.metricCat = metricCat; + assert magic.length == MAGIC_LENGTH; + } + + /** + * Use this instead of {@link #ordinal()}. They work exactly the same, except + * DATA and ENCODED_DATA get the same id using this method (overridden for + * {@link #ENCODED_DATA}). + * @return block type id from 0 to the number of block types - 1 + */ + public int getId() { + // Default implementation, can be overridden for individual enum members. + return ordinal(); + } + + public void writeToStream(OutputStream out) throws IOException { + out.write(magic); + } + + public void write(DataOutput out) throws IOException { + out.write(magic); + } + + public void write(ByteBuffer buf) { + buf.put(magic); + } + + public BlockCategory getCategory() { + return metricCat; + } + + public static BlockType parse(byte[] buf, int offset, int length) + throws IOException { + if (length != MAGIC_LENGTH) { + throw new IOException("Magic record of invalid length: " + + Bytes.toStringBinary(buf, offset, length)); + } + + for (BlockType blockType : values()) + if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset, + MAGIC_LENGTH) == 0) + return blockType; + + throw new IOException("Invalid HFile block magic: " + + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH)); + } + + public static BlockType read(DataInputStream in) throws IOException { + byte[] buf = new byte[MAGIC_LENGTH]; + in.readFully(buf); + return parse(buf, 0, buf.length); + } + + public static BlockType read(ByteBuffer buf) throws IOException { + BlockType blockType = parse(buf.array(), + buf.arrayOffset() + buf.position(), + Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)); + + // If we got here, we have read exactly MAGIC_LENGTH bytes. + buf.position(buf.position() + MAGIC_LENGTH); + return blockType; + } + + /** + * Put the magic record out to the specified byte array position. + * + * @param bytes the byte array + * @param offset position in the array + * @return incremented offset + */ + public int put(byte[] bytes, int offset) { + System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH); + return offset + MAGIC_LENGTH; + } + + /** + * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given + * stream and expects it to match this block type. + */ + public void readAndCheck(DataInputStream in) throws IOException { + byte[] buf = new byte[MAGIC_LENGTH]; + in.readFully(buf); + if (Bytes.compareTo(buf, magic) != 0) { + throw new IOException("Invalid magic: expected " + + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf)); + } + } + + /** + * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given + * byte buffer and expects it to match this block type. 
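+ * <p>
+ * For example, <code>BlockType.DATA.readAndCheck(buf)</code> consumes the
+ * next {@link #MAGIC_LENGTH} bytes of a hypothetical buffer <code>buf</code>
+ * and throws an <code>IOException</code> unless they equal the "DATABLK*"
+ * magic.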
+ */ + public void readAndCheck(ByteBuffer in) throws IOException { + byte[] buf = new byte[MAGIC_LENGTH]; + in.get(buf); + if (Bytes.compareTo(buf, magic) != 0) { + throw new IOException("Invalid magic: expected " + + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf)); + } + } + + /** + * @return whether this block type is encoded or unencoded data block + */ + public final boolean isData() { + return this == DATA || this == ENCODED_DATA; + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java new file mode 100644 index 0000000..06194c5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Compression related stuff. + * Copied from hadoop-3315 tfile. + */ +@InterfaceAudience.Private +public final class Compression { + static final Log LOG = LogFactory.getLog(Compression.class); + + /** + * Prevent the instantiation of class. + */ + private Compression() { + super(); + } + + static class FinishOnFlushCompressionStream extends FilterOutputStream { + public FinishOnFlushCompressionStream(CompressionOutputStream cout) { + super(cout); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + CompressionOutputStream cout = (CompressionOutputStream) out; + cout.finish(); + cout.flush(); + cout.resetState(); + } + } + + /** + * Returns the classloader to load the Codec class from. 
+ * @return + */ + private static ClassLoader getClassLoaderForCodec() { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = Compression.class.getClassLoader(); + } + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } + if (cl == null) { + throw new RuntimeException("A ClassLoader to load the Codec could not be determined"); + } + return cl; + } + + /** + * Compression algorithms. The ordinal of these cannot change or else you + * risk breaking all existing HFiles out there. Even the ones that are + * not compressed! (They use the NONE algorithm) + */ + public static enum Algorithm { + LZO("lzo") { + // Use base type to avoid compile-time dependencies. + private transient CompressionCodec lzoCodec; + + @Override + CompressionCodec getCodec(Configuration conf) { + if (lzoCodec == null) { + try { + Class externalCodec = + getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec"); + lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, + new Configuration(conf)); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + return lzoCodec; + } + }, + GZ("gz") { + private transient GzipCodec codec; + + @Override + DefaultCodec getCodec(Configuration conf) { + if (codec == null) { + codec = new ReusableStreamGzipCodec(); + codec.setConf(new Configuration(conf)); + } + + return codec; + } + }, + + NONE("none") { + @Override + DefaultCodec getCodec(Configuration conf) { + return null; + } + + @Override + public synchronized InputStream createDecompressionStream( + InputStream downStream, Decompressor decompressor, + int downStreamBufferSize) throws IOException { + if (downStreamBufferSize > 0) { + return new BufferedInputStream(downStream, downStreamBufferSize); + } + // else { + // Make sure we bypass FSInputChecker buffer. + // return new BufferedInputStream(downStream, 1024); + // } + // } + return downStream; + } + + @Override + public synchronized OutputStream createCompressionStream( + OutputStream downStream, Compressor compressor, + int downStreamBufferSize) throws IOException { + if (downStreamBufferSize > 0) { + return new BufferedOutputStream(downStream, downStreamBufferSize); + } + + return downStream; + } + }, + SNAPPY("snappy") { + // Use base type to avoid compile-time dependencies. + private transient CompressionCodec snappyCodec; + + @Override + CompressionCodec getCodec(Configuration conf) { + if (snappyCodec == null) { + try { + Class externalCodec = + getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec"); + snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + return snappyCodec; + } + }, + LZ4("lz4") { + // Use base type to avoid compile-time dependencies. + private transient CompressionCodec lz4Codec; + + @Override + CompressionCodec getCodec(Configuration conf) { + if (lz4Codec == null) { + try { + Class externalCodec = + getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec"); + lz4Codec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + return lz4Codec; + } + }; + + private final Configuration conf; + private final String compressName; + // data input buffer size to absorb small reads from application. 
+ private static final int DATA_IBUF_SIZE = 1 * 1024; + // data output buffer size to absorb small writes from application. + private static final int DATA_OBUF_SIZE = 4 * 1024; + + Algorithm(String name) { + this.conf = new Configuration(); + this.conf.setBoolean("hadoop.native.lib", true); + this.compressName = name; + } + + abstract CompressionCodec getCodec(Configuration conf); + + public InputStream createDecompressionStream( + InputStream downStream, Decompressor decompressor, + int downStreamBufferSize) throws IOException { + CompressionCodec codec = getCodec(conf); + // Set the internal buffer size to read from down stream. + if (downStreamBufferSize > 0) { + ((Configurable)codec).getConf().setInt("io.file.buffer.size", + downStreamBufferSize); + } + CompressionInputStream cis = + codec.createInputStream(downStream, decompressor); + BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); + return bis2; + + } + + public OutputStream createCompressionStream( + OutputStream downStream, Compressor compressor, int downStreamBufferSize) + throws IOException { + OutputStream bos1 = null; + if (downStreamBufferSize > 0) { + bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); + } + else { + bos1 = downStream; + } + CompressionOutputStream cos = + createPlainCompressionStream(bos1, compressor); + BufferedOutputStream bos2 = + new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), + DATA_OBUF_SIZE); + return bos2; + } + + /** + * Creates a compression stream without any additional wrapping into + * buffering streams. + */ + public CompressionOutputStream createPlainCompressionStream( + OutputStream downStream, Compressor compressor) throws IOException { + CompressionCodec codec = getCodec(conf); + ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024); + return codec.createOutputStream(downStream, compressor); + } + + public Compressor getCompressor() { + CompressionCodec codec = getCodec(conf); + if (codec != null) { + Compressor compressor = CodecPool.getCompressor(codec); + if (compressor != null) { + if (compressor.finished()) { + // Somebody returns the compressor to CodecPool but is still using + // it. + LOG + .warn("Compressor obtained from CodecPool is already finished()"); + // throw new AssertionError( + // "Compressor obtained from CodecPool is already finished()"); + } + compressor.reset(); + } + return compressor; + } + return null; + } + + public void returnCompressor(Compressor compressor) { + if (compressor != null) { + CodecPool.returnCompressor(compressor); + } + } + + public Decompressor getDecompressor() { + CompressionCodec codec = getCodec(conf); + if (codec != null) { + Decompressor decompressor = CodecPool.getDecompressor(codec); + if (decompressor != null) { + if (decompressor.finished()) { + // Somebody returns the decompressor to CodecPool but is still using + // it. 
+ LOG + .warn("Deompressor obtained from CodecPool is already finished()"); + // throw new AssertionError( + // "Decompressor obtained from CodecPool is already finished()"); + } + decompressor.reset(); + } + return decompressor; + } + + return null; + } + + public void returnDecompressor(Decompressor decompressor) { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + + public String getName() { + return compressName; + } + } + + public static Algorithm getCompressionAlgorithmByName(String compressName) { + Algorithm[] algos = Algorithm.class.getEnumConstants(); + + for (Algorithm a : algos) { + if (a.getName().equals(compressName)) { + return a; + } + } + + throw new IllegalArgumentException( + "Unsupported compression algorithm name: " + compressName); + } + + static String[] getSupportedAlgorithms() { + Algorithm[] algos = Algorithm.class.getEnumConstants(); + + String[] ret = new String[algos.length]; + int i = 0; + for (Algorithm a : algos) { + ret[i++] = a.getName(); + } + + return ret; + } + + /** + * Decompresses data from the given stream using the configured compression + * algorithm. It will throw an exception if the dest buffer does not have + * enough space to hold the decompressed data. + * + * @param dest + * the output bytes buffer + * @param destOffset + * start writing position of the output buffer + * @param bufferedBoundedStream + * a stream to read compressed data from, bounded to the exact amount + * of compressed data + * @param compressedSize + * compressed data size, header not included + * @param uncompressedSize + * uncompressed data size, header not included + * @param compressAlgo + * compression algorithm used + * @throws IOException + */ + public static void decompress(byte[] dest, int destOffset, + InputStream bufferedBoundedStream, int compressedSize, + int uncompressedSize, Compression.Algorithm compressAlgo) + throws IOException { + + if (dest.length - destOffset < uncompressedSize) { + throw new IllegalArgumentException( + "Output buffer does not have enough space to hold " + + uncompressedSize + " decompressed bytes, available: " + + (dest.length - destOffset)); + } + + Decompressor decompressor = null; + try { + decompressor = compressAlgo.getDecompressor(); + InputStream is = compressAlgo.createDecompressionStream( + bufferedBoundedStream, decompressor, 0); + + IOUtils.readFully(is, dest, destOffset, uncompressedSize); + is.close(); + } finally { + if (decompressor != null) { + compressAlgo.returnDecompressor(decompressor); + } + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java new file mode 100644 index 0000000..7f372c5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.CompressorStream; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; + +/** + * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression + * streams. + */ +@InterfaceAudience.Private +public class ReusableStreamGzipCodec extends GzipCodec { + + private static final Log LOG = LogFactory.getLog(Compression.class); + + /** + * A bridge that wraps around a DeflaterOutputStream to make it a + * CompressionOutputStream. + */ + protected static class ReusableGzipOutputStream extends CompressorStream { + + private static final int GZIP_HEADER_LENGTH = 10; + + /** + * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for + * details. + */ + private static final byte[] GZIP_HEADER; + + static { + // Capture the fixed ten-byte header hard-coded in GZIPOutputStream. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] header = null; + GZIPOutputStream gzipStream = null; + try { + gzipStream = new GZIPOutputStream(baos); + gzipStream.finish(); + header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH); + } catch (IOException e) { + throw new RuntimeException("Could not create gzip stream", e); + } finally { + if (gzipStream != null) { + try { + gzipStream.close(); + } catch (IOException e) { + LOG.error(e); + } + } + } + GZIP_HEADER = header; + } + + private static class ResetableGZIPOutputStream extends GZIPOutputStream { + public ResetableGZIPOutputStream(OutputStream out) throws IOException { + super(out); + } + + public void resetState() throws IOException { + def.reset(); + crc.reset(); + out.write(GZIP_HEADER); + } + } + + public ReusableGzipOutputStream(OutputStream out) throws IOException { + super(new ResetableGZIPOutputStream(out)); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + out.write(data, offset, length); + } + + @Override + public void finish() throws IOException { + ((GZIPOutputStream) out).finish(); + } + + @Override + public void resetState() throws IOException { + ((ResetableGZIPOutputStream) out).resetState(); + } + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + if (ZlibFactory.isNativeZlibLoaded(getConf())) { + return super.createOutputStream(out); + } + return new ReusableGzipOutputStream(out); + } + +} diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java deleted file mode 100644 index c659fb8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.encoding; - -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; -import org.apache.hadoop.io.RawComparator; - -/** - * Encoding of KeyValue. It aims to be fast and efficient using assumptions: - *
- * <ul>
- * <li>the KeyValues are stored sorted by key</li>
- * <li>we know the structure of KeyValue</li>
- * <li>the values are always iterated forward from beginning of block</li>
- * <li>knowledge of Key Value format</li>
- * </ul>
- * It is designed to work fast enough to be feasible as in memory compression. - * - * After encoding, it also optionally compresses the encoded data if a - * compression algorithm is specified in HFileBlockEncodingContext argument of - * {@link #encodeKeyValues(ByteBuffer, boolean, HFileBlockEncodingContext)}. - */ -@InterfaceAudience.Private -public interface DataBlockEncoder { - - /** - * Encodes KeyValues. It will first encode key value pairs, and then - * optionally do the compression for the encoded data. - * - * @param in - * Source of KeyValue for compression. - * @param includesMemstoreTS - * true if including memstore timestamp after every key-value pair - * @param encodingContext - * the encoding context which will contain encoded uncompressed bytes - * as well as compressed encoded bytes if compression is enabled, and - * also it will reuse resources across multiple calls. - * @throws IOException - * If there is an error writing to output stream. - */ - public void encodeKeyValues( - ByteBuffer in, boolean includesMemstoreTS, - HFileBlockEncodingContext encodingContext) throws IOException; - - /** - * Decode. - * @param source Compressed stream of KeyValues. - * @param includesMemstoreTS true if including memstore timestamp after every - * key-value pair - * @return Uncompressed block of KeyValues. - * @throws IOException If there is an error in source. - */ - public ByteBuffer decodeKeyValues(DataInputStream source, - boolean includesMemstoreTS) throws IOException; - - /** - * Uncompress. - * @param source encoded stream of KeyValues. - * @param allocateHeaderLength allocate this many bytes for the header. - * @param skipLastBytes Do not copy n last bytes. - * @param includesMemstoreTS true if including memstore timestamp after every - * key-value pair - * @return Uncompressed block of KeyValues. - * @throws IOException If there is an error in source. - */ - public ByteBuffer decodeKeyValues(DataInputStream source, - int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS) - throws IOException; - - /** - * Return first key in block. Useful for indexing. Typically does not make - * a deep copy but returns a buffer wrapping a segment of the actual block's - * byte array. This is because the first key in block is usually stored - * unencoded. - * @param block encoded block we want index, the position will not change - * @return First key in block. - */ - public ByteBuffer getFirstKeyInBlock(ByteBuffer block); - - /** - * Create a HFileBlock seeker which find KeyValues within a block. - * @param comparator what kind of comparison should be used - * @param includesMemstoreTS true if including memstore timestamp after every - * key-value pair - * @return A newly created seeker. 
- */ - public EncodedSeeker createSeeker(RawComparator comparator, - boolean includesMemstoreTS); - - /** - * Creates a encoder specific encoding context - * - * @param compressionAlgorithm - * compression algorithm used if the final data needs to be - * compressed - * @param encoding - * encoding strategy used - * @param headerBytes - * header bytes to be written, put a dummy header here if the header - * is unknown - * @return a newly created encoding context - */ - public HFileBlockEncodingContext newDataBlockEncodingContext( - Algorithm compressionAlgorithm, DataBlockEncoding encoding, - byte[] headerBytes); - - /** - * Creates an encoder specific decoding context, which will prepare the data - * before actual decoding - * - * @param compressionAlgorithm - * compression algorithm used if the data needs to be decompressed - * @return a newly created decoding context - */ - public HFileBlockDecodingContext newDataBlockDecodingContext( - Algorithm compressionAlgorithm); - - /** - * An interface which enable to seek while underlying data is encoded. - * - * It works on one HFileBlock, but it is reusable. See - * {@link #setCurrentBuffer(ByteBuffer)}. - */ - public static interface EncodedSeeker { - /** - * Set on which buffer there will be done seeking. - * @param buffer Used for seeking. - */ - public void setCurrentBuffer(ByteBuffer buffer); - - /** - * Does a deep copy of the key at the current position. A deep copy is - * necessary because buffers are reused in the decoder. - * @return key at current position - */ - public ByteBuffer getKeyDeepCopy(); - - /** - * Does a shallow copy of the value at the current position. A shallow - * copy is possible because the returned buffer refers to the backing array - * of the original encoded buffer. - * @return value at current position - */ - public ByteBuffer getValueShallowCopy(); - - /** @return key value at current position. */ - public ByteBuffer getKeyValueBuffer(); - - /** - * @return the KeyValue object at the current position. Includes memstore - * timestamp. - */ - public KeyValue getKeyValue(); - - /** Set position to beginning of given block */ - public void rewind(); - - /** - * Move to next position - * @return true on success, false if there is no more positions. - */ - public boolean next(); - - /** - * Moves the seeker position within the current block to: - *
- * <ul>
- * <li>the last key that is less than or equal to the given key if
- * seekBefore is false</li>
- * <li>the last key that is strictly less than the given key if
- * seekBefore is true. The caller is responsible for loading the
- * previous block if the requested key turns out to be the first key of the
- * current block.</li>
- * </ul>
- * @param key byte array containing the key - * @param offset key position the array - * @param length key length in bytes - * @param seekBefore find the key strictly less than the given key in case - * of an exact match. Does not matter in case of an inexact match. - * @return 0 on exact match, 1 on inexact match. - */ - public int seekToKeyInBlock(byte[] key, int offset, int length, - boolean seekBefore); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java deleted file mode 100644 index 79abff3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.encoding; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Provide access to all data block encoding algorithms. All of the algorithms - * are required to have unique id which should NEVER be changed. If you - * want to add a new algorithm/version, assign it a new id. Announce the new id - * in the HBase mailing list to prevent collisions. - */ -@InterfaceAudience.Private -public enum DataBlockEncoding { - - /** Disable data block encoding. */ - NONE(0, null), - // id 1 is reserved for the BITSET algorithm to be added later - PREFIX(2, new PrefixKeyDeltaEncoder()), - DIFF(3, new DiffKeyDeltaEncoder()), - FAST_DIFF(4, new FastDiffDeltaEncoder()); - - private final short id; - private final byte[] idInBytes; - private final DataBlockEncoder encoder; - - public static final int ID_SIZE = Bytes.SIZEOF_SHORT; - - /** Maps data block encoding ids to enum instances. */ - private static Map idToEncoding = - new HashMap(); - - static { - for (DataBlockEncoding algo : values()) { - if (idToEncoding.containsKey(algo.id)) { - throw new RuntimeException(String.format( - "Two data block encoder algorithms '%s' and '%s' have " + - "the same id %d", - idToEncoding.get(algo.id).toString(), algo.toString(), - (int) algo.id)); - } - idToEncoding.put(algo.id, algo); - } - } - - private DataBlockEncoding(int id, DataBlockEncoder encoder) { - if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) { - throw new AssertionError( - "Data block encoding algorithm id is out of range: " + id); - } - this.id = (short) id; - this.idInBytes = Bytes.toBytes(this.id); - if (idInBytes.length != ID_SIZE) { - // White this may seem redundant, if we accidentally serialize - // the id as e.g. 
an int instead of a short, all encoders will break. - throw new RuntimeException("Unexpected length of encoder ID byte " + - "representation: " + Bytes.toStringBinary(idInBytes)); - } - this.encoder = encoder; - } - - /** - * @return name converted to bytes. - */ - public byte[] getNameInBytes() { - return Bytes.toBytes(toString()); - } - - /** - * @return The id of a data block encoder. - */ - public short getId() { - return id; - } - - /** - * Writes id in bytes. - * @param stream where the id should be written. - */ - public void writeIdInBytes(OutputStream stream) throws IOException { - stream.write(idInBytes); - } - - - /** - * Writes id bytes to the given array starting from offset. - * - * @param dest output array - * @param offset starting offset of the output array - * @throws IOException - */ - public void writeIdInBytes(byte[] dest, int offset) throws IOException { - System.arraycopy(idInBytes, 0, dest, offset, ID_SIZE); - } - - /** - * Return new data block encoder for given algorithm type. - * @return data block encoder if algorithm is specified, null if none is - * selected. - */ - public DataBlockEncoder getEncoder() { - return encoder; - } - - /** - * Find and create data block encoder for given id; - * @param encoderId id of data block encoder. - * @return Newly created data block encoder. - */ - public static DataBlockEncoder getDataBlockEncoderById(short encoderId) { - if (!idToEncoding.containsKey(encoderId)) { - throw new IllegalArgumentException(String.format( - "There is no data block encoder for given id '%d'", - (int) encoderId)); - } - - return idToEncoding.get(encoderId).getEncoder(); - } - - /** - * Find and return the name of data block encoder for the given id. - * @param encoderId id of data block encoder - * @return name, same as used in options in column family - */ - public static String getNameFromId(short encoderId) { - return idToEncoding.get(encoderId).toString(); - } - - /** - * Check if given encoder has this id. - * @param encoder encoder which id will be checked - * @param encoderId id which we except - * @return true if id is right for given encoder, false otherwise - * @exception IllegalArgumentException - * thrown when there is no matching data block encoder - */ - public static boolean isCorrectEncoder(DataBlockEncoder encoder, - short encoderId) { - if (!idToEncoding.containsKey(encoderId)) { - throw new IllegalArgumentException(String.format( - "There is no data block encoder for given id '%d'", - (int) encoderId)); - } - - DataBlockEncoding algorithm = idToEncoding.get(encoderId); - return algorithm.getClass().equals(encoder.getClass()); - } - - public static DataBlockEncoding getEncodingById(short dataBlockEncodingId) { - return idToEncoding.get(dataBlockEncodingId); - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java deleted file mode 100644 index 04a115c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.encoding; - -import java.io.IOException; - -import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.io.hfile.HFileBlock; - -/** - * A decoding context that is created by a reader's encoder, and is shared - * across the reader's all read operations. - * - * @see HFileBlockEncodingContext for encoding - */ -public interface HFileBlockDecodingContext { - - /** - * @return the compression algorithm used by this decoding context - */ - public Compression.Algorithm getCompression(); - - /** - * Perform all actions that need to be done before the encoder's real - * decoding process. Decompression needs to be done if - * {@link #getCompression()} returns a valid compression algorithm. - * - * @param block HFile block object - * @param onDiskBlock on disk bytes to be decoded - * @param offset data start offset in onDiskBlock - * @throws IOException - */ - public void prepareDecoding(HFileBlock block, byte[] onDiskBlock, - int offset) throws IOException; - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java index 0a99a43..dee40aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java @@ -44,17 +44,14 @@ public class HFileBlockDefaultDecodingContext implements } @Override - public void prepareDecoding(HFileBlock block, - byte[] onDiskBlock, int offset) throws IOException { - DataInputStream dis = - new DataInputStream(new ByteArrayInputStream( - onDiskBlock, offset, - block.getOnDiskSizeWithoutHeader())); - - ByteBuffer buffer = block.getBufferWithoutHeader(); - Compression.decompress(buffer.array(), buffer.arrayOffset(), - (InputStream) dis, block.getOnDiskSizeWithoutHeader(), - block.getUncompressedSizeWithoutHeader(), compressAlgo); + public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + ByteBuffer blockBufferWithoutHeader, byte[] onDiskBlock, int offset) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(onDiskBlock, offset, + onDiskSizeWithoutHeader)); + + Compression.decompress(blockBufferWithoutHeader.array(), + blockBufferWithoutHeader.arrayOffset(), (InputStream) dis, onDiskSizeWithoutHeader, + uncompressedSizeWithoutHeader, compressAlgo); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java deleted file mode 100644 index 45f2749..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.encoding; - -import java.io.IOException; - -import org.apache.hadoop.hbase.io.hfile.BlockType; -import org.apache.hadoop.hbase.io.hfile.Compression; - -/** - * An encoding context that is created by a writer's encoder, and is shared - * across the writer's whole lifetime. - * - * @see HFileBlockDecodingContext for decoding - * - */ -public interface HFileBlockEncodingContext { - - /** - * @return encoded and compressed bytes with header which are ready to write - * out to disk - */ - public byte[] getOnDiskBytesWithHeader(); - - /** - * @return encoded but not heavily compressed bytes with header which can be - * cached in block cache - */ - public byte[] getUncompressedBytesWithHeader(); - - /** - * @return the block type after encoding - */ - public BlockType getBlockType(); - - /** - * @return the compression algorithm used by this encoding context - */ - public Compression.Algorithm getCompression(); - - /** - * @return the header size used - */ - public int getHeaderSize(); - - /** - * @return the {@link DataBlockEncoding} encoding used - */ - public DataBlockEncoding getDataBlockEncoding(); - - /** - * Do any action that needs to be performed after the encoding. - * Compression is also included if {@link #getCompression()} returns non-null - * compression algorithm - * - * @param blockType - * @throws IOException - */ - public void postEncoding(BlockType blockType) throws IOException; - - /** - * Releases the resources used. - */ - public void close(); - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java deleted file mode 100644 index e3c4fe4..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright 2011 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
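
HFileBlockEncodingContext, shown above, is what a block writer drains after encoding. A hedged sketch of that flow, with the call sequence inferred from the interface rather than taken from the patch:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class EncodingContextSketch {
  // Encodes one block of KeyValues and returns the two byte arrays a writer
  // cares about: what goes to disk and what goes to the block cache.
  static byte[][] encodeBlock(DataBlockEncoder encoder, DataBlockEncoding encoding,
      Compression.Algorithm algo, byte[] dummyHeader, ByteBuffer keyValues,
      boolean includesMemstoreTS) throws IOException {
    // The context is created by the writer's encoder and reused across blocks.
    HFileBlockEncodingContext ctx =
        encoder.newDataBlockEncodingContext(algo, encoding, dummyHeader);
    encoder.encodeKeyValues(keyValues, includesMemstoreTS, ctx);
    return new byte[][] {
        ctx.getOnDiskBytesWithHeader(),       // ready for disk, compressed if a codec was configured
        ctx.getUncompressedBytesWithHeader()  // encoded only, suitable for the block cache
    };
  }
}
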
- */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Various types of {@link HFile} blocks. Ordinal values of these enum constants - * must not be relied upon. The values in the enum appear in the order they - * appear in a version 2 {@link HFile}. - */ -@InterfaceAudience.Private -public enum BlockType { - - // Scanned block section - - /** Data block, both versions */ - DATA("DATABLK*", BlockCategory.DATA), - - /** An encoded data block (e.g. with prefix compression), version 2 */ - ENCODED_DATA("DATABLKE", BlockCategory.DATA) { - @Override - public int getId() { - return DATA.ordinal(); - } - }, - - /** Version 2 leaf index block. Appears in the data block section */ - LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX), - - /** Bloom filter block, version 2 */ - BLOOM_CHUNK("BLMFBLK2", BlockCategory.BLOOM), - - // Non-scanned block section - - /** Meta blocks */ - META("METABLKc", BlockCategory.META), - - /** Intermediate-level version 2 index in the non-data block section */ - INTERMEDIATE_INDEX("IDXINTE2", BlockCategory.INDEX), - - // Load-on-open section. - - /** Root index block, also used for the single-level meta index, version 2 */ - ROOT_INDEX("IDXROOT2", BlockCategory.INDEX), - - /** File info, version 2 */ - FILE_INFO("FILEINF2", BlockCategory.META), - - /** General Bloom filter metadata, version 2 */ - GENERAL_BLOOM_META("BLMFMET2", BlockCategory.BLOOM), - - /** Delete Family Bloom filter metadata, version 2 */ - DELETE_FAMILY_BLOOM_META("DFBLMET2", BlockCategory.BLOOM), - - // Trailer - - /** Fixed file trailer, both versions (always just a magic string) */ - TRAILER("TRABLK\"$", BlockCategory.META), - - // Legacy blocks - - /** Block index magic string in version 1 */ - INDEX_V1("IDXBLK)+", BlockCategory.INDEX); - - public enum BlockCategory { - DATA, META, INDEX, BLOOM, ALL_CATEGORIES, UNKNOWN; - - /** - * Throws an exception if the block category passed is the special category - * meaning "all categories". - */ - public void expectSpecific() { - if (this == ALL_CATEGORIES) { - throw new IllegalArgumentException("Expected a specific block " + - "category but got " + this); - } - } - } - - public static final int MAGIC_LENGTH = 8; - - private final byte[] magic; - private final BlockCategory metricCat; - - private BlockType(String magicStr, BlockCategory metricCat) { - magic = Bytes.toBytes(magicStr); - this.metricCat = metricCat; - assert magic.length == MAGIC_LENGTH; - } - - /** - * Use this instead of {@link #ordinal()}. They work exactly the same, except - * DATA and ENCODED_DATA get the same id using this method (overridden for - * {@link #ENCODED_DATA}). - * @return block type id from 0 to the number of block types - 1 - */ - public int getId() { - // Default implementation, can be overridden for individual enum members. 
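
ENCODED_DATA above is the one constant that overrides getId(), so encoded and unencoded data blocks share an id while keeping distinct 8-byte magic records. A quick illustration, not part of the patch:

import org.apache.hadoop.hbase.io.hfile.BlockType;

public class BlockTypeIdSketch {
  public static void main(String[] args) {
    // Same id: this is why callers are told to use getId() instead of ordinal().
    System.out.println(BlockType.DATA.getId() == BlockType.ENCODED_DATA.getId());     // true
    // Different ordinals (and different on-disk magic records).
    System.out.println(BlockType.DATA.ordinal() == BlockType.ENCODED_DATA.ordinal()); // false
  }
}
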
- return ordinal(); - } - - public void writeToStream(OutputStream out) throws IOException { - out.write(magic); - } - - public void write(DataOutput out) throws IOException { - out.write(magic); - } - - public void write(ByteBuffer buf) { - buf.put(magic); - } - - public BlockCategory getCategory() { - return metricCat; - } - - public static BlockType parse(byte[] buf, int offset, int length) - throws IOException { - if (length != MAGIC_LENGTH) { - throw new IOException("Magic record of invalid length: " - + Bytes.toStringBinary(buf, offset, length)); - } - - for (BlockType blockType : values()) - if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset, - MAGIC_LENGTH) == 0) - return blockType; - - throw new IOException("Invalid HFile block magic: " - + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH)); - } - - public static BlockType read(DataInputStream in) throws IOException { - byte[] buf = new byte[MAGIC_LENGTH]; - in.readFully(buf); - return parse(buf, 0, buf.length); - } - - public static BlockType read(ByteBuffer buf) throws IOException { - BlockType blockType = parse(buf.array(), - buf.arrayOffset() + buf.position(), - Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)); - - // If we got here, we have read exactly MAGIC_LENGTH bytes. - buf.position(buf.position() + MAGIC_LENGTH); - return blockType; - } - - /** - * Put the magic record out to the specified byte array position. - * - * @param bytes the byte array - * @param offset position in the array - * @return incremented offset - */ - public int put(byte[] bytes, int offset) { - System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH); - return offset + MAGIC_LENGTH; - } - - /** - * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given - * stream and expects it to match this block type. - */ - public void readAndCheck(DataInputStream in) throws IOException { - byte[] buf = new byte[MAGIC_LENGTH]; - in.readFully(buf); - if (Bytes.compareTo(buf, magic) != 0) { - throw new IOException("Invalid magic: expected " - + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf)); - } - } - - /** - * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given - * byte buffer and expects it to match this block type. - */ - public void readAndCheck(ByteBuffer in) throws IOException { - byte[] buf = new byte[MAGIC_LENGTH]; - in.get(buf); - if (Bytes.compareTo(buf, magic) != 0) { - throw new IOException("Invalid magic: expected " - + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf)); - } - } - - /** - * @return whether this block type is encoded or unencoded data block - */ - public final boolean isData() { - return this == DATA || this == ENCODED_DATA; - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java deleted file mode 100644 index 616ac6e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
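
The parse/read/readAndCheck helpers above all operate on the fixed 8-byte magic record. A small round trip using only what the enum itself provides, added here for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.BlockType;

public class BlockMagicSketch {
  public static void main(String[] args) throws IOException {
    // Write DATA's 8-byte magic record, then recover the block type from it.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BlockType.DATA.writeToStream(out);
    byte[] magic = out.toByteArray();           // magic.length == BlockType.MAGIC_LENGTH

    System.out.println(BlockType.parse(magic, 0, magic.length)); // DATA

    // readAndCheck consumes MAGIC_LENGTH bytes and throws IOException if they
    // do not match this block type's magic.
    BlockType.DATA.readAndCheck(new DataInputStream(new ByteArrayInputStream(magic)));
  }
}
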
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Compression related stuff. - * Copied from hadoop-3315 tfile. - */ -@InterfaceAudience.Private -public final class Compression { - static final Log LOG = LogFactory.getLog(Compression.class); - - /** - * Prevent the instantiation of class. - */ - private Compression() { - super(); - } - - static class FinishOnFlushCompressionStream extends FilterOutputStream { - public FinishOnFlushCompressionStream(CompressionOutputStream cout) { - super(cout); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - out.write(b, off, len); - } - - @Override - public void flush() throws IOException { - CompressionOutputStream cout = (CompressionOutputStream) out; - cout.finish(); - cout.flush(); - cout.resetState(); - } - } - - /** - * Returns the classloader to load the Codec class from. - * @return - */ - private static ClassLoader getClassLoaderForCodec() { - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - if (cl == null) { - cl = Compression.class.getClassLoader(); - } - if (cl == null) { - cl = ClassLoader.getSystemClassLoader(); - } - if (cl == null) { - throw new RuntimeException("A ClassLoader to load the Codec could not be determined"); - } - return cl; - } - - /** - * Compression algorithms. The ordinal of these cannot change or else you - * risk breaking all existing HFiles out there. Even the ones that are - * not compressed! (They use the NONE algorithm) - */ - public static enum Algorithm { - LZO("lzo") { - // Use base type to avoid compile-time dependencies. 
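
LZO above (and SNAPPY and LZ4 further down) is resolved reflectively through getClassLoaderForCodec() so HBase carries no compile-time dependency on the codec jar. A hypothetical availability probe in the same style, purely illustrative and not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecProbeSketch {
  // Returns true if the named codec class can be loaded and instantiated.
  static boolean isCodecLoadable(String codecClassName, Configuration conf) {
    try {
      ClassLoader cl = Thread.currentThread().getContextClassLoader();
      if (cl == null) {
        cl = CodecProbeSketch.class.getClassLoader();
      }
      Class<?> codecClass = cl.loadClass(codecClassName);
      ReflectionUtils.newInstance(codecClass.asSubclass(CompressionCodec.class), conf);
      return true;
    } catch (Exception e) {
      // ClassNotFoundException or any instantiation failure means "not available".
      return false;
    }
  }
}
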
- private transient CompressionCodec lzoCodec; - - @Override - CompressionCodec getCodec(Configuration conf) { - if (lzoCodec == null) { - try { - Class externalCodec = - getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec"); - lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, - new Configuration(conf)); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - return lzoCodec; - } - }, - GZ("gz") { - private transient GzipCodec codec; - - @Override - DefaultCodec getCodec(Configuration conf) { - if (codec == null) { - codec = new ReusableStreamGzipCodec(); - codec.setConf(new Configuration(conf)); - } - - return codec; - } - }, - - NONE("none") { - @Override - DefaultCodec getCodec(Configuration conf) { - return null; - } - - @Override - public synchronized InputStream createDecompressionStream( - InputStream downStream, Decompressor decompressor, - int downStreamBufferSize) throws IOException { - if (downStreamBufferSize > 0) { - return new BufferedInputStream(downStream, downStreamBufferSize); - } - // else { - // Make sure we bypass FSInputChecker buffer. - // return new BufferedInputStream(downStream, 1024); - // } - // } - return downStream; - } - - @Override - public synchronized OutputStream createCompressionStream( - OutputStream downStream, Compressor compressor, - int downStreamBufferSize) throws IOException { - if (downStreamBufferSize > 0) { - return new BufferedOutputStream(downStream, downStreamBufferSize); - } - - return downStream; - } - }, - SNAPPY("snappy") { - // Use base type to avoid compile-time dependencies. - private transient CompressionCodec snappyCodec; - - @Override - CompressionCodec getCodec(Configuration conf) { - if (snappyCodec == null) { - try { - Class externalCodec = - getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec"); - snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, - conf); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - return snappyCodec; - } - }, - LZ4("lz4") { - // Use base type to avoid compile-time dependencies. - private transient CompressionCodec lz4Codec; - - @Override - CompressionCodec getCodec(Configuration conf) { - if (lz4Codec == null) { - try { - Class externalCodec = - getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec"); - lz4Codec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, - conf); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - return lz4Codec; - } - }; - - private final Configuration conf; - private final String compressName; - // data input buffer size to absorb small reads from application. - private static final int DATA_IBUF_SIZE = 1 * 1024; - // data output buffer size to absorb small writes from application. - private static final int DATA_OBUF_SIZE = 4 * 1024; - - Algorithm(String name) { - this.conf = new Configuration(); - this.conf.setBoolean("hadoop.native.lib", true); - this.compressName = name; - } - - abstract CompressionCodec getCodec(Configuration conf); - - public InputStream createDecompressionStream( - InputStream downStream, Decompressor decompressor, - int downStreamBufferSize) throws IOException { - CompressionCodec codec = getCodec(conf); - // Set the internal buffer size to read from down stream. 
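
NONE above deliberately bypasses the codec machinery: with no extra buffer size requested it hands the original stream straight back. A tiny check of that contract, added for illustration:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class NoneAlgorithmSketch {
  public static void main(String[] args) throws IOException {
    InputStream raw = new ByteArrayInputStream(new byte[] {1, 2, 3});

    // No decompressor is needed and, with a buffer size of 0, no wrapping
    // happens at all: the very same stream comes back.
    InputStream result = Compression.Algorithm.NONE.createDecompressionStream(raw, null, 0);
    System.out.println(result == raw);   // true

    // Asking for a buffer only adds a BufferedInputStream around it.
    InputStream buffered = Compression.Algorithm.NONE.createDecompressionStream(raw, null, 4096);
    System.out.println(buffered == raw); // false
  }
}
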
- if (downStreamBufferSize > 0) { - ((Configurable)codec).getConf().setInt("io.file.buffer.size", - downStreamBufferSize); - } - CompressionInputStream cis = - codec.createInputStream(downStream, decompressor); - BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); - return bis2; - - } - - public OutputStream createCompressionStream( - OutputStream downStream, Compressor compressor, int downStreamBufferSize) - throws IOException { - OutputStream bos1 = null; - if (downStreamBufferSize > 0) { - bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); - } - else { - bos1 = downStream; - } - CompressionOutputStream cos = - createPlainCompressionStream(bos1, compressor); - BufferedOutputStream bos2 = - new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), - DATA_OBUF_SIZE); - return bos2; - } - - /** - * Creates a compression stream without any additional wrapping into - * buffering streams. - */ - public CompressionOutputStream createPlainCompressionStream( - OutputStream downStream, Compressor compressor) throws IOException { - CompressionCodec codec = getCodec(conf); - ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024); - return codec.createOutputStream(downStream, compressor); - } - - public Compressor getCompressor() { - CompressionCodec codec = getCodec(conf); - if (codec != null) { - Compressor compressor = CodecPool.getCompressor(codec); - if (compressor != null) { - if (compressor.finished()) { - // Somebody returns the compressor to CodecPool but is still using - // it. - LOG - .warn("Compressor obtained from CodecPool is already finished()"); - // throw new AssertionError( - // "Compressor obtained from CodecPool is already finished()"); - } - compressor.reset(); - } - return compressor; - } - return null; - } - - public void returnCompressor(Compressor compressor) { - if (compressor != null) { - CodecPool.returnCompressor(compressor); - } - } - - public Decompressor getDecompressor() { - CompressionCodec codec = getCodec(conf); - if (codec != null) { - Decompressor decompressor = CodecPool.getDecompressor(codec); - if (decompressor != null) { - if (decompressor.finished()) { - // Somebody returns the decompressor to CodecPool but is still using - // it. - LOG - .warn("Deompressor obtained from CodecPool is already finished()"); - // throw new AssertionError( - // "Decompressor obtained from CodecPool is already finished()"); - } - decompressor.reset(); - } - return decompressor; - } - - return null; - } - - public void returnDecompressor(Decompressor decompressor) { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - - public String getName() { - return compressName; - } - } - - public static Algorithm getCompressionAlgorithmByName(String compressName) { - Algorithm[] algos = Algorithm.class.getEnumConstants(); - - for (Algorithm a : algos) { - if (a.getName().equals(compressName)) { - return a; - } - } - - throw new IllegalArgumentException( - "Unsupported compression algorithm name: " + compressName); - } - - static String[] getSupportedAlgorithms() { - Algorithm[] algos = Algorithm.class.getEnumConstants(); - - String[] ret = new String[algos.length]; - int i = 0; - for (Algorithm a : algos) { - ret[i++] = a.getName(); - } - - return ret; - } - - /** - * Decompresses data from the given stream using the configured compression - * algorithm. It will throw an exception if the dest buffer does not have - * enough space to hold the decompressed data. 
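
createCompressionStream/createDecompressionStream plus the pooled compressor and decompressor above are the usual pairing. A hedged end-to-end sketch with the gz algorithm; the payload and buffer sizes are arbitrary and the sketch is not taken from the patch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

public class GzipRoundTripSketch {
  public static void main(String[] args) throws IOException {
    Compression.Algorithm gz = Compression.getCompressionAlgorithmByName("gz");
    byte[] original = "hello hbase".getBytes("UTF-8");

    // Compress: the returned stream is buffered and finishes the gzip member
    // on flush() (see FinishOnFlushCompressionStream above).
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    Compressor compressor = gz.getCompressor();   // pooled; hand it back when done
    try {
      OutputStream out = gz.createCompressionStream(compressed, compressor, 0);
      out.write(original);
      out.flush();
    } finally {
      gz.returnCompressor(compressor);
    }

    // Decompress back into a byte array using the pooled decompressor.
    byte[] restored = new byte[original.length];
    Decompressor decompressor = gz.getDecompressor();
    try {
      InputStream in = gz.createDecompressionStream(
          new ByteArrayInputStream(compressed.toByteArray()), decompressor, 0);
      IOUtils.readFully(in, restored, 0, restored.length);
    } finally {
      gz.returnDecompressor(decompressor);
    }
    System.out.println(new String(restored, "UTF-8")); // hello hbase
  }
}
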
- * - * @param dest - * the output bytes buffer - * @param destOffset - * start writing position of the output buffer - * @param bufferedBoundedStream - * a stream to read compressed data from, bounded to the exact amount - * of compressed data - * @param compressedSize - * compressed data size, header not included - * @param uncompressedSize - * uncompressed data size, header not included - * @param compressAlgo - * compression algorithm used - * @throws IOException - */ - public static void decompress(byte[] dest, int destOffset, - InputStream bufferedBoundedStream, int compressedSize, - int uncompressedSize, Compression.Algorithm compressAlgo) - throws IOException { - - if (dest.length - destOffset < uncompressedSize) { - throw new IllegalArgumentException( - "Output buffer does not have enough space to hold " - + uncompressedSize + " decompressed bytes, available: " - + (dest.length - destOffset)); - } - - Decompressor decompressor = null; - try { - decompressor = compressAlgo.getDecompressor(); - InputStream is = compressAlgo.createDecompressionStream( - bufferedBoundedStream, decompressor, 0); - - IOUtils.readFully(is, dest, destOffset, uncompressedSize); - is.close(); - } finally { - if (decompressor != null) { - compressAlgo.returnDecompressor(decompressor); - } - } - } - -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index cb1ee66..c93c8d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1728,9 +1728,13 @@ public class HFileBlock extends SchemaConfigured implements Cacheable { // This will allocate a new buffer but keep header bytes. b.allocateBuffer(nextBlockOnDiskSize > 0); if (b.blockType.equals(BlockType.ENCODED_DATA)) { - encodedBlockDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize); + encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), + b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, + hdrSize); } else { - defaultDecodingCtx.prepareDecoding(b, onDiskBlock, hdrSize); + defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), + b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, + hdrSize); } if (nextBlockOnDiskSize > 0) { // Copy next block's header bytes into the new block if we have them. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java deleted file mode 100644 index 7f372c5..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReusableStreamGzipCodec.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
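
The static decompress(...) helper above is what the refactored HFileBlockDefaultDecodingContext.prepareDecoding ultimately calls. A minimal wrapper showing the expected arguments; the input stream must be bounded to exactly the compressed payload:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class DecompressHelperSketch {
  // Illustrative only: inflate a compressed payload into a freshly sized
  // destination array. The caller supplies both sizes, exactly as the
  // decoding context does for a block read from disk.
  static byte[] inflate(byte[] compressed, int compressedSize,
      int uncompressedSize, Compression.Algorithm algo) throws IOException {
    byte[] dest = new byte[uncompressedSize];
    Compression.decompress(dest, 0,
        new ByteArrayInputStream(compressed, 0, compressedSize),
        compressedSize, uncompressedSize, algo);
    return dest;
  }
}
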
See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.zip.GZIPOutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.CompressorStream; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.zlib.ZlibFactory; - -/** - * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression - * streams. - */ -@InterfaceAudience.Private -public class ReusableStreamGzipCodec extends GzipCodec { - - private static final Log LOG = LogFactory.getLog(Compression.class); - - /** - * A bridge that wraps around a DeflaterOutputStream to make it a - * CompressionOutputStream. - */ - protected static class ReusableGzipOutputStream extends CompressorStream { - - private static final int GZIP_HEADER_LENGTH = 10; - - /** - * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for - * details. - */ - private static final byte[] GZIP_HEADER; - - static { - // Capture the fixed ten-byte header hard-coded in GZIPOutputStream. - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] header = null; - GZIPOutputStream gzipStream = null; - try { - gzipStream = new GZIPOutputStream(baos); - gzipStream.finish(); - header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH); - } catch (IOException e) { - throw new RuntimeException("Could not create gzip stream", e); - } finally { - if (gzipStream != null) { - try { - gzipStream.close(); - } catch (IOException e) { - LOG.error(e); - } - } - } - GZIP_HEADER = header; - } - - private static class ResetableGZIPOutputStream extends GZIPOutputStream { - public ResetableGZIPOutputStream(OutputStream out) throws IOException { - super(out); - } - - public void resetState() throws IOException { - def.reset(); - crc.reset(); - out.write(GZIP_HEADER); - } - } - - public ReusableGzipOutputStream(OutputStream out) throws IOException { - super(new ResetableGZIPOutputStream(out)); - } - - @Override - public void close() throws IOException { - out.close(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte[] data, int offset, int length) throws IOException { - out.write(data, offset, length); - } - - @Override - public void finish() throws IOException { - ((GZIPOutputStream) out).finish(); - } - - @Override - public void resetState() throws IOException { - ((ResetableGZIPOutputStream) out).resetState(); - } - } - - @Override - public CompressionOutputStream createOutputStream(OutputStream out) - throws IOException { - if (ZlibFactory.isNativeZlibLoaded(getConf())) { - return super.createOutputStream(out); - } - return new ReusableGzipOutputStream(out); - } - -}
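
The point of ReusableStreamGzipCodec above is that resetState() lets one stream object emit several independent gzip members instead of being reallocated per block. A small sketch against the public surface shown above, assuming nothing beyond it:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.ReusableStreamGzipCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class ReusableGzipSketch {
  public static void main(String[] args) throws IOException {
    ReusableStreamGzipCodec codec = new ReusableStreamGzipCodec();
    codec.setConf(new Configuration());

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(sink);

    // First gzip member.
    out.write("block one".getBytes("UTF-8"));
    out.finish();

    // resetState() rewinds the deflater and CRC and writes a fresh gzip
    // header, so the same stream can emit a second, independent member.
    out.resetState();
    out.write("block two".getBytes("UTF-8"));
    out.finish();
  }
}
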