Index: hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (revision 1499203)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (working copy)
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -29,6 +31,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Utility methods helpful slinging {@link Cell} instances.
@@ -128,6 +131,18 @@
     return new KeyValue(row, family, qualifier, timestamp,
         KeyValue.Type.codeToType(type), value);
   }
+
+
+  public static Cell createCell(final byte[] row, final byte[] family, final byte[] qualifier,
+      final long timestamp, final byte type, final byte[] value, final long memstoreTS) {
+    // I need a Cell Factory here.  Using KeyValue for now. TODO.
+    // TODO: Make a new Cell implementation that just carries these
+    // byte arrays.
+    KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
+        KeyValue.Type.codeToType(type), value);
+    keyValue.setMvccVersion(memstoreTS);
+    return keyValue;
+  }
 
   /**
    * @param cellScannerables
@@ -283,4 +298,5 @@
       // Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
       Bytes.SIZEOF_INT;
   }
+
 }
\ No newline at end of file
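A rough usage sketch of the new factory overload above (illustration only; the literal row/value bytes and the mvcc value 42 are arbitrary, and the mvcc round-trip assumes the KeyValue-backed implementation in the hunk above):

    // Sketch: create a Cell that carries a memstoreTS/mvcc version along with its coordinates.
    Cell cell = CellUtil.createCell(Bytes.toBytes("row"), Bytes.toBytes("fam"), Bytes.toBytes("qual"),
        1L, KeyValue.Type.Put.getCode(), Bytes.toBytes("value"), 42L);
    assert cell.getMvccVersion() == 42L; // carried through via KeyValue.setMvccVersion()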
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java (revision 1499203)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java (working copy)
@@ -28,13 +28,14 @@
 
 /**
  * Basic Cell codec that just writes out all the individual elements of a Cell.  Uses ints
- * delimiting all lengths.  Profligate.  Needs tune up.  Does not write the mvcc stamp.
- * Use a different codec if you want that in the stream.
+ * delimiting all lengths.  Profligate.  Needs tune up.
  */
 public class CellCodec implements Codec {
   static class CellEncoder extends BaseEncoder {
-    CellEncoder(final OutputStream out) {
+    private boolean encodeMemstoreTS;
+    CellEncoder(final OutputStream out, boolean encodeMemstoreTS) {
       super(out);
+      this.encodeMemstoreTS = encodeMemstoreTS;
     }
 
     @Override
@@ -52,6 +53,10 @@
       this.out.write(cell.getTypeByte());
       // Value
       write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      // MvccVersion
+      if (this.encodeMemstoreTS) {
+        this.out.write(Bytes.toBytes(cell.getMvccVersion()));
+      }
     }
 
     /**
@@ -69,8 +74,10 @@
   }
 
   static class CellDecoder extends BaseDecoder {
-    public CellDecoder(final InputStream in) {
+    private boolean decodeMemstoreTS;
+    public CellDecoder(final InputStream in, boolean decodeMemstoreTS) {
       super(in);
+      this.decodeMemstoreTS = decodeMemstoreTS;
     }
 
     protected Cell parseCell() throws IOException {
@@ -82,7 +89,18 @@
       long timestamp = Bytes.toLong(longArray);
       byte type = (byte) this.in.read();
       byte [] value = readByteArray(in);
-      return CellUtil.createCell(row, family, qualifier, timestamp, type, value);
+      // Read memstore version
+      if (this.decodeMemstoreTS) {
+        byte [] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
+        IOUtils.readFully(this.in, memstoreTSArray);
+        long memstoreTS = Bytes.toLong(memstoreTSArray);
+        return CellUtil.createCell(row, family, qualifier, timestamp, type, value, memstoreTS);
+        // TODO: As part of HBASE-8151 we could ideally avoid reading the memstoreTS if we know
+        // that all memstoreTS are 0.  But since we are operating on a stream, even a skip costs a
+        // one-byte read, which is what readVLong itself would do.
+      } else {
+        return CellUtil.createCell(row, family, qualifier, timestamp, type, value);
+      }
     }
 
     /**
@@ -100,12 +118,12 @@
   }
 
   @Override
-  public Decoder getDecoder(InputStream is) {
-    return new CellDecoder(is);
+  public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) {
+    return new CellDecoder(is, decodeMemstoreTS);
   }
 
   @Override
-  public Encoder getEncoder(OutputStream os) {
-    return new CellEncoder(os);
+  public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) {
+    return new CellEncoder(os, encodeMemstoreTS);
   }
 }
\ No newline at end of file
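The wire format is unchanged up to the value; when the flag is set, an 8-byte mvcc version is appended after it. A minimal round-trip sketch (illustration only, mirroring what TestCellCodec below exercises; stream classes are the usual java.io ones):

    // Sketch: encode a KeyValue with its mvcc version, then decode it back.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Codec codec = new CellCodec();
    Codec.Encoder encoder = codec.getEncoder(baos, true); // true => append 8-byte mvcc after the value
    KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    kv.setMvccVersion(7L);
    encoder.write(kv);
    encoder.flush();
    Codec.Decoder decoder = codec.getDecoder(new ByteArrayInputStream(baos.toByteArray()), true);
    assert decoder.advance();
    assert decoder.current().getMvccVersion() == 7L;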
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java (revision 1499203)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java (working copy)
@@ -46,6 +46,6 @@
    */
   public interface Decoder extends CellScanner {};
 
-  Decoder getDecoder(InputStream is);
-  Encoder getEncoder(OutputStream os);
+  Decoder getDecoder(InputStream is, boolean decodeMemstoreTS);
+  Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS);
 }
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (revision 1499203)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (working copy)
@@ -71,12 +71,12 @@
    * Implementation depends on {@link InputStream#available()}
    */
   @Override
-  public Decoder getDecoder(final InputStream is) {
+  public Decoder getDecoder(final InputStream is, boolean decodeMemstoreTS) {
     return new KeyValueDecoder(is);
   }
 
   @Override
-  public Encoder getEncoder(OutputStream os) {
+  public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) {
     return new KeyValueEncoder(os);
   }
 }
\ No newline at end of file
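The interface change pushes the decision to the call site; implementations that never put the mvcc on the wire (KeyValueCodec here, and MessageCodec/WALCellCodec further down) simply ignore the argument. A call-site sketch (illustration only; out, in and includeMvcc are assumed caller-side variables):

    // Sketch: the caller now states explicitly whether the stream carries mvcc versions.
    Codec codec = includeMvcc ? new CellCodec() : new KeyValueCodec();
    Codec.Encoder encoder = codec.getEncoder(out, includeMvcc);
    Codec.Decoder decoder = codec.getDecoder(in, includeMvcc);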
Index: hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
===================================================================
--- hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java (revision 1499203)
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java (working copy)
@@ -47,7 +47,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     Codec codec = new CellCodec();
-    Codec.Encoder encoder = codec.getEncoder(dos);
+    Codec.Encoder encoder = codec.getEncoder(dos, true);
     encoder.flush();
     dos.close();
     long offset = cos.getCount();
@@ -55,7 +55,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = codec.getDecoder(dis);
+    Codec.Decoder decoder = codec.getDecoder(dis, true);
     assertFalse(decoder.advance());
     dis.close();
     assertEquals(0, cis.getCount());
@@ -67,9 +67,10 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     Codec codec = new CellCodec();
-    Codec.Encoder encoder = codec.getEncoder(dos);
+    Codec.Encoder encoder = codec.getEncoder(dos, true);
     final KeyValue kv =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
+    kv.setMvccVersion(Long.MAX_VALUE);
     encoder.write(kv);
     encoder.flush();
     dos.close();
@@ -77,7 +78,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = codec.getDecoder(dis);
+    Codec.Decoder decoder = codec.getDecoder(dis, true);
     assertTrue(decoder.advance()); // First read should pull in the KV
     // Second read should trip over the end-of-stream marker and return false
     assertFalse(decoder.advance());
@@ -91,7 +92,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     Codec codec = new CellCodec();
-    Codec.Encoder encoder = codec.getEncoder(dos);
+    Codec.Encoder encoder = codec.getEncoder(dos, true);
     final KeyValue kv1 =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1"));
     final KeyValue kv2 =
@@ -107,7 +108,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = codec.getDecoder(dis);
+    Codec.Decoder decoder = codec.getDecoder(dis, true);
     assertTrue(decoder.advance());
     Cell c = decoder.current();
     assertTrue(CellComparator.equals(c, kv1));
Index: hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java
===================================================================
--- hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java (revision 1499203)
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodec.java (working copy)
@@ -46,7 +46,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     KeyValueCodec kvc = new KeyValueCodec();
-    Codec.Encoder encoder = kvc.getEncoder(dos);
+    Codec.Encoder encoder = kvc.getEncoder(dos, false);
     encoder.flush();
     dos.close();
     long offset = cos.getCount();
@@ -54,7 +54,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = kvc.getDecoder(dis);
+    Codec.Decoder decoder = kvc.getDecoder(dis, false);
     assertFalse(decoder.advance());
     dis.close();
     assertEquals(0, cis.getCount());
@@ -66,7 +66,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     KeyValueCodec kvc = new KeyValueCodec();
-    Codec.Encoder encoder = kvc.getEncoder(dos);
+    Codec.Encoder encoder = kvc.getEncoder(dos, false);
     final KeyValue kv =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
     final long length = kv.getLength() + Bytes.SIZEOF_INT;
@@ -78,7 +78,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = kvc.getDecoder(dis);
+    Codec.Decoder decoder = kvc.getDecoder(dis, false);
     assertTrue(decoder.advance()); // First read should pull in the KV
     // Second read should trip over the end-of-stream marker and return false
     assertFalse(decoder.advance());
@@ -92,7 +92,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     KeyValueCodec kvc = new KeyValueCodec();
-    Codec.Encoder encoder = kvc.getEncoder(dos);
+    Codec.Encoder encoder = kvc.getEncoder(dos, false);
     final KeyValue kv1 =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1"));
     final KeyValue kv2 =
@@ -110,7 +110,7 @@
     CountingInputStream cis =
       new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = kvc.getDecoder(dis);
+    Codec.Decoder decoder = kvc.getDecoder(dis, false);
     assertTrue(decoder.advance());
     KeyValue kv = (KeyValue)decoder.current();
     assertTrue(kv1.equals(kv));
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (revision 1499203)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (working copy)
@@ -80,12 +80,12 @@
   }
 
   @Override
-  public Decoder getDecoder(InputStream is) {
+  public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) {
     return new MessageDecoder(is);
   }
 
   @Override
-  public Encoder getEncoder(OutputStream os) {
+  public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) {
     return new MessageEncoder(os);
   }
 }
\ No newline at end of file
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1499203)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy)
@@ -176,7 +176,7 @@
   @Override
   protected void initAfterCompression() throws IOException {
     WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext);
-    this.cellDecoder = codec.getDecoder(this.inputStream);
+    this.cellDecoder = codec.getDecoder(this.inputStream, false);
     if (this.hasCompression) {
       this.byteStringUncompressor = codec.getByteStringUncompressor();
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (revision 1499203)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (working copy)
@@ -81,7 +81,7 @@
     WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
 
     WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext);
-    this.cellEncoder = codec.getEncoder(this.output);
+    this.cellEncoder = codec.getEncoder(this.output, false);
     if (doCompress) {
       this.compressor = codec.getByteStringCompressor();
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (revision 1499203)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (working copy)
@@ -264,13 +264,13 @@
   }
 
   @Override
-  public Decoder getDecoder(InputStream is) {
+  public Decoder getDecoder(InputStream is, boolean decodeMemstoreTS) {
     return (compression == null)
         ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
   }
 
   @Override
-  public Encoder getEncoder(OutputStream os) {
+  public Encoder getEncoder(OutputStream os, boolean encodeMemstoreTS) {
     return (compression == null)
         ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
   }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (revision 1499203)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (working copy)
@@ -102,18 +102,18 @@
   }
 
   static void doCodec(final Codec codec, final Cell [] cells, final int cycles, final int count,
-      final int initialBufferSize)
+      final int initialBufferSize, boolean memstoreTS)
   throws IOException {
     byte [] bytes = null;
     Cell [] cellsDecoded = null;
     for (int i = 0; i < cycles; i++) {
       ByteArrayOutputStream baos = new ByteArrayOutputStream(initialBufferSize);
-      Codec.Encoder encoder = codec.getEncoder(baos);
+      Codec.Encoder encoder = codec.getEncoder(baos, memstoreTS);
       bytes = runEncoderTest(i, initialBufferSize, baos, encoder, cells);
     }
     for (int i = 0; i < cycles; i++) {
       ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-      Codec.Decoder decoder = codec.getDecoder(bais);
+      Codec.Decoder decoder = codec.getDecoder(bais, memstoreTS);
       cellsDecoded = CodecPerformance.runDecoderTest(i, count, decoder);
     }
     verifyCells(cells, cellsDecoded);
@@ -130,8 +130,8 @@
     int initialBufferSize = 2 * size; // Multiply by 2 to ensure we don't have to grow buffer
 
     // Test KeyValue codec.
-    doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize);
-    doCodec(new CellCodec(), cells, cycles, count, initialBufferSize);
-    doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize);
+    doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize, false);
+    doCodec(new CellCodec(), cells, cycles, count, initialBufferSize, true);
+    doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize, false);
   }
 }
\ No newline at end of file
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (revision 1499203)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (working copy)
@@ -52,14 +52,14 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     MessageCodec cmc = new MessageCodec();
-    Codec.Encoder encoder = cmc.getEncoder(dos);
+    Codec.Encoder encoder = cmc.getEncoder(dos, false);
     encoder.flush();
     dos.close();
     long offset = cos.getCount();
     assertEquals(0, offset);
     CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = cmc.getDecoder(dis);
+    Codec.Decoder decoder = cmc.getDecoder(dis, false);
     assertFalse(decoder.advance());
     dis.close();
     assertEquals(0, cis.getCount());
@@ -71,7 +71,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     MessageCodec cmc = new MessageCodec();
-    Codec.Encoder encoder = cmc.getEncoder(dos);
+    Codec.Encoder encoder = cmc.getEncoder(dos, false);
     final KeyValue kv =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
     encoder.write(kv);
@@ -80,7 +80,7 @@
     long offset = cos.getCount();
     CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = cmc.getDecoder(dis);
+    Codec.Decoder decoder = cmc.getDecoder(dis, false);
     assertTrue(decoder.advance()); // First read should pull in the KV
     assertFalse(decoder.advance()); // Second read should trip over the end-of-stream marker and return false
     dis.close();
@@ -93,7 +93,7 @@
     CountingOutputStream cos = new CountingOutputStream(baos);
     DataOutputStream dos = new DataOutputStream(cos);
     MessageCodec cmc = new MessageCodec();
-    Codec.Encoder encoder = cmc.getEncoder(dos);
+    Codec.Encoder encoder = cmc.getEncoder(dos, false);
     final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1"));
     final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"), Bytes.toBytes("2"));
     final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"), Bytes.toBytes("3"));
@@ -105,7 +105,7 @@
     long offset = cos.getCount();
     CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
     DataInputStream dis = new DataInputStream(cis);
-    Codec.Decoder decoder = cmc.getDecoder(dis);
+    Codec.Decoder decoder = cmc.getDecoder(dis, false);
     assertTrue(decoder.advance());
     Cell c = decoder.current();
     assertTrue(CellComparator.equals(c, kv1));