.../java/org/apache/hadoop/hbase/CellUtil.java | 97 ++++++++++++++++++++++ .../hbase/regionserver/wal/WALCellCodec.java | 51 ++---------- .../wal/TestWALCellCodecWithCompression.java | 40 +++++++-- 3 files changed, 136 insertions(+), 52 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 484eebd..1b19194 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -37,8 +37,11 @@ import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience.Private; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; @@ -1936,6 +1939,100 @@ public final class CellUtil { } @InterfaceAudience.Private + public static void compressCellForWal(OutputStream out, Cell cell, Dictionary rowDict, + Dictionary famDict, Dictionary colDict, TagCompressionContext tagCompressionContext) + throws IOException { + // We first write the KeyValue infrastructure as VInts. + StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell)); + StreamUtils.writeRawVInt32(out, cell.getValueLength()); + // To support tags + int tagsLength = cell.getTagsLength(); + StreamUtils.writeRawVInt32(out, tagsLength); + + // Write row, qualifier, and family; use dictionary + // compression as they're likely to have duplicates. 
+    if (cell instanceof ByteBufferedCell) {
+      assert out instanceof ByteBufferWriter;
+      write(out, ((ByteBufferedCell) cell).getRowByteBuffer(),
+        ((ByteBufferedCell) cell).getRowPosition(), cell.getRowLength(), rowDict);
+      write(out, ((ByteBufferedCell) cell).getFamilyByteBuffer(),
+        ((ByteBufferedCell) cell).getFamilyPosition(), cell.getFamilyLength(), famDict);
+      write(out, ((ByteBufferedCell) cell).getQualifierByteBuffer(),
+        ((ByteBufferedCell) cell).getQualifierPosition(), cell.getQualifierLength(), colDict);
+
+      // Write timestamp, type and value as uncompressed.
+      StreamUtils.writeLong(out, cell.getTimestamp());
+      out.write(cell.getTypeByte());
+      ((ByteBufferWriter) out).write(((ByteBufferedCell) cell).getValueByteBuffer(),
+        ((ByteBufferedCell) cell).getValuePosition(), cell.getValueLength());
+      if (tagsLength > 0) {
+        if (tagCompressionContext != null) {
+          // Write tags using Dictionary compression
+          tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+            ((ByteBufferedCell) cell).getTagsPosition(), tagsLength);
+        } else {
+          // Tag compression is disabled within the WAL compression. Just write the tags bytes as
+          // it is.
+          ((ByteBufferWriter) out).write(((ByteBufferedCell) cell).getTagsByteBuffer(),
+            ((ByteBufferedCell) cell).getTagsPosition(), tagsLength);
+        }
+      }
+    } else {
+      write(out, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), rowDict);
+      write(out, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), famDict);
+      write(out, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+        colDict);
+
+      // Write timestamp, type and value as uncompressed. 
+ StreamUtils.writeLong(out, cell.getTimestamp()); + out.write(cell.getTypeByte()); + out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + if (tagsLength > 0) { + if (tagCompressionContext != null) { + // Write tags using Dictionary compression + tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), + tagsLength); + } else { + // Tag compression is disabled within the WAL compression. Just write the tags bytes as + // it is. + out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + } + } + } + } + + private static void write(OutputStream out, byte[] data, int offset, int length, Dictionary dict) + throws IOException { + short dictIdx = Dictionary.NOT_IN_DICTIONARY; + if (dict != null) { + dictIdx = dict.findEntry(data, offset, length); + } + if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { + out.write(Dictionary.NOT_IN_DICTIONARY); + StreamUtils.writeRawVInt32(out, length); + out.write(data, offset, length); + } else { + StreamUtils.writeShort(out, dictIdx); + } + } + + private static void write(OutputStream out, ByteBuffer data, int offset, int length, + Dictionary dict) throws IOException { + short dictIdx = Dictionary.NOT_IN_DICTIONARY; + if (dict != null) { + dictIdx = dict.findEntry(data, offset, length); + } + if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { + assert out instanceof ByteBufferWriter; + out.write(Dictionary.NOT_IN_DICTIONARY); + StreamUtils.writeRawVInt32(out, length); + ((ByteBufferWriter) out).write(data, offset, length); + } else { + StreamUtils.writeShort(out, dictIdx); + } + } + + @InterfaceAudience.Private /** * These cells are used in reseeks/seeks to improve the read performance. 
* They are not real cells that are returned back to the clients diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 1a18087..3e713ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -201,50 +202,8 @@ public class WALCellCodec implements Codec { @Override public void write(Cell cell) throws IOException { - // We first write the KeyValue infrastructure as VInts. - StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell)); - StreamUtils.writeRawVInt32(out, cell.getValueLength()); - // To support tags - int tagsLength = cell.getTagsLength(); - StreamUtils.writeRawVInt32(out, tagsLength); - - // Write row, qualifier, and family; use dictionary - // compression as they're likely to have duplicates. - write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict); - write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - compression.familyDict); - write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - compression.qualifierDict); - - // Write timestamp, type and value as uncompressed. 
- StreamUtils.writeLong(out, cell.getTimestamp()); - out.write(cell.getTypeByte()); - out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - if (tagsLength > 0) { - if (compression.tagCompressionContext != null) { - // Write tags using Dictionary compression - compression.tagCompressionContext.compressTags(out, cell.getTagsArray(), - cell.getTagsOffset(), tagsLength); - } else { - // Tag compression is disabled within the WAL compression. Just write the tags bytes as - // it is. - out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); - } - } - } - - private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException { - short dictIdx = Dictionary.NOT_IN_DICTIONARY; - if (dict != null) { - dictIdx = dict.findEntry(data, offset, length); - } - if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { - out.write(Dictionary.NOT_IN_DICTIONARY); - StreamUtils.writeRawVInt32(out, length); - out.write(data, offset, length); - } else { - StreamUtils.writeShort(out, dictIdx); - } + CellUtil.compressCellForWal(out, cell, compression.rowDict, compression.familyDict, + compression.qualifierDict, compression.tagCompressionContext); } } @@ -364,9 +323,9 @@ public class WALCellCodec implements Codec { @Override public Encoder getEncoder(OutputStream os) { + os = (os instanceof ByteBufferWriter) ? os + : new ByteBufferWriterOutputStream(os); if (compression == null) { - os = (os instanceof ByteBufferWriter) ? 
os - : new ByteBufferWriterOutputStream(os); return new EnsureKvEncoder(os); } return new CompressedKvEncoder(os, compression); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index e834ac8..ba5bfa3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -23,12 +23,14 @@ import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.OffheapKeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.ArrayBackedTag; @@ -46,24 +48,35 @@ public class TestWALCellCodecWithCompression { @Test public void testEncodeDecodeKVsWithTags() throws Exception { - doTest(false); + doTest(false, false); } @Test public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { - doTest(true); + doTest(true, false); } - private void doTest(boolean compressTags) throws Exception { + @Test + public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { + doTest(true, true); + } + + private void doTest(boolean compressTags, boolean offheapKV) throws Exception { Configuration conf = new Configuration(false); conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, compressTags)); ByteArrayOutputStream bos = new 
ByteArrayOutputStream(1024);
     Encoder encoder = codec.getEncoder(bos);
-    encoder.write(createKV(1));
-    encoder.write(createKV(0));
-    encoder.write(createKV(2));
+    if (offheapKV) {
+      encoder.write(createOffheapKV(1));
+      encoder.write(createOffheapKV(0));
+      encoder.write(createOffheapKV(2));
+    } else {
+      encoder.write(createKV(1));
+      encoder.write(createKV(0));
+      encoder.write(createKV(2));
+    }
 
     InputStream is = new ByteArrayInputStream(bos.toByteArray());
     Decoder decoder = codec.getDecoder(is);
@@ -95,4 +108,19 @@
     }
     return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
   }
+
+  private OffheapKeyValue createOffheapKV(int noOfTags) {
+    byte[] row = Bytes.toBytes("myRow");
+    byte[] cf = Bytes.toBytes("myCF");
+    byte[] q = Bytes.toBytes("myQualifier");
+    byte[] value = Bytes.toBytes("myValue");
+    List<Tag> tags = new ArrayList<Tag>(noOfTags);
+    for (int i = 1; i <= noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
+    }
+    KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    dbb.put(kv.getBuffer());
+    return new OffheapKeyValue(dbb, 0, kv.getBuffer().length);
+  }
 }