Index: hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (revision 1499203) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (working copy) @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -29,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; /** * Utility methods helpful slinging {@link Cell} instances. @@ -128,6 +131,18 @@ return new KeyValue(row, family, qualifier, timestamp, KeyValue.Type.codeToType(type), value); } + + + public static Cell createCell(final byte[] row, final byte[] family, final byte[] qualifier, + final long timestamp, final byte type, final byte[] value, final long memstoreTS) { + // I need a Cell Factory here. Using KeyValue for now. TODO. + // TODO: Make a new Cell implementation that just carries these + // byte arrays. + KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, + KeyValue.Type.codeToType(type), value); + keyValue.setMvccVersion(memstoreTS); + return keyValue; + } /** * @param cellScannerables @@ -283,4 +298,69 @@ // Serialization is probably preceded by a length (it is in the KeyValueCodec at least). Bytes.SIZEOF_INT; } + + + /** -- Copied from WritableUtil but to use output stream-- + * Serializes a long to a binary stream with zero-compressed encoding. For + * -112 <= i <= 127, only one byte is used with the actual value. For other + * values of i, the first byte value indicates whether the long is positive or + * negative, and the number of bytes that follow. 
If the first byte value v is + * between -113 and -120, the following long is positive, with the number of bytes + * that follow being -(v+112). If the first byte value v is between -121 and + * -128, the following long is negative, with the number of bytes that follow being + * -(v+120). Bytes are stored in the high-non-zero-byte-first order. + * + * @param stream + * Binary output stream + * @param i + * Long to be serialized + * @throws java.io.IOException + */ + public static void writeVLong(OutputStream stream, long i) throws IOException { + + if (i >= -112 && i <= 127) { + stream.write((byte) i); + return; + } + + int len = -112; + if (i < 0) { + i ^= -1L; // take one's complement + len = -120; + } + + long tmp = i; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + stream.write((byte) len); + + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + long mask = 0xFFL << shiftbits; + stream.write((byte) ((i & mask) >> shiftbits)); + } + } + + public static long readVLong(InputStream in) throws IOException { + byte firstByte = (byte) in.read(); + int len = WritableUtils.decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len - 1; idx++) { + byte b = (byte) in.read(); + i = i << 8; + i = i | (b & 0xFF); + } + return (WritableUtils.isNegativeVInt(firstByte) ? 
(i ^ -1L) : i); + } + + + } \ No newline at end of file Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java (revision 1499203) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java (working copy) @@ -33,8 +33,10 @@ */ public class CellCodec implements Codec { static class CellEncoder extends BaseEncoder { - CellEncoder(final OutputStream out) { + private boolean encodeMemstoreTS; + CellEncoder(final OutputStream out, boolean encodeMemstoreTS) { super(out); + this.encodeMemstoreTS = encodeMemstoreTS; } @Override @@ -52,6 +54,10 @@ this.out.write(cell.getTypeByte()); // Value write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + // MvccVersion + if (this.encodeMemstoreTS) { + CellUtil.writeVLong(this.out, cell.getMvccVersion()); + } } /** @@ -69,8 +75,10 @@ } static class CellDecoder extends BaseDecoder { - public CellDecoder(final InputStream in) { + private boolean decodeMemstoreTS; + public CellDecoder(final InputStream in, boolean decodeMemstoreTS) { super(in); + this.decodeMemstoreTS = decodeMemstoreTS; } protected Cell parseCell() throws IOException { @@ -82,7 +90,16 @@ long timestamp = Bytes.toLong(longArray); byte type = (byte) this.in.read(); byte [] value = readByteArray(in); - return CellUtil.createCell(row, family, qualifier, timestamp, type, value); + // Read memstore version + if (this.decodeMemstoreTS) { + long memstoreTS = CellUtil.readVLong(in); + return CellUtil.createCell(row, family, qualifier, timestamp, type, value, memstoreTS); + //TODO : As part of HBASE-8151 we could ideally avoid reading the memstoreTS itself if we know that + // all memstoreTS are 0. But as we are operating on stream even if we do skip it is going to do a one + // byte read. So the readVLong itself would do that. 
+ } else { + return CellUtil.createCell(row, family, qualifier, timestamp, type, value); + } } /** @@ -100,12 +117,12 @@ } @Override - public Decoder getDecoder(InputStream is) { - return new CellDecoder(is); + public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) { + return new CellDecoder(is, decodeMemstoreTS); } @Override - public Encoder getEncoder(OutputStream os) { - return new CellEncoder(os); + public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) { + return new CellEncoder(os, encodeMemstoreTS); } } \ No newline at end of file Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java (revision 1499203) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java (working copy) @@ -46,6 +46,6 @@ */ public interface Decoder extends CellScanner {}; - Decoder getDecoder(InputStream is); - Encoder getEncoder(OutputStream os); + Decoder getDecoder(InputStream is, boolean decodeMemstoreTS); + Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS); } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (revision 1499203) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (working copy) @@ -71,12 +71,12 @@ * Implementation depends on {@link InputStream#available()} */ @Override - public Decoder getDecoder(final InputStream is) { + public Decoder getDecoder(final InputStream is, boolean decodeMemstoreTS) { return new KeyValueDecoder(is); } @Override - public Encoder getEncoder(OutputStream os) { + public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) { return new KeyValueEncoder(os); } } \ No newline at end of file Index: 
hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java =================================================================== --- hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java (revision 1499203) +++ hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java (working copy) @@ -47,7 +47,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); Codec codec = new CellCodec(); - Codec.Encoder encoder = codec.getEncoder(dos); + Codec.Encoder encoder = codec.getEncoder(dos, true); encoder.flush(); dos.close(); long offset = cos.getCount(); @@ -55,7 +55,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = codec.getDecoder(dis); + Codec.Decoder decoder = codec.getDecoder(dis, true); assertFalse(decoder.advance()); dis.close(); assertEquals(0, cis.getCount()); @@ -67,9 +67,10 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); Codec codec = new CellCodec(); - Codec.Encoder encoder = codec.getEncoder(dos); + Codec.Encoder encoder = codec.getEncoder(dos, true); final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); + kv.setMvccVersion(Long.MAX_VALUE); encoder.write(kv); encoder.flush(); dos.close(); @@ -77,7 +78,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = codec.getDecoder(dis); + Codec.Decoder decoder = codec.getDecoder(dis, true); assertTrue(decoder.advance()); // First read should pull in the KV // Second read should trip over the end-of-stream marker and return false assertFalse(decoder.advance()); @@ -91,7 +92,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); 
DataOutputStream dos = new DataOutputStream(cos); Codec codec = new CellCodec(); - Codec.Encoder encoder = codec.getEncoder(dos); + Codec.Encoder encoder = codec.getEncoder(dos, true); final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1")); final KeyValue kv2 = @@ -107,7 +108,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = codec.getDecoder(dis); + Codec.Decoder decoder = codec.getDecoder(dis, true); assertTrue(decoder.advance()); Cell c = decoder.current(); assertTrue(CellComparator.equals(c, kv1)); Index: hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java =================================================================== --- hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java (revision 1499203) +++ hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java (working copy) @@ -46,7 +46,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); KeyValueCodec kvc = new KeyValueCodec(); - Codec.Encoder encoder = kvc.getEncoder(dos); + Codec.Encoder encoder = kvc.getEncoder(dos, false); encoder.flush(); dos.close(); long offset = cos.getCount(); @@ -54,7 +54,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = kvc.getDecoder(dis); + Codec.Decoder decoder = kvc.getDecoder(dis, false); assertFalse(decoder.advance()); dis.close(); assertEquals(0, cis.getCount()); @@ -66,7 +66,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); KeyValueCodec kvc = new KeyValueCodec(); - Codec.Encoder encoder = kvc.getEncoder(dos); + Codec.Encoder encoder = kvc.getEncoder(dos, false); final 
KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); final long length = kv.getLength() + Bytes.SIZEOF_INT; @@ -78,7 +78,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = kvc.getDecoder(dis); + Codec.Decoder decoder = kvc.getDecoder(dis, false); assertTrue(decoder.advance()); // First read should pull in the KV // Second read should trip over the end-of-stream marker and return false assertFalse(decoder.advance()); @@ -92,7 +92,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); KeyValueCodec kvc = new KeyValueCodec(); - Codec.Encoder encoder = kvc.getEncoder(dos); + Codec.Encoder encoder = kvc.getEncoder(dos, false); final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1")); final KeyValue kv2 = @@ -110,7 +110,7 @@ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = kvc.getDecoder(dis); + Codec.Decoder decoder = kvc.getDecoder(dis, false); assertTrue(decoder.advance()); KeyValue kv = (KeyValue)decoder.current(); assertTrue(kv1.equals(kv)); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (revision 1499203) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (working copy) @@ -80,12 +80,12 @@ } @Override - public Decoder getDecoder(InputStream is) { + public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) { return new MessageDecoder(is); } @Override - public Encoder getEncoder(OutputStream os) { + public Encoder getEncoder(OutputStream os, 
boolean encodeMemstoreTS) { return new MessageEncoder(os); } } \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1499203) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy) @@ -176,7 +176,7 @@ @Override protected void initAfterCompression() throws IOException { WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext); - this.cellDecoder = codec.getDecoder(this.inputStream); + this.cellDecoder = codec.getDecoder(this.inputStream, false); if (this.hasCompression) { this.byteStringUncompressor = codec.getByteStringUncompressor(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (revision 1499203) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (working copy) @@ -81,7 +81,7 @@ WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output); WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext); - this.cellEncoder = codec.getEncoder(this.output); + this.cellEncoder = codec.getEncoder(this.output, false); if (doCompress) { this.compressor = codec.getByteStringCompressor(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (revision 1499203) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (working copy) @@ -264,13 
+264,13 @@ } @Override - public Decoder getDecoder(InputStream is) { + public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) { return (compression == null) ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression); } @Override - public Encoder getEncoder(OutputStream os) { + public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) { return (compression == null) ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (revision 1499203) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (working copy) @@ -102,18 +102,18 @@ } static void doCodec(final Codec codec, final Cell [] cells, final int cycles, final int count, - final int initialBufferSize) + final int initialBufferSize, boolean memstoreTS) throws IOException { byte [] bytes = null; Cell [] cellsDecoded = null; for (int i = 0; i < cycles; i++) { ByteArrayOutputStream baos = new ByteArrayOutputStream(initialBufferSize); - Codec.Encoder encoder = codec.getEncoder(baos); + Codec.Encoder encoder = codec.getEncoder(baos, memstoreTS); bytes = runEncoderTest(i, initialBufferSize, baos, encoder, cells); } for (int i = 0; i < cycles; i++) { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - Codec.Decoder decoder = codec.getDecoder(bais); + Codec.Decoder decoder = codec.getDecoder(bais, memstoreTS); cellsDecoded = CodecPerformance.runDecoderTest(i, count, decoder); } verifyCells(cells, cellsDecoded); @@ -130,8 +130,8 @@ int initialBufferSize = 2 * size; // Multiply by 2 to ensure we don't have to grow buffer // Test KeyValue codec. 
- doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize); - doCodec(new CellCodec(), cells, cycles, count, initialBufferSize); - doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize); + doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize, false); + doCodec(new CellCodec(), cells, cycles, count, initialBufferSize, true); + doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize, false); } } \ No newline at end of file Index: hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (revision 1499203) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (working copy) @@ -52,14 +52,14 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); MessageCodec cmc = new MessageCodec(); - Codec.Encoder encoder = cmc.getEncoder(dos); + Codec.Encoder encoder = cmc.getEncoder(dos, false); encoder.flush(); dos.close(); long offset = cos.getCount(); assertEquals(0, offset); CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = cmc.getDecoder(dis); + Codec.Decoder decoder = cmc.getDecoder(dis, false); assertFalse(decoder.advance()); dis.close(); assertEquals(0, cis.getCount()); @@ -71,7 +71,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); MessageCodec cmc = new MessageCodec(); - Codec.Encoder encoder = cmc.getEncoder(dos); + Codec.Encoder encoder = cmc.getEncoder(dos, false); final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); encoder.write(kv); @@ -80,7 +80,7 @@ long offset = cos.getCount(); CountingInputStream cis = 
new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = cmc.getDecoder(dis); + Codec.Decoder decoder = cmc.getDecoder(dis, false); assertTrue(decoder.advance()); // First read should pull in the KV assertFalse(decoder.advance()); // Second read should trip over the end-of-stream marker and return false dis.close(); @@ -93,7 +93,7 @@ CountingOutputStream cos = new CountingOutputStream(baos); DataOutputStream dos = new DataOutputStream(cos); MessageCodec cmc = new MessageCodec(); - Codec.Encoder encoder = cmc.getEncoder(dos); + Codec.Encoder encoder = cmc.getEncoder(dos, false); final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1")); final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"), Bytes.toBytes("2")); final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"), Bytes.toBytes("3")); @@ -105,7 +105,7 @@ long offset = cos.getCount(); CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray())); DataInputStream dis = new DataInputStream(cis); - Codec.Decoder decoder = cmc.getDecoder(dis); + Codec.Decoder decoder = cmc.getDecoder(dis, false); assertTrue(decoder.advance()); Cell c = decoder.current(); assertTrue(CellComparator.equals(c, kv1));