 .../java/org/apache/hadoop/hbase/CellUtil.java     | 58 ++++++++++++++++++----
 .../hadoop/hbase/io/TagCompressionContext.java     | 32 +-----------
 .../apache/hadoop/hbase/io/util/Dictionary.java    | 51 +++++++++++++++++++
 .../hbase/regionserver/wal/SecureWALCellCodec.java | 35 +++++++------
 .../hbase/regionserver/wal/WALCellCodec.java       | 38 ++++----------
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java |  1 -
 .../wal/TestWALCellCodecWithCompression.java       | 40 ++++++++++++---
 .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 31 ++++++++++--
 8 files changed, 189 insertions(+), 97 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 5c26df8..4012758 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience.Private;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1331,12 +1332,12 @@ public final class CellUtil {
 
   /**
    * Writes the row from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param rlength the row length
    * @throws IOException
    */
-  public static void writeRow(DataOutputStream out, Cell cell, short rlength) throws IOException {
+  public static void writeRow(OutputStream out, Cell cell, short rlength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
         ((ByteBufferCell) cell).getRowPosition(), rlength);
@@ -1364,12 +1365,12 @@ public final class CellUtil {
 
   /**
    * Writes the family from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param flength the family length
    * @throws IOException
    */
-  public static void writeFamily(DataOutputStream out, Cell cell, byte flength) throws IOException {
+  public static void writeFamily(OutputStream out, Cell cell, byte flength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
         ((ByteBufferCell) cell).getFamilyPosition(), flength);
@@ -1380,12 +1381,12 @@ public final class CellUtil {
 
   /**
    * Writes the qualifier from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param qlength the qualifier length
    * @throws IOException
    */
-  public static void writeQualifier(DataOutputStream out, Cell cell, int qlength)
+  public static void writeQualifier(OutputStream out, Cell cell, int qlength)
       throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
@@ -1415,12 +1416,12 @@ public final class CellUtil {
 
   /**
    * Writes the value from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param vlength the value length
    * @throws IOException
    */
-  public static void writeValue(DataOutputStream out, Cell cell, int vlength) throws IOException {
+  public static void writeValue(OutputStream out, Cell cell, int vlength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getValueByteBuffer(),
         ((ByteBufferCell) cell).getValuePosition(), vlength);
@@ -1431,12 +1432,12 @@ public final class CellUtil {
 
   /**
    * Writes the tag from the given cell to the output stream
-   * @param out The dataoutputstream to which the data has to be written
+   * @param out The outputstream to which the data has to be written
    * @param cell The cell whose contents has to be written
    * @param tagsLength the tag length
    * @throws IOException
    */
-  public static void writeTags(DataOutputStream out, Cell cell, int tagsLength) throws IOException {
+  public static void writeTags(OutputStream out, Cell cell, int tagsLength) throws IOException {
     if (cell instanceof ByteBufferCell) {
       ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
         ((ByteBufferCell) cell).getTagsPosition(), tagsLength);
@@ -1983,7 +1984,8 @@ public final class CellUtil {
    * @param tagCompressionContext the TagCompressionContext
    * @throws IOException can throw IOException if the compression encounters issue
    */
-  public static void compressTags(DataOutputStream out, Cell cell,
+  @InterfaceAudience.Private
+  public static void compressTags(OutputStream out, Cell cell,
       TagCompressionContext tagCompressionContext) throws IOException {
     if (cell instanceof ByteBufferCell) {
       tagCompressionContext.compressTags(out, ((ByteBufferCell) cell).getTagsByteBuffer(),
@@ -1995,6 +1997,40 @@ public final class CellUtil {
   }
 
   @InterfaceAudience.Private
+  public static void compressRow(OutputStream out, Cell cell, Dictionary dict) throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getRowByteBuffer(),
+        ((ByteBufferCell) cell).getRowPosition(), cell.getRowLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), dict);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static void compressFamily(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferCell) cell).getFamilyPosition(), cell.getFamilyLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        dict);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public static void compressQualifier(OutputStream out, Cell cell, Dictionary dict)
+      throws IOException {
+    if (cell instanceof ByteBufferCell) {
+      Dictionary.write(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferCell) cell).getQualifierPosition(), cell.getQualifierLength(), dict);
+    } else {
+      Dictionary.write(out, cell.getQualifierArray(), cell.getQualifierOffset(),
+        cell.getQualifierLength(), dict);
+    }
+  }
+
+  @InterfaceAudience.Private
   /**
    * These cells are used in reseeks/seeks to improve the read performance.
    * They are not real cells that are returned back to the clients
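// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): a minimal sketch of how the new
// CellUtil.compress* helpers are meant to be called. KeyValue, Bytes and
// LRUDictionary are existing hbase-common classes; only this sample class is
// invented. The same call works for ByteBufferCell implementations (e.g.
// OffheapKeyValue) without first copying the row onto the heap.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;

public class CompressCellSketch {
  public static void main(String[] args) throws IOException {
    Dictionary rowDict = new LRUDictionary();
    rowDict.init(Short.MAX_VALUE);
    KeyValue kv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Dictionary-codes the row; falls back to raw bytes on a dictionary miss.
    CellUtil.compressRow(out, kv, rowDict);
    System.out.println("row bytes written: " + out.size());
  }
}
// ---------------------------------------------------------------------------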
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 278dfc4..fea2f0c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -70,7 +70,7 @@ public class TagCompressionContext {
     while (pos < endOffset) {
       int tagLen = Bytes.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
       pos += Tag.TAG_LENGTH_SIZE;
-      write(in, pos, tagLen, out);
+      Dictionary.write(out, in, pos, tagLen, tagDict);
       pos += tagLen;
     }
   }
@@ -94,7 +94,7 @@ public class TagCompressionContext {
     while (pos < endOffset) {
       int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
       pos += Tag.TAG_LENGTH_SIZE;
-      write(in, pos, tagLen, out);
+      Dictionary.write(out, in, pos, tagLen, tagDict);
       pos += tagLen;
     }
   }
@@ -185,32 +185,4 @@ public class TagCompressionContext {
       dest.put(tagBuf);
     }
   }
-
-  private void write(byte[] data, int offset, int length, OutputStream out) throws IOException {
-    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-    if (tagDict != null) {
-      dictIdx = tagDict.findEntry(data, offset, length);
-    }
-    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-      out.write(Dictionary.NOT_IN_DICTIONARY);
-      StreamUtils.writeRawVInt32(out, length);
-      out.write(data, offset, length);
-    } else {
-      StreamUtils.writeShort(out, dictIdx);
-    }
-  }
-
-  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
-    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-    if (tagDict != null) {
-      dictIdx = tagDict.findEntry(data, offset, length);
-    }
-    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-      out.write(Dictionary.NOT_IN_DICTIONARY);
-      StreamUtils.writeRawVInt32(out, length);
-      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
-    } else {
-      StreamUtils.writeShort(out, dictIdx);
-    }
-  }
 }
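// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): a hedged round-trip sketch for the
// delegating compressTags path above. The TagCompressionContext constructor and
// compressTags/uncompressTags signatures are the pre-existing ones; TagUtil.fromList
// is assumed to be the list-serializing helper the tests below also import.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;

public class TagRoundTripSketch {
  public static void main(String[] args) throws Exception {
    TagCompressionContext ctx = new TagCompressionContext(LRUDictionary.class, Short.MAX_VALUE);
    List<Tag> tags = new ArrayList<>();
    tags.add(new ArrayBackedTag((byte) 1, Bytes.toBytes("tagValue1")));
    byte[] raw = TagUtil.fromList(tags); // <2-byte tag length><tag bytes>...
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ctx.compressTags(out, raw, 0, raw.length); // now routed through Dictionary.write
    byte[] back = new byte[raw.length];
    ctx.uncompressTags(new ByteArrayInputStream(out.toByteArray()), back, 0, raw.length);
    System.out.println(Bytes.equals(raw, back)); // expected: true
  }
}
// ---------------------------------------------------------------------------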
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 54677da..e6384e1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 
 /**
  * Dictionary interface
@@ -80,4 +83,52 @@
    * Flushes the dictionary, empties all values.
    */
  void clear();
+
+  /**
+   * Helper method to write the given data to the OutputStream, dictionary-encoded when possible
+   * @param out the outputstream to which data needs to be written
+   * @param data the data to be written in byte[]
+   * @param offset the offset
+   * @param length length to be written
+   * @param dict the dictionary whose contents are to be written
+   * @throws IOException
+   */
+  public static void write(OutputStream out, byte[] data, int offset, int length, Dictionary dict)
+      throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (dict != null) {
+      dictIdx = dict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      out.write(data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
+
+  /**
+   * Helper method to write the given data to the OutputStream, dictionary-encoded when possible
+   * @param out the outputstream to which data needs to be written
+   * @param data the data to be written in ByteBuffer
+   * @param offset the offset
+   * @param length length to be written
+   * @param dict the dictionary whose contents are to be written
+   * @throws IOException
+   */
+  public static void write(OutputStream out, ByteBuffer data, int offset, int length,
+      Dictionary dict) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (dict != null) {
+      dictIdx = dict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }
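// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): the byte layout the new Dictionary.write
// helper produces, as a runnable sketch. LRUDictionary.findEntry also installs the
// entry on a miss, so the second write is expected to hit the dictionary.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;

public class DictionaryWriteSketch {
  public static void main(String[] args) throws IOException {
    Dictionary dict = new LRUDictionary();
    dict.init(Short.MAX_VALUE);
    byte[] cf = Bytes.toBytes("myCF");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Miss: NOT_IN_DICTIONARY marker byte + vint length + the 4 raw bytes = 6 bytes here.
    Dictionary.write(out, cf, 0, cf.length, dict);
    int afterMiss = out.size();
    // Hit: only the 2-byte short index is written.
    Dictionary.write(out, cf, 0, cf.length, dict);
    System.out.println(afterMiss + " + " + (out.size() - afterMiss)); // expected: 6 + 2
  }
}
// ---------------------------------------------------------------------------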
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index 603496f..027ff11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -28,9 +28,11 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
@@ -195,29 +197,32 @@ public class SecureWALCellCodec extends WALCellCodec {
 
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       OutputStream cout = encryptor.createEncryptionStream(baos);
-
+      ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
       int tlen = cell.getTagsLength();
       // Write the KeyValue infrastructure as VInts.
-      StreamUtils.writeRawVInt32(cout, KeyValueUtil.keyLength(cell));
-      StreamUtils.writeRawVInt32(cout, cell.getValueLength());
+      StreamUtils.writeRawVInt32(bos, KeyValueUtil.keyLength(cell));
+      StreamUtils.writeRawVInt32(bos, cell.getValueLength());
       // To support tags
-      StreamUtils.writeRawVInt32(cout, tlen);
+      StreamUtils.writeRawVInt32(bos, tlen);
 
       // Write row, qualifier, and family
-      StreamUtils.writeRawVInt32(cout, cell.getRowLength());
-      cout.write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-      StreamUtils.writeRawVInt32(cout, cell.getFamilyLength());
-      cout.write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
-      StreamUtils.writeRawVInt32(cout, cell.getQualifierLength());
-      cout.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+      short rowLength = cell.getRowLength();
+      StreamUtils.writeRawVInt32(bos, rowLength);
+      CellUtil.writeRow(bos, cell, rowLength);
+      byte familyLength = cell.getFamilyLength();
+      StreamUtils.writeRawVInt32(bos, familyLength);
+      CellUtil.writeFamily(bos, cell, familyLength);
+      int qualifierLength = cell.getQualifierLength();
+      StreamUtils.writeRawVInt32(bos, qualifierLength);
+      CellUtil.writeQualifier(bos, cell, qualifierLength);
       // Write the rest ie. ts, type, value and tags parts
-      StreamUtils.writeLong(cout, cell.getTimestamp());
-      cout.write(cell.getTypeByte());
-      cout.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      StreamUtils.writeLong(bos, cell.getTimestamp());
+      bos.write(cell.getTypeByte());
+      CellUtil.writeValue(bos, cell, cell.getValueLength());
       if (tlen > 0) {
-        cout.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
+        CellUtil.writeTags(bos, cell, tlen);
       }
-      cout.close();
+      bos.close();
 
       StreamUtils.writeRawVInt32(out, baos.size());
       baos.writeTo(out);
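// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): the encryption stream is not a
// ByteBufferWriter, so the encoder wraps it once and then writes every cell
// component through the CellUtil helpers; off-heap cells stream straight through
// instead of being copied onto the heap first. A condensed sketch of that idiom
// (the class and method names here are hypothetical):
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.crypto.Encryptor;

public class EncryptedCellWriteSketch {
  static byte[] encryptValue(Cell cell, Encryptor encryptor) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream cout = encryptor.createEncryptionStream(baos);
    ByteBufferWriterOutputStream bos = new ByteBufferWriterOutputStream(cout);
    CellUtil.writeValue(bos, cell, cell.getValueLength());
    bos.close(); // closing flushes the wrapped encryption stream, as in the encoder above
    return baos.toByteArray();
  }
}
// ---------------------------------------------------------------------------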
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 1a18087..baa940b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -207,45 +208,24 @@ public class WALCellCodec implements Codec {
       // To support tags
       int tagsLength = cell.getTagsLength();
       StreamUtils.writeRawVInt32(out, tagsLength);
-
-      // Write row, qualifier, and family; use dictionary
-      // compression as they're likely to have duplicates.
-      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
-      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-          compression.familyDict);
-      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-          compression.qualifierDict);
-
+      CellUtil.compressRow(out, cell, compression.rowDict);
+      CellUtil.compressFamily(out, cell, compression.familyDict);
+      CellUtil.compressQualifier(out, cell, compression.qualifierDict);
       // Write timestamp, type and value as uncompressed.
       StreamUtils.writeLong(out, cell.getTimestamp());
       out.write(cell.getTypeByte());
-      out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      CellUtil.writeValue(out, cell, cell.getValueLength());
       if (tagsLength > 0) {
         if (compression.tagCompressionContext != null) {
           // Write tags using Dictionary compression
-          compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
-              cell.getTagsOffset(), tagsLength);
+          CellUtil.compressTags(out, cell, compression.tagCompressionContext);
         } else {
           // Tag compression is disabled within the WAL compression. Just write the tags bytes as
           // it is.
-          out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+          CellUtil.writeTags(out, cell, tagsLength);
         }
       }
     }
-
-    private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
-      short dictIdx = Dictionary.NOT_IN_DICTIONARY;
-      if (dict != null) {
-        dictIdx = dict.findEntry(data, offset, length);
-      }
-      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
-        out.write(Dictionary.NOT_IN_DICTIONARY);
-        StreamUtils.writeRawVInt32(out, length);
-        out.write(data, offset, length);
-      } else {
-        StreamUtils.writeShort(out, dictIdx);
-      }
-    }
   }
 
   static class CompressedKvDecoder extends BaseDecoder {
@@ -364,9 +344,9 @@ public class WALCellCodec implements Codec {
 
   @Override
   public Encoder getEncoder(OutputStream os) {
+    os = (os instanceof ByteBufferWriter) ? os
+        : new ByteBufferWriterOutputStream(os);
     if (compression == null) {
-      os = (os instanceof ByteBufferWriter) ? os
-          : new ByteBufferWriterOutputStream(os);
       return new EnsureKvEncoder(os);
     }
     return new CompressedKvEncoder(os, compression);
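// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): the per-cell layout CompressedKvEncoder
// emits is unchanged by this refactor -- only the writing now goes through the
// shared CellUtil/Dictionary helpers:
//   vint keyLength | vint valueLength | vint tagsLength
//   | dict(row) | dict(family) | dict(qualifier)
//   | long timestamp | byte type | value bytes | [tags, dictionary-coded or raw]
// where dict(x) is the Dictionary.write encoding sketched after Dictionary.java above.
// ---------------------------------------------------------------------------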
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 9f3bd94..6fd65f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -41,7 +41,6 @@ public class TestRpcServer {
 
   @Test
   public void testAllocateByteBuffToReadInto() throws Exception {
-    System.out.println(Long.MAX_VALUE);
     int maxBuffersInPool = 10;
     ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
     initPoolWithAllBuffers(pool, maxBuffersInPool);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index e834ac8..ba5bfa3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -46,24 +48,35 @@ public class TestWALCellCodecWithCompression {
 
   @Test
   public void testEncodeDecodeKVsWithTags() throws Exception {
-    doTest(false);
+    doTest(false, false);
   }
 
   @Test
   public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
-    doTest(true);
+    doTest(true, false);
   }
 
-  private void doTest(boolean compressTags) throws Exception {
+  @Test
+  public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
+    doTest(true, true);
+  }
+
+  private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
     Configuration conf = new Configuration(false);
     conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
     WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
         false, compressTags));
     ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
     Encoder encoder = codec.getEncoder(bos);
-    encoder.write(createKV(1));
-    encoder.write(createKV(0));
-    encoder.write(createKV(2));
+    if (offheapKV) {
+      encoder.write(createOffheapKV(1));
+      encoder.write(createOffheapKV(0));
+      encoder.write(createOffheapKV(2));
+    } else {
+      encoder.write(createKV(1));
+      encoder.write(createKV(0));
+      encoder.write(createKV(2));
+    }
 
     InputStream is = new ByteArrayInputStream(bos.toByteArray());
     Decoder decoder = codec.getDecoder(is);
@@ -95,4 +108,19 @@ public class TestWALCellCodecWithCompression {
     }
     return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
   }
+
+  private OffheapKeyValue createOffheapKV(int noOfTags) {
+    byte[] row = Bytes.toBytes("myRow");
+    byte[] cf = Bytes.toBytes("myCF");
+    byte[] q = Bytes.toBytes("myQualifier");
+    byte[] value = Bytes.toBytes("myValue");
+    List<Tag> tags = new ArrayList<>(noOfTags);
+    for (int i = 1; i <= noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
+    }
+    KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    dbb.put(kv.getBuffer());
+    return new OffheapKeyValue(dbb, 0, kv.getBuffer().length);
+  }
 }
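// ---------------------------------------------------------------------------
// Reviewer note (not part of the patch): createOffheapKV above and the writeWAL
// change below duplicate the same direct-ByteBuffer copy; a hypothetical shared
// test helper is sketched here. OffheapKeyValue reads via absolute ByteBuffer
// indexes, so the buffer position left by put() does not matter.
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.OffheapKeyValue;

public class OffheapKVTestHelper {
  static OffheapKeyValue toOffheap(KeyValue kv) {
    ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
    bb.put(kv.getBuffer());
    return new OffheapKeyValue(bb, 0, kv.getBuffer().length);
  }
}
// ---------------------------------------------------------------------------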
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 3e060ab..0562fd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -90,7 +93,8 @@ public class TestWALReaderOnSecureWAL {
     FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
   }
 
-  private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
+  @SuppressWarnings("deprecation")
+  private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap) throws IOException {
     Configuration conf = TEST_UTIL.getConfiguration();
     String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
@@ -116,7 +120,15 @@ public class TestWALReaderOnSecureWAL {
         wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
       for (int i = 0; i < total; i++) {
         WALEdit kvs = new WALEdit();
-        kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+        KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
+        if (offheap) {
+          ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+          bb.put(kv.getBuffer());
+          OffheapKeyValue offheapKV = new OffheapKeyValue(bb, 0, kv.getLength());
+          kvs.add(offheapKV);
+        } else {
+          kvs.add(kv);
+        }
         wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
           System.currentTimeMillis(), mvcc, scopes), kvs, true);
       }
@@ -132,7 +144,16 @@ public class TestWALReaderOnSecureWAL {
   }
 
   @Test()
-  public void testWALReaderOnSecureWAL() throws Exception {
+  public void testWALReaderOnSecureWALWithKeyValues() throws Exception {
+    testSecureWALInternal(false);
+  }
+
+  @Test()
+  public void testWALReaderOnSecureWALWithOffheapKeyValues() throws Exception {
+    testSecureWALInternal(true);
+  }
+
+  private void testSecureWALInternal(boolean offheap) throws IOException, FileNotFoundException {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
       WAL.Reader.class);
@@ -143,7 +164,7 @@ public class TestWALReaderOnSecureWAL {
     conf.setBoolean(WAL_ENCRYPTION, true);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
-    Path walPath = writeWAL(wals, currentTest.getMethodName());
+    Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap);
 
     // Insure edits are not plaintext
     long length = fs.getFileStatus(walPath).getLen();
@@ -188,7 +209,7 @@ public class TestWALReaderOnSecureWAL {
     conf.setBoolean(WAL_ENCRYPTION, false);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
-    Path walPath = writeWAL(wals, currentTest.getMethodName());
+    Path walPath = writeWAL(wals, currentTest.getMethodName(), false);
 
     // Ensure edits are plaintext
     long length = fs.getFileStatus(walPath).getLen();