diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 7988352..e79f3d3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -523,8 +523,9 @@ public final class CellUtil { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { - int len = ((ExtendedCell) this.cell).write(out, false); + public int write(OutputStream out, boolean withTags, + boolean withLengthHeader) throws IOException { + int len = ((ExtendedCell) this.cell).write(out, false, withLengthHeader); if (withTags && this.tags != null) { // Write the tagsLength 2 bytes out.write((byte) (0xff & (this.tags.length >> 8))); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 420a5f9..91084d6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -42,11 +42,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam * <tags> * @param out Stream to which cell has to be written * @param withTags Whether to write tags. + * @param withLengthHeader Whether to write a header length before data. * @return how many bytes are written. * @throws IOException */ // TODO remove the boolean param once HBASE-16706 is done. - int write(OutputStream out, boolean withTags) throws IOException; + int write(OutputStream out, boolean withTags, boolean withLengthHeader) + throws IOException; /** * @param withTags Whether to write tags. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8f8554c..205cd10 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.RawComparator; + import com.google.common.annotations.VisibleForTesting; /** @@ -2473,15 +2474,22 @@ public class KeyValue implements ExtendedCell { @Deprecated public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags) throws IOException { - ByteBufferUtils.putInt(out, kv.getSerializedSize(withTags)); - return kv.write(out, withTags) + Bytes.SIZEOF_INT; + return kv.write(out, withTags, true); } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, boolean withLengthHeader) + throws IOException { int len = getSerializedSize(withTags); + if (withLengthHeader) { + ByteBufferUtils.putInt(out, len); + } out.write(this.bytes, this.offset, len); - return len; + if (withLengthHeader) { + return len + Bytes.SIZEOF_INT; + } else { + return len; + } } @Override @@ -2792,7 +2800,8 @@ public class KeyValue implements ExtendedCell { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, + boolean withLengthHeader) throws IOException { // This type of Cell is used only to maintain some internal states. We never allow this type // of Cell to be returned back over the RPC throw new IllegalStateException("A reader should never return this type of a Cell"); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 077f9ee..307099c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -615,10 +615,16 @@ public class KeyValueUtil { cell.getValueLength(), cell.getTagsLength(), withTags); } - public static int oswrite(final Cell cell, final OutputStream out, final boolean withTags) + public static int oswrite(final Cell cell, final OutputStream out, + final boolean withTags) throws IOException { + return oswrite(cell, out, withTags, false); + } + + public static int oswrite(final Cell cell, final OutputStream out, + final boolean withTags, final boolean withLengthHeader) throws IOException { if (cell instanceof ExtendedCell) { - return ((ExtendedCell)cell).write(out, withTags); + return ((ExtendedCell) cell).write(out, withTags, withLengthHeader); } else { short rlen = cell.getRowLength(); byte flen = cell.getFamilyLength(); @@ -626,6 +632,12 @@ public class KeyValueUtil { int vlen = cell.getValueLength(); int tlen = cell.getTagsLength(); int size = 0; + if (withLengthHeader) { + // write length header + ByteBufferUtils.putInt(out, + length(rlen, flen, qlen, vlen, tlen, withTags)); + size += Bytes.SIZEOF_INT; + } // write key length int klen = keyLength(rlen, flen, qlen); ByteBufferUtils.putInt(out, klen); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index 715bc1a..1d7930f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; /** * An extension of the KeyValue where the tags length is always 0 @@ -39,9 +41,17 @@ public class NoTagsKeyValue extends KeyValue { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, boolean withLengthHeader) + throws IOException { + if (withLengthHeader) { + ByteBufferUtils.putInt(out, this.length); + } out.write(this.bytes, this.offset, this.length); - return this.length; + if (withLengthHeader) { + return this.length + Bytes.SIZEOF_INT; + } else { + return this.length; + } } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index ae2496b..ecab5a6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -239,10 +239,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, boolean withLengthHeader) + throws IOException { int length = getSerializedSize(withTags); + if (withLengthHeader) { + ByteBufferUtils.putInt(out, length); + } ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); - return length; + if (withLengthHeader) { + return length + Bytes.SIZEOF_INT; + } else { + return length; + } } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java index 322c668..f795aff 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; /** * This class is an extension to ContentSizeCachedKeyValue where there are no tags in Cell. @@ -42,9 +44,17 @@ public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, boolean withLengthHeader) + throws IOException { + if (withLengthHeader) { + ByteBufferUtils.putInt(out, length); + } out.write(this.bytes, this.offset, this.length); - return this.length; + if (withLengthHeader) { + return length + Bytes.SIZEOF_INT; + } else { + return length; + } } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index 2609398..7b424e1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -60,8 +60,7 @@ public class KeyValueCodec implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Do not write tags over RPC - ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, false)); - KeyValueUtil.oswrite(cell, out, false); + KeyValueUtil.oswrite(cell, out, false, true); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index 63c02e8..2a7ef80 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * Codec that does KeyValue version 1 serialization with serializing tags also. @@ -62,8 +61,7 @@ public class KeyValueCodecWithTags implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Write tags - ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true)); - KeyValueUtil.oswrite(cell, out, true); + KeyValueUtil.oswrite(cell, out, true, true); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 4d3a26c..29055c3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -426,8 +426,12 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, + boolean withLengthHeader) throws IOException { int lenToWrite = getSerializedSize(withTags); + if (withLengthHeader) { + ByteBufferUtils.putInt(out, lenToWrite); + } ByteBufferUtils.putInt(out, keyOnlyBuffer.length); ByteBufferUtils.putInt(out, valueLength); // Write key @@ -442,7 +446,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { out.write((byte) (0xff & this.tagsLength)); out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength); } - return lenToWrite; + if (withLengthHeader) { + return lenToWrite + Bytes.SIZEOF_INT; + } else { + return lenToWrite; + } } @Override @@ -665,8 +673,12 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { } @Override - public int write(OutputStream out, boolean withTags) throws IOException { + public int write(OutputStream out, boolean withTags, + boolean withLengthHeader) throws IOException { int lenToWrite = getSerializedSize(withTags); + if (withLengthHeader) { + ByteBufferUtils.putInt(out, lenToWrite); + } ByteBufferUtils.putInt(out, keyBuffer.capacity()); ByteBufferUtils.putInt(out, valueLength); // Write key @@ -681,7 +693,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder { out.write((byte) (0xff & this.tagsLength)); ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength); } - return lenToWrite; + if (withLengthHeader) { + return lenToWrite + Bytes.SIZEOF_INT; + } else { + return lenToWrite; + } } @Override diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java index 4e0090d..1543bee 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertNotEquals; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -34,11 +36,8 @@ import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; -import static org.junit.Assert.assertNotEquals; - public class TestKeyValue extends TestCase { private static final Log LOG = LogFactory.getLog(TestKeyValue.class); @@ -570,10 +569,8 @@ public class TestKeyValue extends TestCase { MockKeyValue mkvA2 = new MockKeyValue(kvA2); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(byteArrayOutputStream); - ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA1, true)); - KeyValueUtil.oswrite(mkvA1, os, true); - ByteBufferUtils.putInt(os, KeyValueUtil.getSerializedSize(mkvA2, true)); - KeyValueUtil.oswrite(mkvA2, os, true); + KeyValueUtil.oswrite(mkvA1, os, true, true); + KeyValueUtil.oswrite(mkvA2, os, true, true); DataInputStream is = new DataInputStream(new ByteArrayInputStream( byteArrayOutputStream.toByteArray())); KeyValue deSerKV1 = KeyValueUtil.iscreate(is, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 52dfae0..87c68f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -344,8 +344,7 @@ public class WALCellCodec implements Codec { public void write(Cell cell) throws IOException { checkFlushed(); // Make sure to write tags into WAL - ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true)); - KeyValueUtil.oswrite(cell, this.out, true); + KeyValueUtil.oswrite(cell, this.out, true, true); } }