Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java (working copy) @@ -106,7 +106,7 @@ DataInputStream dis = new DataInputStream(bais); ByteBuffer actualDataset; DataBlockEncoder encoder = encoding.getEncoder(); - actualDataset = encoder.decodeKeyValues(dis, includesMemstoreTS); + actualDataset = encoder.decodeKeyValues(dis, 0, includesMemstoreTS, dis.available()); dataset.rewind(); actualDataset.rewind(); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java (working copy) @@ -115,8 +115,8 @@ dos.writeInt(i / 100); } - static int writeTestKeyValues(OutputStream dos, int seed, boolean includesMemstoreTS) - throws IOException { + static void writeTestKeyValues(Algorithm algo, OutputStream dos, int seed, + boolean includesMemstoreTS) throws IOException { List keyValues = new ArrayList(); Random randomizer = new Random(42l + seed); // just any fixed number @@ -165,20 +165,23 @@ } // sort it and write to stream - int totalSize = 0; Collections.sort(keyValues, KeyValue.COMPARATOR); DataOutputStream dataOutputStream = new DataOutputStream(dos); + HFileBlock.Writer hbw = new HFileBlock.Writer(algo, null, + includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + HFile.DEFAULT_BYTES_PER_CHECKSUM); + for (KeyValue kv : keyValues) { - totalSize += kv.getLength(); + long memstoreTS = randomizer.nextLong(); + hbw.appendEncodedKV(memstoreTS, kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), + kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + + // Write raw key/value pair for validation. 
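+ // The same cell is also appended through the block writer above (appendEncodedKV),
+ // so the test can later compare the decoded block contents against these
+ // raw, unencoded bytes.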
dataOutputStream.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); if (includesMemstoreTS) { - long memstoreTS = randomizer.nextLong(); WritableUtils.writeVLong(dataOutputStream, memstoreTS); - totalSize += WritableUtils.getVIntSize(memstoreTS); } } - - return totalSize; } public byte[] createTestV1Block(Compression.Algorithm algo) @@ -304,9 +307,11 @@ HFile.DEFAULT_BYTES_PER_CHECKSUM); long totalSize = 0; for (int blockId = 0; blockId < 2; ++blockId) { - DataOutputStream dos = hbw.startWriting(BlockType.DATA); - for (int i = 0; i < 1234; ++i) + hbw.startWriting(BlockType.DATA); + DataOutputStream dos = hbw.getUserDataStreamUnsafe(); + for (int i = 0; i < 1234; ++i) { dos.writeInt(i); + } hbw.writeHeaderAndData(os); totalSize += hbw.getOnDiskSizeWithHeader(); } @@ -377,6 +382,9 @@ blockId, includesMemstoreTS, HFileBlock.DUMMY_HEADER); hbw.writeHeaderAndData(os); totalSize += hbw.getOnDiskSizeWithHeader(); + LOG.info("Wrote block #" + blockId + ": " + + "onDiskSizeWithHeader=" + hbw.getOnDiskSizeWithHeader() + ", " + + "uncompressedSizeWithHeader=" + hbw.getUncompressedSizeWithHeader()); } os.close(); @@ -424,7 +432,7 @@ ByteArrayOutputStream baos = new ByteArrayOutputStream(); DoubleOutputStream doubleOutputStream = new DoubleOutputStream(dos, baos); - writeTestKeyValues(doubleOutputStream, blockId, includesMemstoreTS); + writeTestKeyValues(algo, doubleOutputStream, blockId, includesMemstoreTS); ByteBuffer rawBuf = ByteBuffer.wrap(baos.toByteArray()); rawBuf.rewind(); @@ -477,27 +485,16 @@ expectedBuffer.get(prefix) == actualBuffer.get(prefix)) { prefix++; } - - fail(String.format( - "Content mismath for compression %s, encoding %s, " + - "pread %s, commonPrefix %d, expected %s, got %s", + assertEquals(String.format( + "Content mismatch for compression %s, encoding %s, " + + "pread %s, commonPrefix %d, expected length %d, actual length %d", compression, encoding, pread, prefix, - nextBytesToStr(expectedBuffer, prefix), - nextBytesToStr(actualBuffer, prefix))); + expectedBuffer.limit(), actualBuffer.limit()), + Bytes.toStringBinary(expectedBuffer), + Bytes.toStringBinary(actualBuffer)); } } - /** - * Convert a few next bytes in the given buffer at the given position to - * string. Used for error messages. - */ - private static String nextBytesToStr(ByteBuffer buf, int pos) { - int maxBytes = buf.limit() - pos; - int numBytes = Math.min(16, maxBytes); - return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, - numBytes) + (numBytes < maxBytes ? "..." : ""); - } - @Test public void testPreviousOffset() throws IOException { for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { @@ -735,7 +732,8 @@ blockTypeOrdinal = BlockType.DATA.ordinal(); } BlockType bt = BlockType.values()[blockTypeOrdinal]; - DataOutputStream dos = hbw.startWriting(bt); + hbw.startWriting(bt); + DataOutputStream dos = hbw.getUserDataStreamUnsafe(); for (int j = 0; j < rand.nextInt(500); ++j) { // This might compress well. dos.writeShort(i + 1); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestIncrementalEncoding.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestIncrementalEncoding.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestIncrementalEncoding.java (revision 0) @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +public class TestIncrementalEncoding { + + private static final Log LOG = LogFactory.getLog(TestIncrementalEncoding.class); + + private static final int BLOCK_SIZE = 1024; + private static final int KVTYPES = 4; + private static final byte[] FAMILY = Bytes.toBytes("family"); + + public void testEncoding(DataBlockEncoding dataEncoding, boolean includeMemstoreTS, + int kvType) throws IOException { + LOG.info("encoding=" + dataEncoding + ", includeMemstoreTS=" + includeMemstoreTS + ", " + + "kvType=" + kvType); + Compression.Algorithm algo = Compression.Algorithm.GZ; + HFileDataBlockEncoder blockEncoder = new HFileDataBlockEncoderImpl(dataEncoding); + HFileBlock.Writer writerEncoded = new HFileBlock.Writer(algo, blockEncoder, + includeMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, HFile.DEFAULT_BYTES_PER_CHECKSUM); + HFileBlock.Writer writerUnencoded = new HFileBlock.Writer(algo, + NoOpDataBlockEncoder.INSTANCE, includeMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE, + HFile.DEFAULT_BYTES_PER_CHECKSUM); + writerEncoded.startWriting(BlockType.DATA); + writerUnencoded.startWriting(BlockType.DATA); + + // Fill block with data + long time = 1 << 10; + while (writerEncoded.blockSizeWritten() < BLOCK_SIZE) { + KeyValue kv; + switch (kvType) { + case 3: + kv = new KeyValue(Bytes.toBytes(time), FAMILY, + Bytes.toBytes(time), time, Bytes.toBytes(time)); + break; + case 2: + kv = new KeyValue(Bytes.toBytes("row"), FAMILY, + Bytes.toBytes("qf" + time), 0, Bytes.toBytes("V")); + break; + case 1: + kv = new KeyValue(Bytes.toBytes("row"), FAMILY, + Bytes.toBytes("qf" + time), time, Bytes.toBytes("V" + time)); + break; + default: + kv = new KeyValue(Bytes.toBytes("row" + time), FAMILY, + Bytes.toBytes("qf"), 0, Bytes.toBytes("Value")); + } + time++; + appendEncoded(kv, writerEncoded); + appendEncoded(kv, writerUnencoded); + } + + ByteArrayOutputStream encoded = new ByteArrayOutputStream(); + writerEncoded.writeHeaderAndData(new FSDataOutputStream(encoded)); + + ByteArrayOutputStream unencoded = new ByteArrayOutputStream(); + writerUnencoded.writeHeaderAndData(new FSDataOutputStream(unencoded)); + + ByteArrayOutputStream encodedAgain = new ByteArrayOutputStream(); + DataOutputStream dataOut = 
new DataOutputStream(encodedAgain); + int bytesToSkip = HFileBlock.HEADER_SIZE; + ByteBuffer unencodedWithoutHeader = ByteBuffer.wrap(unencoded.toByteArray(), bytesToSkip, + unencoded.size() - bytesToSkip).slice(); + HFileBlockEncodingContext encodingCtx = + dataEncoding.getEncoder().newDataBlockEncodingContext(algo, dataEncoding, + HFileBlock.DUMMY_HEADER); + + dataEncoding.getEncoder().encodeKeyValues( + unencodedWithoutHeader, includeMemstoreTS, encodingCtx); + + assertEquals(encodedAgain.size() + HFileBlock.HEADER_SIZE + + dataEncoding.encodingIdSize(), encoded.size()); + + byte[] en = encoded.toByteArray(); + byte[] en2 = encodedAgain.toByteArray(); + int shift = HFileBlock.HEADER_SIZE + dataEncoding.encodingIdSize(); + for (int i = 0; i < encodedAgain.size(); i++) { + assertEquals("Byte" + i, en2[i], en[i + shift]); + } + } + + private void testOneEncodingWithAllKVTypes(DataBlockEncoding blockEncoding, + boolean includeMemstoreTS) throws IOException { + for (int i = 0; i < KVTYPES; i++) { + testEncoding(blockEncoding, includeMemstoreTS, i); + } + } + + @Test + public void testAllEncodings() throws IOException { + for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + for (boolean includeMemstoreTS : HConstants.BOOLEAN_VALUES) { + testOneEncodingWithAllKVTypes(encoding, includeMemstoreTS); + } + } + } + + public void appendEncoded(final KeyValue kv, HFileBlock.Writer writer) + throws IOException { + writer.appendEncodedKV(kv.getMemstoreTS(), kv.getBuffer(), + kv.getKeyOffset(), kv.getKeyLength(), kv.getBuffer(), + kv.getValueOffset(), kv.getValueLength()); + } +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (working copy) @@ -261,10 +261,19 @@ String countByType = blockCountByType.toString(); BlockType cachedDataBlockType = encoderType.encodeInCache ? BlockType.ENCODED_DATA : BlockType.DATA; - assertEquals("{" + cachedDataBlockType - + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", - countByType); + // Block size depends on whether encoding on disk has been enabled + // so number of blocks depends on this parameter as well. 
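+ // With PREFIX encoding on disk each key/value takes less space, so more of
+ // them fit into a block before the block size limit is reached and fewer
+ // data and index blocks end up being written.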
+ if (encoder.getEncodingOnDisk() == DataBlockEncoding.PREFIX) { + assertEquals("{" + cachedDataBlockType + + "=965, LEAF_INDEX=121, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=17}", + countByType); + } else { + assertEquals("{" + cachedDataBlockType + + "=1379, LEAF_INDEX=173, BLOOM_CHUNK=9, INTERMEDIATE_INDEX=24}", + countByType); + } + reader.close(); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java (working copy) @@ -171,14 +171,8 @@ ByteBuffer val = scanner.getValue(); String keyStr = String.format(localFormatter, Integer.valueOf(i)); String valStr = value + keyStr; - byte [] keyBytes = Bytes.toBytes(key); - assertTrue("bytes for keys do not match " + keyStr + " " + - Bytes.toString(Bytes.toBytes(key)), - Arrays.equals(Bytes.toBytes(keyStr), keyBytes)); - byte [] valBytes = Bytes.toBytes(val); - assertTrue("bytes for vals do not match " + valStr + " " + - Bytes.toString(valBytes), - Arrays.equals(Bytes.toBytes(valStr), valBytes)); + assertEquals("bytes for keys do not match", keyStr, Bytes.toStringBinaryRemaining(key)); + assertEquals("bytes for vals do not match", valStr, Bytes.toStringBinaryRemaining(val)); if (!scanner.next()) { break; } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java (working copy) @@ -222,7 +222,8 @@ new HFileBlockIndex.BlockIndexWriter(hbw, null, null); for (int i = 0; i < NUM_DATA_BLOCKS; ++i) { - hbw.startWriting(BlockType.DATA).write( + hbw.startWriting(BlockType.DATA); + hbw.getUserDataStreamUnsafe().write( String.valueOf(rand.nextInt(1000)).getBytes()); long blockOffset = outputStream.getPos(); hbw.writeHeaderAndData(outputStream); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java (revision 1402279) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java (working copy) @@ -262,23 +262,6 @@ assertFalse(ByteBufferUtils.arePartsEqual(buffer, 0, 3, 6, 3)); } - /** - * Test serializing int to bytes - */ - @Test - public void testPutInt() { - testPutInt(0); - testPutInt(Integer.MAX_VALUE); - - for (int i = 0; i < 3; i++) { - testPutInt((128 << i) - 1); - } - - for (int i = 0; i < 3; i++) { - testPutInt((128 << i)); - } - } - // Utility methods invoked from test methods private void testCompressedInt(int value) throws IOException { @@ -294,20 +277,4 @@ assertEquals(value, parsedValue); } - private void testPutInt(int value) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - ByteBufferUtils.putInt(baos, value); - } catch (IOException e) { - throw new RuntimeException("Bug in putIn()", e); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); - try { - assertEquals(dis.readInt(), value); - } catch (IOException e) { - throw new RuntimeException("Bug in test!", e); - } - } } Index: 
hbase-server/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java (working copy) @@ -124,16 +124,24 @@ } /** - * Put in output stream 32 bit integer (Big Endian byte order). + * Write a big-endian integer at a specific position in the output array * @param out Where to put integer. + * @param offset Offset in array. + * @param length Length of the part of the array we are allowed to write into, starting with the + * specified offset. * @param value Value of integer. - * @throws IOException On stream error. + * @throws IOException When insufficient size to write. */ - public static void putInt(OutputStream out, final int value) - throws IOException { - for (int i = Bytes.SIZEOF_INT - 1; i >= 0; --i) { - out.write((byte) (value >>> (i * 8))); + public static void putInt(byte[] out, final int offset, final int length, + final int v) throws IOException { + if (length < Bytes.SIZEOF_INT) { + throw new IOException("Not enough space to write an int: offset=" + offset + ", length=" + + length); } + out[offset] = (byte) ((v >>> 24) & 0xFF); + out[offset + 1] = (byte) ((v >>> 16) & 0xFF); + out[offset + 2] = (byte) ((v >>> 8) & 0xFF); + out[offset + 3] = (byte) (v & 0xFF); } /** @@ -160,8 +168,13 @@ public static void copyBufferToStream(OutputStream out, ByteBuffer in, int offset, int length) throws IOException { if (in.hasArray()) { - out.write(in.array(), in.arrayOffset() + offset, - length); + try { + out.write(in.array(), in.arrayOffset() + offset, + length); + } catch (IndexOutOfBoundsException ex) { + throw new IOException("Array out of bounds: arrayOffset=" + in.arrayOffset() + ", " + + "offset=" + offset + ", length=" + in.array().length, ex); + } } else { for (int i = 0; i < length; ++i) { out.write(in.get(offset + i)); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (working copy) @@ -19,7 +19,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; @@ -34,7 +33,7 @@ * Compress using: * - store size of common prefix * - save column family once in the first KeyValue - * - use integer compression for key, value and prefix (7-bit encoding) + * - use integer compression for key, value and prefix lengths (7-bit encoding) * - use bits to avoid duplication key length, value length * and type if it same as previous * - store in 3 bits length of prefix timestamp @@ -55,14 +54,14 @@ */ @InterfaceAudience.Private public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { - final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2); - final int SHIFT_TIMESTAMP_LENGTH = 0; - final int FLAG_SAME_KEY_LENGTH = 1 << 3; - final int FLAG_SAME_VALUE_LENGTH = 1 << 4; - final int FLAG_SAME_TYPE = 1 << 5; - final int FLAG_SAME_VALUE = 1 << 6; + final static int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2); + final static int SHIFT_TIMESTAMP_LENGTH = 0; + final 
static int FLAG_SAME_KEY_LENGTH = 1 << 3; + final static int FLAG_SAME_VALUE_LENGTH = 1 << 4; + final static int FLAG_SAME_TYPE = 1 << 5; + final static int FLAG_SAME_VALUE = 1 << 6; - private static class FastDiffCompressionState extends CompressionState { + private static class FastDiffEncodingState extends EncodingState { byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE]; int prevTimestampOffset; @@ -72,9 +71,9 @@ } @Override - void copyFrom(CompressionState state) { + void copyFrom(EncodingState state) { super.copyFrom(state); - FastDiffCompressionState state2 = (FastDiffCompressionState) state; + FastDiffEncodingState state2 = (FastDiffEncodingState) state; System.arraycopy(state2.timestamp, 0, timestamp, 0, KeyValue.TIMESTAMP_SIZE); prevTimestampOffset = state2.prevTimestampOffset; @@ -84,7 +83,7 @@ * Copies the first key/value from the given stream, and initializes * decompression state based on it. Assumes that we have already read key * and value lengths. Does not set {@link #qualifierLength} (not used by - * decompression) or {@link #prevOffset} (set by the calle afterwards). + * decompression) or {@link #keyOffset} (set by the caller afterwards). */ private void decompressFirstKV(ByteBuffer out, DataInputStream in) throws IOException { @@ -102,111 +101,8 @@ } - private void compressSingleKeyValue( - FastDiffCompressionState previousState, - FastDiffCompressionState currentState, - OutputStream out, ByteBuffer in) throws IOException { - currentState.prevOffset = in.position(); - int keyLength = in.getInt(); - int valueOffset = - currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET; - int valueLength = in.getInt(); - byte flag = 0; - - if (previousState.isFirst()) { - // copy the key, there is no common prefix with none - out.write(flag); - ByteBufferUtils.putCompressedInt(out, keyLength); - ByteBufferUtils.putCompressedInt(out, valueLength); - ByteBufferUtils.putCompressedInt(out, 0); - - currentState.readKey(in, keyLength, valueLength); - - ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength); - } else { - // find a common prefix and skip it - int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(), - previousState.prevOffset + KeyValue.ROW_OFFSET, - Math.min(keyLength, previousState.keyLength) - - KeyValue.TIMESTAMP_TYPE_SIZE); - - currentState.readKey(in, keyLength, valueLength, - commonPrefix, previousState); - - if (keyLength == previousState.keyLength) { - flag |= FLAG_SAME_KEY_LENGTH; - } - if (valueLength == previousState.valueLength) { - flag |= FLAG_SAME_VALUE_LENGTH; - } - if (currentState.type == previousState.type) { - flag |= FLAG_SAME_TYPE; - } - - int commonTimestampPrefix = findCommonTimestampPrefix( - currentState, previousState); - flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH; - - // Check if current and previous values are the same. Compare value - // length first as an optimization. 
- if (valueLength == previousState.valueLength) { - int previousValueOffset = previousState.prevOffset - + previousState.keyLength + KeyValue.ROW_OFFSET; - if (ByteBufferUtils.arePartsEqual(in, - previousValueOffset, previousState.valueLength, - valueOffset, valueLength)) { - flag |= FLAG_SAME_VALUE; - } - } - - out.write(flag); - if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { - ByteBufferUtils.putCompressedInt(out, keyLength); - } - if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { - ByteBufferUtils.putCompressedInt(out, valueLength); - } - ByteBufferUtils.putCompressedInt(out, commonPrefix); - - ByteBufferUtils.skip(in, commonPrefix); - if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { - // Previous and current rows are different. Copy the differing part of - // the row, skip the column family, and copy the qualifier. - ByteBufferUtils.moveBufferToStream(out, in, - currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix); - ByteBufferUtils.skip(in, currentState.familyLength + - KeyValue.FAMILY_LENGTH_SIZE); - ByteBufferUtils.moveBufferToStream(out, in, - currentState.qualifierLength); - } else { - // The common part includes the whole row. As the column family is the - // same across the whole file, it will automatically be included in the - // common prefix, so we need not special-case it here. - int restKeyLength = keyLength - commonPrefix - - KeyValue.TIMESTAMP_TYPE_SIZE; - ByteBufferUtils.moveBufferToStream(out, in, restKeyLength); - } - ByteBufferUtils.skip(in, commonTimestampPrefix); - ByteBufferUtils.moveBufferToStream(out, in, - KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix); - - // Write the type if it is not the same as before. - if ((flag & FLAG_SAME_TYPE) == 0) { - out.write(currentState.type); - } - - // Write the value if it is not the same as before. - if ((flag & FLAG_SAME_VALUE) == 0) { - ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength); - } - - // Skip key type and value in the input buffer. 
- ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength); - } - } - - private int findCommonTimestampPrefix(FastDiffCompressionState left, - FastDiffCompressionState right) { + private static int findCommonTimestampPrefix(FastDiffEncodingState left, + FastDiffEncodingState right) { int prefixTimestamp = 0; while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) && left.timestamp[prefixTimestamp] @@ -217,7 +113,7 @@ } private void uncompressSingleKeyValue(DataInputStream source, - ByteBuffer out, FastDiffCompressionState state) + ByteBuffer out, FastDiffEncodingState state) throws IOException, EncoderBufferTooSmallException { byte flag = source.readByte(); int prevKeyLength = state.keyLength; @@ -243,15 +139,15 @@ if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { out.putInt(state.keyLength); out.putInt(state.valueLength); - prevOffset = state.prevOffset + KeyValue.ROW_OFFSET; + prevOffset = state.keyOffset + KeyValue.ROW_OFFSET; common = commonLength; } else { if ((flag & FLAG_SAME_KEY_LENGTH) != 0) { - prevOffset = state.prevOffset; + prevOffset = state.keyOffset; common = commonLength + KeyValue.ROW_OFFSET; } else { out.putInt(state.keyLength); - prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE; + prevOffset = state.keyOffset + KeyValue.KEY_LENGTH_SIZE; common = commonLength + KeyValue.KEY_LENGTH_SIZE; } } @@ -286,7 +182,7 @@ // copy the column family ByteBufferUtils.copyFromBufferToBuffer(out, out, - state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + state.keyOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + state.rowLength, state.familyLength + KeyValue.FAMILY_LENGTH_SIZE); state.rowLength = (short) (rowWithSizeLength - @@ -316,7 +212,7 @@ if ((flag & FLAG_SAME_TYPE) != 0) { out.put(state.type); if ((flag & FLAG_SAME_VALUE) != 0) { - ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset + + ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); } else { ByteBufferUtils.copyFromStreamToBuffer(out, source, @@ -326,7 +222,7 @@ if ((flag & FLAG_SAME_VALUE) != 0) { ByteBufferUtils.copyFromStreamToBuffer(out, source, KeyValue.TYPE_SIZE); - ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset + + ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength); } else { ByteBufferUtils.copyFromStreamToBuffer(out, source, @@ -339,44 +235,31 @@ state.decompressFirstKV(out, source); } - state.prevOffset = kvPos; + state.keyOffset = kvPos; } @Override public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { - in.rewind(); - ByteBufferUtils.putInt(out, in.limit()); - FastDiffCompressionState previousState = new FastDiffCompressionState(); - FastDiffCompressionState currentState = new FastDiffCompressionState(); - while (in.hasRemaining()) { - compressSingleKeyValue(previousState, currentState, - out, in); - afterEncodingKeyValue(in, out, includesMemstoreTS); - - // swap previousState <-> currentState - FastDiffCompressionState tmp = previousState; - previousState = currentState; - currentState = tmp; - } + encodeKeyValues(out, in, includesMemstoreTS); } @Override public ByteBuffer decodeKeyValues(DataInputStream source, - int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize) throws IOException { + int skipLastBytes = source.available() - 
totalEncodedSize; int decompressedSize = source.readInt(); - ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + - allocHeaderLength); + ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength); buffer.position(allocHeaderLength); - FastDiffCompressionState state = new FastDiffCompressionState(); + FastDiffEncodingState state = new FastDiffEncodingState(); while (source.available() > skipLastBytes) { uncompressSingleKeyValue(source, buffer, state); afterDecodingKeyValue(source, buffer, includesMemstoreTS); } if (source.available() != skipLastBytes) { - throw new IllegalStateException("Read too much bytes."); + throw new IllegalStateException("Read too many bytes."); } return buffer; @@ -395,10 +278,118 @@ } @Override - public String toString() { - return FastDiffDeltaEncoder.class.getSimpleName(); + public FastDiffDeltaEncoderWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + return new FastDiffDeltaEncoderWriter(out, includesMemstoreTS); } + /** + * A writer that incrementally performs Fast Diff Delta Encoding + */ + private static class FastDiffDeltaEncoderWriter + extends BufferedEncodedWriter { + public FastDiffDeltaEncoderWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + super(out, includesMemstoreTS); + } + + @Override + FastDiffEncodingState createState() { + return new FastDiffEncodingState(); + } + + @Override + protected void updateInitial(byte[] key, int keyOffset, int keyLength, + byte[] value, int valueOffset, int valueLength) throws IOException { + byte flag = 0; + ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength); + + if (this.prevState == null) { + // The first element in the stream + out.write(flag); + ByteBufferUtils.putCompressedInt(out, keyLength); + ByteBufferUtils.putCompressedInt(out, valueLength); + ByteBufferUtils.putCompressedInt(out, 0); + out.write(key, keyOffset, keyLength); + out.write(value, valueOffset, valueLength); + + // Initialize the compression state + currentState.readKey(keyBuffer, keyLength, valueLength); + } else { + // Find the common prefix to skip + int commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength - + KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key, + this.prevState.keyOffset, this.prevState.keyLength - + KeyValue.TIMESTAMP_TYPE_SIZE); + + currentState.readKey(keyBuffer, keyLength, valueLength, commonPrefix, + this.prevState); + + if (keyLength == this.prevState.keyLength) { + flag |= FLAG_SAME_KEY_LENGTH; + } + if (valueLength == this.prevState.valueLength) { + flag |= FLAG_SAME_VALUE_LENGTH; + } + if (currentState.type == this.prevState.type) { + flag |= FLAG_SAME_TYPE; + } + + int commonTimestampPrefix = findCommonTimestampPrefix(currentState, this.prevState); + flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH; + + // Check if current and previous values are the same. Compare value + // length first as an optimization. 
+ if (valueLength == this.prevState.valueLength) { + if (valueLength == getCommonPrefixLength(value, valueOffset, valueLength, + this.prevState.value, this.prevState.valueLength, + this.prevState.valueLength)) { + // if common prefix consists of whole value length + flag |= FLAG_SAME_VALUE; + } + } + + out.write(flag); + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, keyLength); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(out, valueLength); + } + ByteBufferUtils.putCompressedInt(out, commonPrefix); + + if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { + // Previous and current rows are different. Copy the differing part + // of the row, skip the column family, and copy the qualifier. + out.write(key, keyOffset + commonPrefix, currentState.rowLength + + KeyValue.ROW_LENGTH_SIZE - commonPrefix); + out.write(key, keyOffset + currentState.familyLength + + KeyValue.FAMILY_LENGTH_SIZE + currentState.rowLength + + KeyValue.ROW_LENGTH_SIZE, currentState.qualifierLength); + } else { + // The common part includes the whole row. As the column family is + // the same across the whole file, it will automatically be included + // in the common prefix, so we need not special-case it here. + out.write(key, keyOffset + commonPrefix, keyLength - commonPrefix - + KeyValue.TIMESTAMP_TYPE_SIZE); + } + out.write(key, keyOffset + keyLength - (KeyValue.TIMESTAMP_TYPE_SIZE - + commonTimestampPrefix), KeyValue.TIMESTAMP_SIZE - + commonTimestampPrefix); + + // Write the type if it is not the same as before. + if ((flag & FLAG_SAME_TYPE) == 0) { + out.write(currentState.type); + } + + // Write the value if it is not the same as before. + if ((flag & FLAG_SAME_VALUE) == 0) { + out.write(value, valueOffset, valueLength); + } + } + } + } + protected static class FastDiffSeekerState extends SeekerState { private byte[] prevTimestampAndType = new byte[KeyValue.TIMESTAMP_TYPE_SIZE]; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (working copy) @@ -16,14 +16,10 @@ */ package org.apache.hadoop.hbase.io.encoding; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.FilterOutputStream; import java.io.IOException; -import java.lang.reflect.Field; import java.nio.ByteBuffer; -import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.KeyValue; @@ -48,59 +44,19 @@ @InterfaceAudience.Private public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { - private int addKV(int prevKeyOffset, DataOutputStream out, - ByteBuffer in, int prevKeyLength) throws IOException { - int keyLength = in.getInt(); - int valueLength = in.getInt(); - - if (prevKeyOffset == -1) { - // copy the key, there is no common prefix with none - ByteBufferUtils.putCompressedInt(out, keyLength); - ByteBufferUtils.putCompressedInt(out, valueLength); - ByteBufferUtils.putCompressedInt(out, 0); - ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength); - } else { - // find a common prefix and skip it - int common = ByteBufferUtils.findCommonPrefix( - in, prevKeyOffset + KeyValue.ROW_OFFSET, - in.position(), 
- Math.min(prevKeyLength, keyLength)); - - ByteBufferUtils.putCompressedInt(out, keyLength - common); - ByteBufferUtils.putCompressedInt(out, valueLength); - ByteBufferUtils.putCompressedInt(out, common); - - ByteBufferUtils.skip(in, common); - ByteBufferUtils.moveBufferToStream(out, in, keyLength - common - + valueLength); - } - - return keyLength; - } - @Override public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in, boolean includesMemstoreTS) throws IOException { - in.rewind(); - ByteBufferUtils.putInt(writeHere, in.limit()); - int prevOffset = -1; - int offset = 0; - int keyLength = 0; - while (in.hasRemaining()) { - offset = in.position(); - keyLength = addKV(prevOffset, writeHere, in, keyLength); - afterEncodingKeyValue(in, writeHere, includesMemstoreTS); - prevOffset = offset; - } + encodeKeyValues(writeHere, in, includesMemstoreTS); } @Override public ByteBuffer decodeKeyValues(DataInputStream source, - int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize) throws IOException { + int skipLastBytes = source.available() - totalEncodedSize; int decompressedSize = source.readInt(); - ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + - allocHeaderLength); + ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength); buffer.position(allocHeaderLength); int prevKeyOffset = 0; @@ -110,16 +66,15 @@ } if (source.available() != skipLastBytes) { - throw new IllegalStateException("Read too many bytes."); + throw new IOException("Read too many bytes"); } buffer.limit(buffer.position()); return buffer; } - private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, - int prevKeyOffset) - throws IOException, EncoderBufferTooSmallException { + private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset) + throws IOException, EncoderBufferTooSmallException { int keyLength = ByteBufferUtils.readCompressedInt(source); int valueLength = ByteBufferUtils.readCompressedInt(source); int commonLength = ByteBufferUtils.readCompressedInt(source); @@ -164,10 +119,40 @@ } @Override - public String toString() { - return PrefixKeyDeltaEncoder.class.getSimpleName(); + public PrefixKeyDeltaEncoderWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + return new PrefixKeyDeltaEncoderWriter(out, includesMemstoreTS); } + /** + * A writer that incrementally performs Prefix Key Delta Encoding + */ + private static class PrefixKeyDeltaEncoderWriter extends BufferedEncodedWriter { + public PrefixKeyDeltaEncoderWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + super(out, includesMemstoreTS); + } + + @Override + EncodingState createState() { + return new EncodingState(); + } + + @Override + protected void updateInitial(byte[] key, int keyOffset, int keyLength, byte[] value, + int valueOffset, int valueLength) throws IOException { + int common = prevState == null ? 
0 : getCommonPrefixLength(key, keyOffset, keyLength, + this.prevState.key, this.prevState.keyOffset, this.prevState.keyLength); + + ByteBufferUtils.putCompressedInt(this.out, keyLength - common); + ByteBufferUtils.putCompressedInt(this.out, valueLength); + ByteBufferUtils.putCompressedInt(this.out, common); + + this.out.write(key, keyOffset + common, keyLength - common); + this.out.write(value, valueOffset, valueLength); + } + } + @Override public EncodedSeeker createSeeker(RawComparator comparator, final boolean includesMemstoreTS) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (working copy) @@ -97,7 +97,7 @@ if (decompressedData == null) { try { decompressedData = dataBlockEncoder.decodeKeyValues( - dis, includesMemstoreTS); + dis, 0, includesMemstoreTS, dis.available()); } catch (IOException e) { throw new RuntimeException("Problem with data block encoder, " + "most likely it requested more bytes than are available.", e); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (working copy) @@ -31,6 +31,8 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.WritableUtils; +import com.google.common.base.Preconditions; + /** * Base class for all data block encoders that use a buffer. */ @@ -39,12 +41,153 @@ private static int INITIAL_KEY_BUFFER_SIZE = 512; - @Override - public ByteBuffer decodeKeyValues(DataInputStream source, - boolean includesMemstoreTS) throws IOException { - return decodeKeyValues(source, 0, 0, includesMemstoreTS); + protected static int getCommonPrefixLength(byte[] a, int aOffset, int aLength, + byte[] b, int bOffset, int bLength) { + if (a == null || b == null) { + return 0; + } + + int common = 0; + int minLength = Math.min(aLength, bLength); + while (common < minLength && a[aOffset + common] == b[bOffset + common]) { + common++; + } + return common; } + public void encodeKeyValues(DataOutputStream out, + ByteBuffer in, boolean includesMemstoreTS) throws IOException { + in.rewind(); + if (shouldWriteUnencodedLength()) { + out.writeInt(in.remaining()); + } + BufferedEncodedWriter writer = createWriter(out, includesMemstoreTS); + byte[] inputBytes = in.array(); + int inputOffset = in.arrayOffset(); + + while (in.hasRemaining()) { + int keyLength = in.getInt(); + int valueLength = in.getInt(); + int inputPos = in.position(); // This is the buffer position after key/value length. 
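+ // Each record in the unencoded buffer is laid out as
+ // <keyLength:int><valueLength:int><key bytes><value bytes>, optionally
+ // followed by a vlong memstore timestamp, so the key starts at inputPos
+ // and the value immediately follows the key.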
+ int keyOffset = inputOffset + inputPos; + int valueOffset = keyOffset + keyLength; + + writer.updateInitial(inputBytes, keyOffset, keyLength, inputBytes, valueOffset, valueLength); + in.position(inputPos + keyLength + valueLength); + + long memstoreTS = 0; + if (includesMemstoreTS) { + memstoreTS = ByteBufferUtils.readVLong(in); + } + writer.finishAddingKeyValue(memstoreTS, inputBytes, keyOffset, keyLength, inputBytes, + valueOffset, valueLength); + } + } + + abstract static class BufferedEncodedWriter + implements EncodedWriter { + + protected int unencodedLength; + protected final DataOutputStream out; + protected final boolean includesMemstoreTS; + + protected STATE currentState; + protected STATE prevState; + + /** + * Starts updating the writer with a new key/value pair. Unlike {@link #update(long, byte[], + * int, int, byte[], int, int)}, does not take a memstore timestamp. + */ + protected abstract void updateInitial(final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException; + + @Override + public void update(final long memstoreTS, final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException { + updateInitial(key, keyOffset, keyLength, value, valueOffset, valueLength); + finishAddingKeyValue(memstoreTS, key, keyOffset, keyLength, value, valueOffset, valueLength); + } + + protected void finishAddingKeyValue(long memstoreTS, byte[] key, int keyOffset, int keyLength, + byte[] value, int valueOffset, int valueLength) throws IOException { + if (this.includesMemstoreTS) { + WritableUtils.writeVLong(this.out, memstoreTS); + unencodedLength += WritableUtils.getVIntSize(memstoreTS); + } + this.unencodedLength += KeyValue.getKVSize(keyLength, valueLength); + + // Encoder state management. + // state may be null for the no-op encoder. + if (currentState != null) { + if (prevState == null) { + // Initially prevState is null, let us initialize it. + prevState = currentState; + currentState = createState(); + } else { + // Both previous and current states exist. Swap them. + STATE tmp = currentState; + currentState = prevState; + prevState = tmp; + } + + prevState.key = key; + prevState.value = value; + prevState.keyOffset = keyOffset; + prevState.valueOffset = valueOffset; + prevState.keyLength = keyLength; + prevState.valueLength = valueLength; + } + } + + /** + * Forces writer to move all encoded data to a single stream and for all + * key/value pairs to agree on including the memstore timestamp + */ + public BufferedEncodedWriter(DataOutputStream out, boolean includesMemstoreTS) + throws IOException { + Preconditions.checkNotNull(out); + this.unencodedLength = 0; + this.out = out; + this.includesMemstoreTS = includesMemstoreTS; + currentState = createState(); + if (currentState == null && + (Class) getClass() != CopyKeyDataBlockEncoder.UnencodedWriter.class) { + throw new NullPointerException("Encoder state is null for " + getClass().getName()); + } + } + + /** + * Reserves space for necessary metadata, such as unencoded size, in the encoded block. Called + * before we start writing encoded data. 
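+ * <p>
+ * A rough usage sketch (assuming a writer obtained from {@code createWriter}):
+ * <pre>
+ * writer.reserveMetadataSpace();                      // placeholder for the unencoded size
+ * writer.update(memstoreTS, key, keyOffset, keyLength,
+ *     value, valueOffset, valueLength);               // once per key/value pair
+ * writer.finishEncoding(blockBytes, offset, length);  // back-patches the unencoded size
+ * </pre>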
+ */ + @Override + public void reserveMetadataSpace() throws IOException { + out.writeInt(0); + } + + /** + * @param data a byte array containing encoded data + * @param offset offset of encoded data in the given array + * @param length length of encoded data in the given array + * @return true if the resulting block is stored in the "encoded block" format + */ + @Override + public boolean finishEncoding(byte[] data, final int offset, final int length) + throws IOException { + // Add unencoded length to front of array. This only happens for encoded blocks. + ByteBufferUtils.putInt(data, offset, length, this.unencodedLength); + return true; + } + + abstract STATE createState(); + } + + @Override + public abstract BufferedEncodedWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException; + protected static class SeekerState { protected int valueOffset = -1; protected int keyLength; @@ -352,4 +495,14 @@ } } + /** Whether unencoded data length should be stored in the beginning of an encoded block */ + boolean shouldWriteUnencodedLength() { + return true; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java (revision 0) @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.encoding; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * Stores the state of data block encoder at the beginning of new key. + */ +class EncodingState { + byte[] key; + byte[] value; + + int keyLength; + int valueLength; + int valueOffset = 0; + + short rowLength; + int keyOffset = FIRST_KEY; + byte familyLength; + int qualifierLength; + byte type; + + private final static int FIRST_KEY = -1; + + boolean isFirst() { + return keyOffset == FIRST_KEY; + } + + /** + * Analyze the key and fill the state. + * Uses mark() and reset() in ByteBuffer. + * @param in Buffer at the position where key starts + * @param keyLength Length of key in bytes + * @param valueLength Length of values in bytes + */ + void readKey(ByteBuffer in, int keyLength, int valueLength) { + readKey(in, keyLength, valueLength, 0, null); + } + + /** + * Analyze the key and fill the state assuming we know previous state. + * Uses mark() and reset() in ByteBuffer to avoid moving the position. + *

+ * This method overrides all the fields of this instance, except + * {@link #keyOffset}, which is usually manipulated directly by encoders + * and decoders. + * @param in Buffer at the position where key starts + * @param keyLength Length of key in bytes + * @param valueLength Length of values in bytes + * @param commonPrefix how many first bytes are common with previous KeyValue + * @param previousState State from previous KeyValue + */ + void readKey(ByteBuffer in, int keyLength, int valueLength, + int commonPrefix, EncodingState previousState) { + this.keyLength = keyLength; + this.valueLength = valueLength; + + // fill the state + in.mark(); // mark beginning of key + + if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) { + rowLength = in.getShort(); + ByteBufferUtils.skip(in, rowLength); + + familyLength = in.get(); + + qualifierLength = keyLength - rowLength - familyLength - + KeyValue.KEY_INFRASTRUCTURE_SIZE; + ByteBufferUtils.skip(in, familyLength + qualifierLength); + } else { + rowLength = previousState.rowLength; + familyLength = previousState.familyLength; + qualifierLength = previousState.qualifierLength + + keyLength - previousState.keyLength; + ByteBufferUtils.skip(in, (KeyValue.ROW_LENGTH_SIZE + + KeyValue.FAMILY_LENGTH_SIZE) + + rowLength + familyLength + qualifierLength); + } + + readTimestamp(in); + + type = in.get(); + + in.reset(); + } + + protected void readTimestamp(ByteBuffer in) { + // used in subclasses to add timestamp to state + ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE); + } + + void copyFrom(EncodingState state) { + keyLength = state.keyLength; + valueLength = state.valueLength; + + rowLength = state.rowLength; + keyOffset = state.keyOffset; + familyLength = state.familyLength; + qualifierLength = state.qualifierLength; + type = state.type; + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (working copy) @@ -27,8 +27,10 @@ import org.apache.hadoop.io.RawComparator; /** - * Just copy data, do not do any kind of compression. Use for comparison and - * benchmarking. + * This "encoder" implementation is used for the case when encoding is turned + * off. Unencoded block headers are exactly the same as they were before the + * data block encoding feature was introduced (no special fields for unencoded + * size or encoding type). 
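+ * The {@link UnencodedWriter} used by this encoder writes the raw key length,
+ * value length, key bytes and value bytes of every appended cell verbatim.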
*/ @InterfaceAudience.Private public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { @@ -36,28 +38,27 @@ public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { in.rewind(); - ByteBufferUtils.putInt(out, in.limit()); - ByteBufferUtils.moveBufferToStream(out, in, in.limit()); + ByteBufferUtils.moveBufferToStream(out, in, in.remaining()); } @Override public ByteBuffer decodeKeyValues(DataInputStream source, - int preserveHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + int preserveHeaderLength, boolean includesMemstoreTS, int totalEncodedSize) throws IOException { - int decompressedSize = source.readInt(); - ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + + // Encoded size and decoded size are the same here + ByteBuffer buffer = ByteBuffer.allocate(totalEncodedSize + preserveHeaderLength); buffer.position(preserveHeaderLength); - ByteBufferUtils.copyFromStreamToBuffer(buffer, source, decompressedSize); + ByteBufferUtils.copyFromStreamToBuffer(buffer, source, totalEncodedSize); return buffer; } @Override public ByteBuffer getFirstKeyInBlock(ByteBuffer block) { - int keyLength = block.getInt(Bytes.SIZEOF_INT); + int keyLength = block.getInt(0); return ByteBuffer.wrap(block.array(), - block.arrayOffset() + 3 * Bytes.SIZEOF_INT, keyLength).slice(); + block.arrayOffset() + 2 * Bytes.SIZEOF_INT, keyLength).slice(); } @@ -67,6 +68,45 @@ } @Override + public UnencodedWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + return new UnencodedWriter(out, includesMemstoreTS); + } + + static class UnencodedWriter extends BufferedEncodedWriter { + public UnencodedWriter(DataOutputStream out, boolean includesMemstoreTS) + throws IOException { + super(out, includesMemstoreTS); + } + + @Override + public void updateInitial(final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException { + // Write out the unencoded data + out.writeInt(keyLength); + out.writeInt(valueLength); + this.out.write(key, keyOffset, keyLength); + this.out.write(value, valueOffset, valueLength); + } + + @Override + public void reserveMetadataSpace() { + // We are not storing unencoded block length for unencoded blocks. + } + + @Override + public boolean finishEncoding(byte[] data, int offset, int length) throws IOException { + return false; // no encoding + } + + @Override + EncodingState createState() { + return null; + } + } + + @Override public EncodedSeeker createSeeker(RawComparator comparator, final boolean includesMemstoreTS) { return new BufferedEncodedSeeker(comparator) { @@ -88,11 +128,15 @@ @Override protected void decodeFirst() { - ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); current.lastCommonPrefix = 0; decodeNext(); } }; } + @Override + boolean shouldWriteUnencodedLength() { + // We don't store unencoded length for unencoded blocks. 
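+ // This keeps the layout of unencoded blocks identical to blocks written
+ // before the data block encoding feature was introduced.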
+ return false; + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (working copy) @@ -27,11 +27,13 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.RawComparator; +import com.google.common.base.Preconditions; + /** * Compress using: * - store size of common prefix * - save column family once, it is same within HFile - * - use integer compression for key, value and prefix (7-bit encoding) + * - use integer compression for key, value and prefix lengths (7-bit encoding) * - use bits to avoid duplication key length, value length * and type if it same as previous * - store in 3 bits length of timestamp field @@ -58,7 +60,7 @@ static final int SHIFT_TIMESTAMP_LENGTH = 4; static final int FLAG_TIMESTAMP_SIGN = 1 << 7; - protected static class DiffCompressionState extends CompressionState { + protected static class DiffEncodingState extends EncodingState { long timestamp; byte[] familyNameWithSize; @@ -68,141 +70,16 @@ } @Override - void copyFrom(CompressionState state) { + void copyFrom(EncodingState state) { super.copyFrom(state); - DiffCompressionState state2 = (DiffCompressionState) state; + DiffEncodingState state2 = (DiffEncodingState) state; timestamp = state2.timestamp; } } - private void compressSingleKeyValue(DiffCompressionState previousState, - DiffCompressionState currentState, DataOutputStream out, - ByteBuffer in) throws IOException { - byte flag = 0; - int kvPos = in.position(); - int keyLength = in.getInt(); - int valueLength = in.getInt(); - - long timestamp; - long diffTimestamp = 0; - int diffTimestampFitsInBytes = 0; - - int commonPrefix; - - int timestampFitsInBytes; - - if (previousState.isFirst()) { - currentState.readKey(in, keyLength, valueLength); - currentState.prevOffset = kvPos; - timestamp = currentState.timestamp; - if (timestamp < 0) { - flag |= FLAG_TIMESTAMP_SIGN; - timestamp = -timestamp; - } - timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); - - flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; - commonPrefix = 0; - - // put column family - in.mark(); - ByteBufferUtils.skip(in, currentState.rowLength - + KeyValue.ROW_LENGTH_SIZE); - ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength - + KeyValue.FAMILY_LENGTH_SIZE); - in.reset(); - } else { - // find a common prefix and skip it - commonPrefix = - ByteBufferUtils.findCommonPrefix(in, in.position(), - previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength - - KeyValue.TIMESTAMP_TYPE_SIZE); - // don't compress timestamp and type using prefix - - currentState.readKey(in, keyLength, valueLength, - commonPrefix, previousState); - currentState.prevOffset = kvPos; - timestamp = currentState.timestamp; - boolean negativeTimestamp = timestamp < 0; - if (negativeTimestamp) { - timestamp = -timestamp; - } - timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); - - if (keyLength == previousState.keyLength) { - flag |= FLAG_SAME_KEY_LENGTH; - } - if (valueLength == previousState.valueLength) { - flag |= FLAG_SAME_VALUE_LENGTH; - } - if (currentState.type == previousState.type) { - flag |= FLAG_SAME_TYPE; - } - - // encode timestamp - diffTimestamp = previousState.timestamp - currentState.timestamp; - boolean minusDiffTimestamp = 
diffTimestamp < 0; - if (minusDiffTimestamp) { - diffTimestamp = -diffTimestamp; - } - diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp); - if (diffTimestampFitsInBytes < timestampFitsInBytes) { - flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; - flag |= FLAG_TIMESTAMP_IS_DIFF; - if (minusDiffTimestamp) { - flag |= FLAG_TIMESTAMP_SIGN; - } - } else { - flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; - if (negativeTimestamp) { - flag |= FLAG_TIMESTAMP_SIGN; - } - } - } - - out.write(flag); - - if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { - ByteBufferUtils.putCompressedInt(out, keyLength); - } - if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { - ByteBufferUtils.putCompressedInt(out, valueLength); - } - - ByteBufferUtils.putCompressedInt(out, commonPrefix); - ByteBufferUtils.skip(in, commonPrefix); - - if (previousState.isFirst() || - commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { - int restRowLength = - currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix; - ByteBufferUtils.moveBufferToStream(out, in, restRowLength); - ByteBufferUtils.skip(in, currentState.familyLength + - KeyValue.FAMILY_LENGTH_SIZE); - ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength); - } else { - ByteBufferUtils.moveBufferToStream(out, in, - keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE); - } - - if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { - ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes); - } else { - ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes); - } - - if ((flag & FLAG_SAME_TYPE) == 0) { - out.write(currentState.type); - } - ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE); - - ByteBufferUtils.moveBufferToStream(out, in, valueLength); - } - private void uncompressSingleKeyValue(DataInputStream source, - ByteBuffer buffer, - DiffCompressionState state) - throws IOException, EncoderBufferTooSmallException { + ByteBuffer buffer, DiffEncodingState state) + throws IOException, EncoderBufferTooSmallException { // read the column family at the beginning if (state.isFirst()) { state.familyLength = source.readByte(); @@ -231,16 +108,14 @@ } int commonPrefix = ByteBufferUtils.readCompressedInt(source); - // create KeyValue buffer and fill it prefix int keyOffset = buffer.position(); - ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength - + KeyValue.ROW_OFFSET); + ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET); buffer.putInt(keyLength); buffer.putInt(valueLength); // copy common from previous key if (commonPrefix > 0) { - ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset + ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.keyOffset + KeyValue.ROW_OFFSET, commonPrefix); } @@ -307,49 +182,37 @@ // copy value part ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength); - state.keyLength = keyLength; - state.valueLength = valueLength; - state.prevOffset = keyOffset; + state.keyOffset = keyOffset; state.timestamp = timestamp; state.type = type; - // state.qualifier is unused + state.keyLength = keyLength; + state.valueLength = valueLength; } @Override public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in, boolean includesMemstoreTS) throws IOException { - in.rewind(); - ByteBufferUtils.putInt(out, in.limit()); - DiffCompressionState previousState = new DiffCompressionState(); - DiffCompressionState currentState = new DiffCompressionState(); - while (in.hasRemaining()) { - 
compressSingleKeyValue(previousState, currentState, - out, in); - afterEncodingKeyValue(in, out, includesMemstoreTS); - - // swap previousState <-> currentState - DiffCompressionState tmp = previousState; - previousState = currentState; - currentState = tmp; - } + encodeKeyValues(out, in, includesMemstoreTS); } @Override public ByteBuffer decodeKeyValues(DataInputStream source, - int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize) throws IOException { + int skipLastBytes = source.available() - totalEncodedSize; + Preconditions.checkState(skipLastBytes >= 0, "Requested to skip a negative number of bytes"); int decompressedSize = source.readInt(); ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength); buffer.position(allocHeaderLength); - DiffCompressionState state = new DiffCompressionState(); + DiffEncodingState state = new DiffEncodingState();; while (source.available() > skipLastBytes) { uncompressSingleKeyValue(source, buffer, state); afterDecodingKeyValue(source, buffer, includesMemstoreTS); } if (source.available() != skipLastBytes) { - throw new IllegalStateException("Read too much bytes."); + throw new IllegalStateException("Read too many bytes."); } return buffer; @@ -404,10 +267,145 @@ } @Override - public String toString() { - return DiffKeyDeltaEncoder.class.getSimpleName(); + public DiffKeyDeltaEncoderWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + return new DiffKeyDeltaEncoderWriter(out, includesMemstoreTS); } + /** + * A writer that incrementally performs Fast Diff Delta Encoding + */ + private static class DiffKeyDeltaEncoderWriter + extends BufferedEncodedWriter { + + public DiffKeyDeltaEncoderWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + super(out, includesMemstoreTS); + } + + @Override + DiffEncodingState createState() { + return new DiffEncodingState(); + } + + @Override + public void updateInitial(final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException { + ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength); + long timestamp; + long diffTimestamp = 0; + int diffTimestampFitsInBytes = 0; + byte flag = 0; + int commonPrefix; + int timestampFitsInBytes; + + if (this.prevState == null) { + currentState.readKey(keyBuffer, keyLength, valueLength); + timestamp = currentState.timestamp; + if (timestamp < 0) { + flag |= FLAG_TIMESTAMP_SIGN; + timestamp = -timestamp; + } + timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp); + + flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + commonPrefix = 0; + + // put column family + this.out.write(key, keyOffset + currentState.rowLength + + KeyValue.ROW_LENGTH_SIZE, currentState.familyLength + + KeyValue.FAMILY_LENGTH_SIZE); + } else { + // find a common prefix and skip it + // don't compress timestamp and type using this prefix + commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength - + KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key, + this.prevState.keyOffset, this.prevState.keyLength - + KeyValue.TIMESTAMP_TYPE_SIZE); + + currentState.readKey(keyBuffer, keyLength, valueLength, + commonPrefix, this.prevState); + timestamp = currentState.timestamp; + boolean negativeTimestamp = timestamp < 0; + if (negativeTimestamp) { + timestamp = -timestamp; + } + timestampFitsInBytes = 
ByteBufferUtils.longFitsIn(timestamp); + + if (keyLength == this.prevState.keyLength) { + flag |= FLAG_SAME_KEY_LENGTH; + } + if (valueLength == this.prevState.valueLength) { + flag |= FLAG_SAME_VALUE_LENGTH; + } + if (currentState.type == this.prevState.type) { + flag |= FLAG_SAME_TYPE; + } + + // encode timestamp + diffTimestamp = this.prevState.timestamp - currentState.timestamp; + boolean negativeDiffTimestamp = diffTimestamp < 0; + if (negativeDiffTimestamp) { + diffTimestamp = -diffTimestamp; + } + diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp); + if (diffTimestampFitsInBytes < timestampFitsInBytes) { + flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + flag |= FLAG_TIMESTAMP_IS_DIFF; + if (negativeDiffTimestamp) { + flag |= FLAG_TIMESTAMP_SIGN; + } + } else { + flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH; + if (negativeTimestamp) { + flag |= FLAG_TIMESTAMP_SIGN; + } + } + } + + this.out.write(flag); + + if ((flag & FLAG_SAME_KEY_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(this.out, keyLength); + } + if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { + ByteBufferUtils.putCompressedInt(this.out, valueLength); + } + + ByteBufferUtils.putCompressedInt(this.out, commonPrefix); + + if ((this.prevState == null) || + commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) { + int restRowLength = + currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix; + this.out.write(key, keyOffset + commonPrefix, restRowLength); + this.out.write(key, keyOffset + commonPrefix + restRowLength + + currentState.familyLength + KeyValue.FAMILY_LENGTH_SIZE, + currentState.qualifierLength); + } else { + this.out.write(key, keyOffset + commonPrefix, keyLength - + commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE); + } + + if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { + ByteBufferUtils.putLong(this.out, timestamp, timestampFitsInBytes); + } else { + ByteBufferUtils.putLong(this.out, diffTimestamp, + diffTimestampFitsInBytes); + } + + if ((flag & FLAG_SAME_TYPE) == 0) { + this.out.write(currentState.type); + } + + this.out.write(value, valueOffset, valueLength); + + this.currentState.key = key; + this.currentState.keyOffset = keyOffset; + } + } + protected static class DiffSeekerState extends SeekerState { private int rowLengthWithSize; private long timestamp; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CompoundBloomFilter; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; @@ -193,7 +195,7 @@ /** * The on-disk size of the next block, including the header, obtained 
by - * peeking into the first {@link HEADER_SIZE} bytes of the next block's + * peeking into the first {@link #HEADER_SIZE} bytes of the next block's * header, or -1 if unknown. */ private int nextBlockOnDiskSizeWithHeader = -1; @@ -321,7 +323,7 @@ } /** - * Writes header fields into the first {@link HEADER_SIZE} bytes of the + * Writes header fields into the first {@link #HEADER_SIZE} bytes of the * buffer. Resets the buffer position to the end of header as side effect. */ private void overwriteHeader() { @@ -646,6 +648,8 @@ /** block encoding context for non-data blocks */ private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; + private DataBlockEncoder.EncodedWriter encodedWriter; + /** * The stream we use to accumulate data in uncompressed format for each * block. We reset this stream at the end of each block and reuse it. The @@ -712,9 +716,19 @@ private ChecksumType checksumType; private int bytesPerChecksum; + public void appendEncodedKV(final long memstoreTS, final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException { + if (encodedWriter == null) { + throw new IOException("Must initialize encoded writer"); + } + encodedWriter.update(memstoreTS, key, keyOffset, keyLength, value, + valueOffset, valueLength); + } + /** * @param compressionAlgorithm compression algorithm to use - * @param dataBlockEncoderAlgo data block encoding algorithm to use + * @param dataBlockEncoder data block encoding algorithm to use * @param checksumType type of checksum * @param bytesPerChecksum bytes per checksum */ @@ -725,6 +739,8 @@ compressionAlgorithm; this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; + this.encodedWriter = null; + defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(compressionAlgorithm, null); dataBlockEncodingCtx = @@ -772,18 +788,25 @@ // We will compress it later in finishBlock() userDataStream = new DataOutputStream(baosInMemory); - return userDataStream; + + // We only encode data blocks. + if (this.blockType == BlockType.DATA) { + this.encodedWriter = this.dataBlockEncoder.getEncodedWriter(this.userDataStream, + this.includesMemstoreTS); + this.encodedWriter.reserveMetadataSpace(); + return null; // The caller should use appendEncodedKV + } else { + this.encodedWriter = null; + return userDataStream; + } } /** - * Returns the stream for the user to write to. The block writer takes care - * of handling compression and buffering for caching on write. Can only be - * called in the "writing" state. - * - * @return the data output stream for the user to write to + * Used to get the stream for the user to write data block contents into. This is unsafe and + * should only be used for testing. Key/value pairs should be appended using + * {@link #appendEncodedKV(long, byte[], int, int, byte[], int, int)}. */ - DataOutputStream getUserDataStream() { - expectState(State.WRITING); + DataOutputStream getUserDataStreamUnsafe() { return userDataStream; } @@ -851,20 +874,16 @@ * {@link #dataBlockEncoder}. 
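Editorial note on the new write path: a minimal usage sketch, assuming this patch's five-argument Writer constructor; dataBlockEncoder, cells, and os are stand-ins, and error handling is omitted. DATA blocks no longer hand out a stream from startWriting(); each cell is fed through appendEncodedKV(), which drives the incremental encoder, and writeHeaderAndData() finishes the block as before.

    HFileBlock.Writer hbw = new HFileBlock.Writer(Compression.Algorithm.NONE,
        dataBlockEncoder, includesMemstoreTS, HFile.DEFAULT_CHECKSUM_TYPE,
        HFile.DEFAULT_BYTES_PER_CHECKSUM);
    hbw.startWriting(BlockType.DATA);   // returns null for DATA blocks now
    for (KeyValue kv : cells) {
      // Append through the encoder instead of writing raw bytes to a stream.
      hbw.appendEncodedKV(kv.getMemstoreTS(), kv.getBuffer(), kv.getKeyOffset(),
          kv.getKeyLength(), kv.getBuffer(), kv.getValueOffset(),
          kv.getValueLength());
    }
    hbw.writeHeaderAndData(os);         // compression/checksums as before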
*/ private void encodeDataBlockForDisk() throws IOException { - // do data block encoding, if data block encoder is set - ByteBuffer rawKeyValues = - ByteBuffer.wrap(uncompressedBytesWithHeader, HEADER_SIZE, - uncompressedBytesWithHeader.length - HEADER_SIZE).slice(); + if (this.encodedWriter == null) { + throw new IOException("All data blocks must use an encoded writer"); + } - //do the encoding - dataBlockEncoder.beforeWriteToDisk(rawKeyValues, - includesMemstoreTS, dataBlockEncodingCtx, blockType); - - uncompressedBytesWithHeader = - dataBlockEncodingCtx.getUncompressedBytesWithHeader(); - onDiskBytesWithHeader = - dataBlockEncodingCtx.getOnDiskBytesWithHeader(); - blockType = dataBlockEncodingCtx.getBlockType(); + if (this.dataBlockEncoder.finishEncoding(uncompressedBytesWithHeader, + HEADER_SIZE, + uncompressedBytesWithHeader.length - HEADER_SIZE, this.encodedWriter)) { + this.blockType = BlockType.ENCODED_DATA; + } + this.encodedWriter = null; } /** @@ -1243,13 +1262,21 @@ // Positional read. Better for random reads. int extraSize = peekIntoNextBlock ? hdrSize : 0; - int ret = istream.read(fileOffset, dest, destOffset, size + extraSize); - if (ret < size) { - throw new IOException("Positional read of " + size + " bytes " + - "failed at offset " + fileOffset + " (returned " + ret + ")"); + int sizeToRead = size + extraSize; + int sizeRead = 0; + if (sizeToRead > 0) { + sizeRead = istream.read(fileOffset, dest, destOffset, sizeToRead); + if (size == 0 && sizeRead == -1) { + // a degenerate case of a zero-size block and no next block header. + sizeRead = 0; + } + if (sizeRead < size) { + throw new IOException("Positional read of " + sizeToRead + " bytes " + + "failed at offset " + fileOffset + " (returned " + sizeRead + ")"); + } } - if (ret == size || ret < size + extraSize) { + if (sizeRead == size || sizeRead < sizeToRead) { // Could not read the next block's header, or did not try. 
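A side note on the readAtOffset() change above: the new bounds handling can be read as the following small decision helper (hypothetical, not part of the patch). A short read of the requested block itself is fatal; failing to peek the next block's header merely means its size stays unknown.

    // Hypothetical distillation of the post-read logic; hdrSize is the fixed
    // block header size and sizeRead is what the positional read returned.
    static int interpretRead(int size, int hdrSize, boolean peekIntoNextBlock,
        int sizeRead) throws IOException {
      int sizeToRead = size + (peekIntoNextBlock ? hdrSize : 0);
      if (size == 0 && sizeRead == -1) {
        sizeRead = 0;  // degenerate case: zero-size block with nothing after it
      }
      if (sizeRead < size) {
        throw new IOException("Positional read of " + sizeToRead
            + " bytes returned " + sizeRead);
      }
      if (sizeRead == size || sizeRead < sizeToRead) {
        return -1;     // next block's header was not (or could not be) read
      }
      return hdrSize;  // next block's header is available in the buffer tail
    }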
return -1; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (working copy) @@ -16,16 +16,17 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.util.Bytes; @@ -280,4 +281,20 @@ return new HFileBlockDefaultDecodingContext(compressionAlgorithm); } + @Override + public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + if (onDisk != DataBlockEncoding.NONE) { + this.onDisk.writeIdInBytes(out); + } + + return this.onDisk.getEncoder().createWriter(out, includesMemstoreTS); + } + + @Override + public boolean finishEncoding(byte[] data, final int offset, final int length, + DataBlockEncoder.EncodedWriter writer) throws IOException { + int bytesToSkip = onDisk.encodingIdSize(); + return writer.finishEncoding(data, offset + bytesToSkip, length - bytesToSkip); + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (working copy) @@ -16,13 +16,15 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.util.Bytes; @@ -115,4 +117,23 @@ public HFileBlockDecodingContext newOnDiskDataBlockDecodingContext( Algorithm compressionAlgorithm); + /** + * @return an incremental encoding writer that uses the given output stream + */ + public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException; + + /** + * Complete the encoding process + * + * + * @param data The encoded data so far + * @param offset Location of encoded bytes in data + * @param length Length of encoded bytes in data + * @param writer EncodedWriter used to 
insert encoded data + * @return true if the data was stored in an encoded format, false if unencoded + * @throws IOException + */ + public boolean finishEncoding(byte[] data, final int offset, final int length, + DataBlockEncoder.EncodedWriter writer) throws IOException; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (working copy) @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; /** * Writes HFile format version 2. @@ -312,9 +311,9 @@ * @param vlength * @throws IOException */ - private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength, - final byte[] value, final int voffset, final int vlength) - throws IOException { + private void append(final long memstoreTS, final byte[] key, + final int koffset, final int klength, final byte[] value, + final int voffset, final int vlength) throws IOException { boolean dupKey = checkKey(key, koffset, klength); checkValue(value, voffset, vlength); if (!dupKey) { @@ -326,18 +325,10 @@ // Write length of key and value and then actual key and value bytes. // Additionally, we may also write down the memstoreTS. - { - DataOutputStream out = fsBlockWriter.getUserDataStream(); - out.writeInt(klength); - totalKeyLength += klength; - out.writeInt(vlength); - totalValueLength += vlength; - out.write(key, koffset, klength); - out.write(value, voffset, vlength); - if (this.includeMemstoreTS) { - WritableUtils.writeVLong(out, memstoreTS); - } - } + this.fsBlockWriter.appendEncodedKV(memstoreTS, key, koffset, klength, value, + voffset, vlength); + totalKeyLength += klength; + totalValueLength += vlength; // Are we the first key in this block? 
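To make the delegation in append() concrete: the bytes formerly written inline by the removed lines are the unencoded cell layout that the NONE/copy path is still expected to produce. A minimal sketch of that layout follows; the class and method names are illustrative only.

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    final class UnencodedCellFormat {
      // Layout: 4-byte key length, 4-byte value length, key bytes, value
      // bytes, then an optional vlong memstore timestamp.
      static void writeUnencodedCell(DataOutputStream out, long memstoreTS,
          byte[] key, int koffset, int klength,
          byte[] value, int voffset, int vlength,
          boolean includesMemstoreTS) throws IOException {
        out.writeInt(klength);
        out.writeInt(vlength);
        out.write(key, koffset, klength);
        out.write(value, voffset, vlength);
        if (includesMemstoreTS) {
          WritableUtils.writeVLong(out, memstoreTS);
        }
      }
    }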
if (firstKeyInBlock == null) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (revision 1402279) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (working copy) @@ -16,15 +16,17 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; /** @@ -106,4 +108,16 @@ return new HFileBlockDefaultDecodingContext(compressionAlgorithm); } + @Override + public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException { + return DataBlockEncoding.NONE.getEncoder().createWriter(out, + includesMemstoreTS); + } + + @Override + public boolean finishEncoding(byte[] data, final int offset, final int length, + DataBlockEncoder.EncodedWriter writer) throws IOException { + return writer.finishEncoding(data, offset, length); + } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1402211) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -690,6 +690,9 @@ * The actual value is irrelevant because this is always compared by reference. 
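The BOOLEAN_VALUES constant added in the HConstants hunk just below is a small convenience; one plausible use (an assumption on my part, shown only for illustration) is iterating every combination of boolean test dimensions without spelling out the array each time.

    // Hypothetical test loop; testEncodingRoundTrip is a made-up helper.
    for (boolean includesMemstoreTS : HConstants.BOOLEAN_VALUES) {
      for (boolean pread : HConstants.BOOLEAN_VALUES) {
        testEncodingRoundTrip(includesMemstoreTS, pread);
      }
    }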
*/ public static final byte [] NO_NEXT_INDEXED_KEY = Bytes.toBytes("NO_NEXT_INDEXED_KEY"); + + public static final boolean[] BOOLEAN_VALUES = { false, true }; + /** delimiter used between portions of a region name */ public static final int DELIMITER = ','; Index: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1402211) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy) @@ -2596,4 +2596,15 @@ out.writeInt(this.length); out.write(this.bytes, this.offset, this.length); } + + /** + * Returns the size of a key/value pair in bytes + * @param keyLength length of the key in bytes + * @param valueLength length of the value in bytes + * @return key/value pair size in bytes + */ + public static int getKVSize(final int keyLength, + final int valueLength) { + return ROW_OFFSET + keyLength + valueLength; + } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (revision 1402211) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (working copy) @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -62,28 +63,17 @@ HFileBlockEncodingContext encodingContext) throws IOException; /** - * Decode. - * @param source Compressed stream of KeyValues. - * @param includesMemstoreTS true if including memstore timestamp after every - * key-value pair - * @return Uncompressed block of KeyValues. - * @throws IOException If there is an error in source. - */ - public ByteBuffer decodeKeyValues(DataInputStream source, - boolean includesMemstoreTS) throws IOException; - - /** * Uncompress. * @param source encoded stream of KeyValues. * @param allocateHeaderLength allocate this many bytes for the header. - * @param skipLastBytes Do not copy n last bytes. + * @param totalEncodedSize the total size of the encoded data to read * @param includesMemstoreTS true if including memstore timestamp after every * key-value pair * @return Uncompressed block of KeyValues. * @throws IOException If there is an error in source. */ public ByteBuffer decodeKeyValues(DataInputStream source, - int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS) + int allocateHeaderLength, boolean includesMemstoreTS, int totalEncodedSize) throws IOException; /** @@ -135,6 +125,54 @@ Algorithm compressionAlgorithm); /** + * Create an incremental writer + * @param out Where to write encoded data + * @param includesMemstoreTS True if including memstore timestamp after every + * key-value pair + * @return + */ + public EncodedWriter createWriter(DataOutputStream out, + boolean includesMemstoreTS) throws IOException; + + /** + * An interface for performing incremental encoding + */ + public static interface EncodedWriter { + /** + * Sets next key to insert for the writer and updates the encoded data + * in the stream if necessary. 
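To show how reserveMetadataSpace(), update(), and finishEncoding() are meant to interact, here is a minimal, hypothetical EncodedWriter that copies cells through unchanged while still recording the unencoded size in the reserved slot. The real BufferedEncodedWriter and CopyKeyDataBlockEncoder in this patch may differ in detail; class and field names here are illustrative.

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.WritableUtils;

    class PlainEncodedWriter implements DataBlockEncoder.EncodedWriter {
      private final DataOutputStream out;
      private final boolean includesMemstoreTS;
      private int unencodedSize = 0;

      PlainEncodedWriter(DataOutputStream out, boolean includesMemstoreTS) {
        this.out = out;
        this.includesMemstoreTS = includesMemstoreTS;
      }

      @Override
      public void reserveMetadataSpace() throws IOException {
        out.writeInt(0);  // placeholder for the unencoded size
      }

      @Override
      public void update(long memstoreTS, byte[] key, int keyOffset,
          int keyLength, byte[] value, int valueOffset, int valueLength)
          throws IOException {
        // Copy the cell through unchanged and track its unencoded footprint.
        out.writeInt(keyLength);
        out.writeInt(valueLength);
        out.write(key, keyOffset, keyLength);
        out.write(value, valueOffset, valueLength);
        unencodedSize += KeyValue.getKVSize(keyLength, valueLength);
        if (includesMemstoreTS) {
          WritableUtils.writeVLong(out, memstoreTS);
          unencodedSize += WritableUtils.getVIntSize(memstoreTS);
        }
      }

      @Override
      public boolean finishEncoding(byte[] data, int offset, int length)
          throws IOException {
        // Patch the reserved slot now that the true unencoded size is known.
        Bytes.putInt(data, offset, unencodedSize);
        return true;
      }
    }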
+ * @param memstoreTS Current memstore timestamp + * @param key Source of key bytes + * @param keyOffset Offset of initial byte in key array + * @param keyLength Length of key bytes in key array + * @param value Source of value bytes + * @param valueOffset Offset of initial byte in value array + * @param valueLength Length of value bytes in value array + * @throws IOException + */ + public void update(final long memstoreTS, final byte[] key, + final int keyOffset, final int keyLength, final byte[] value, + final int valueOffset, final int valueLength) throws IOException; + + /** + * Completes the encoding process on the given byte array + * @param data The encoded stream as a byte array + * @param offset Offset of initial byte in data array + * @param length Length of the portion of the array containing encoded data + * @return True if encoding was performed + * @throws IOException + */ + public boolean finishEncoding(byte[] data, final int offset, + final int length) throws IOException; + + /** + * Called before writing anything to the stream to reserve space for the necessary metadata + * such as unencoded length. + */ + void reserveMetadataSpace() throws IOException; + } + + /** * An interface which enable to seek while underlying data is encoded. * * It works on one HFileBlock, but it is reusable. See Index: hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (revision 1402211) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (working copy) @@ -34,7 +34,8 @@ public enum DataBlockEncoding { /** Disable data block encoding. */ - NONE(0, null), + NONE(0, createEncoder("org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder")), + // id 1 is reserved for the BITSET algorithm to be added later PREFIX(2, createEncoder("org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder")), DIFF(3, createEncoder("org.apache.hadoop.hbase.io.encoding.DiffKeyDeltaEncoder")), @@ -182,4 +183,17 @@ } } -} + /** + * Size of metadata inside the encoded block. This metadata is not considered part of the block + * header, but is part of the block payload. It does not include the encoding id. + */ + public int inBlockMetadataSize() { + if (id == 0) return 0; + return Bytes.SIZEOF_INT; // unencoded size + } + + public int encodingIdSize() { + if (id == 0) return 0; + return ID_SIZE; + } +} \ No newline at end of file Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1402211) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -362,6 +362,22 @@ return result.toString(); } + /** + * Similar to {@link #toStringBinary(byte[])}, but converts the portion of the buffer from the + * current position to the limit to string. + * + * @param buf a byte buffer + * @return a string representation of the buffer's remaining contents + */ + public static String toStringBinaryRemaining(ByteBuffer buf) { + if (buf == null) { + return "null"; + } + int offset = buf.arrayOffset(); + int pos = buf.position(); + return toStringBinary(buf.array(), offset + pos, buf.limit() - pos); + } + private static boolean isHexDigit(char c) { return (c >= 'A' && c <= 'F') ||