diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 79eae20..f7562af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -106,6 +106,23 @@ public class Delete extends Mutation implements Comparable { * @param rowArray We make a local copy of this passed in row. * @param rowOffset * @param rowLength + */ + public Delete(final byte [] rowArray, final int rowOffset, final int rowLength) { + this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP); + } + + /** + * Create a Delete operation for the specified row and timestamp.

+ * + * If no further operations are done, this will delete all columns in all + * families of the specified row with a timestamp less than or equal to the + * specified timestamp.

+ * + * This timestamp is ONLY used for a delete row operation. If specifying + * families or columns, you must specify each timestamp individually. + * @param rowArray We make a local copy of this passed in row. + * @param rowOffset + * @param rowLength * @param ts maximum version timestamp (only for delete row) */ public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java index be028c6..1e388a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -70,6 +70,17 @@ public class Put extends Mutation implements HeapSize, Comparable { * @param rowLength * @param ts */ + public Put(byte [] rowArray, int rowOffset, int rowLength) { + this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP); + } + + /** + * We make a copy of the passed in row key to keep local. 
+ * @param rowArray + * @param rowOffset + * @param rowLength + * @param ts + */ public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) { checkRow(rowArray, rowOffset, rowLength); this.row = Bytes.copy(rowArray, rowOffset, rowLength); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 13b55a1..4c48948 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -24,18 +24,20 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -47,14 +49,12 @@ import com.google.common.base.Preconditions; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; -import com.google.protobuf.TextFormat; /** * Utility to help ipc'ing. 
 */ class IPCUtil { public static final Log LOG = LogFactory.getLog(IPCUtil.class); - private final int cellBlockBuildingInitialBufferSize; /** * How much we think the decompressor will expand the original compressed content. */ @@ -64,32 +64,50 @@ class IPCUtil { IPCUtil(final Configuration conf) { super(); this.conf = conf; - this.cellBlockBuildingInitialBufferSize = - conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024); this.cellBlockDecompressionMultiplier = conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); } /** - * Build a cell block using passed in codec + * Puts CellScanner Cells into a cell block using passed in codec and/or + * compressor. * @param codec * @param compressor - * @Param cells - * @return Null or byte buffer filled with passed-in Cells encoded using passed in - * codec; the returned buffer has been flipped and is ready for - * reading. Use limit to find total size. + * @param cellScanner + * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in codec and/or compressor; the returned buffer has been + * flipped and is ready for reading. Use limit to find total size. * @throws IOException */ @SuppressWarnings("resource") - ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, - final CellScanner cells) + ByteBuffer jailer(final Codec codec, final CompressionCodec compressor, + final CellScanner cellScanner) throws IOException { - if (cells == null) return null; - // TOOD: Reuse buffers? - // Presizing doesn't work because can't tell what size will be when serialized. - // BBOS will resize itself. - ByteBufferOutputStream baos = - new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize); + if (cellScanner == null) return null; + int bufferSize = 0; + List cells = null; + // See if CellScanner implements HeapSize. If it does, use this to estimate initial buffersize + // for the cellblock. 
+ long longSize = (cellScanner instanceof HeapSize)? ((HeapSize)cellScanner).heapSize(): -1; + if (longSize != -1) { + // Just make sure we don't have a size bigger than an int. + if (longSize > Integer.MAX_VALUE) { + throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE); + } + bufferSize = (int)longSize; + } else { + // The CellScanner does NOT implement HeapSize. Need to get an estimate of what is in + // CellScanner. It's ugly but iterate it and ask each Cell for an estimate of its size. + // We can't re-iterate a cell scanner so need to save off seen Cells to iterate later when + // serializing. TODO: Verify it is better to do this rather than just let the backing + // buffer resize. + cells = new ArrayList(); + while(cellScanner.advance()) { + bufferSize += CellUtil.estimatedSizeOf(cellScanner.current()); + cells.add(cellScanner.current()); + } + } + ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); OutputStream os = baos; Compressor poolCompressor = null; try { @@ -99,8 +117,11 @@ class IPCUtil { os = compressor.createOutputStream(os, poolCompressor); } Codec.Encoder encoder = codec.getEncoder(os); - while (cells.advance()) { - encoder.write(cells.current()); + if (cells != null) { + // cells will be non-null if we iterated the CellScanner already. 
+ for (Cell cell: cells) encoder.write(cell); + } else { + while (cellScanner.advance()) encoder.write(cellScanner.current()); } encoder.flush(); } finally { @@ -108,9 +129,8 @@ class IPCUtil { if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); } if (LOG.isTraceEnabled()) { - if (this.cellBlockBuildingInitialBufferSize < baos.size()) { - LOG.trace("Buffer grew from " + this.cellBlockBuildingInitialBufferSize + - " to " + baos.size()); + if (bufferSize < baos.size()) { + LOG.trace("Buffer grew from estimated bufferSize=" + bufferSize + " to " + baos.size()); } } return baos.getByteBuffer(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 108c6d0..8b56ed4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -978,7 +978,7 @@ public class RpcClient { } builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); + ByteBuffer cellBlock = ipcUtil.jailer(this.codec, this.compressor, call.cells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); cellBlockBuilder.setLength(cellBlock.limit()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index bb25111..6b7d62a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -1277,4 +1277,4 @@ public final class RequestConverter { } return builder.build(); } -} +} \ No newline at end of file diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 47bee96..4890e52 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -58,7 +58,7 @@ public class TestIPCUtil { throws IOException { final int count = 10; Cell [] cells = getCells(count); - ByteBuffer bb = this.util.buildCellBlock(codec, compressor, + ByteBuffer bb = this.util.jailer(codec, compressor, CellUtil.createCellScanner(Arrays.asList(cells).iterator())); CellScanner scanner = this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index c677a0d..8fe9a20 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -27,7 +27,9 @@ import java.util.NavigableMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; /** * Utility methods helpful slinging {@link Cell} instances. @@ -242,4 +244,39 @@ public final class CellUtil { } }; } -} + + /** + * @param left + * @param right + * @return True if the rows in left and right Cells match + */ + public static boolean matchingRow(final Cell left, final Cell right) { + return Bytes.equals(left.getRowArray(), left.getRowOffset(), left.getRowLength(), + right.getRowArray(), right.getRowOffset(), right.getRowLength()); + } + + /** + * @return True if a delete type, a {@link KeyValue.Type#Delete} or + * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn} + * KeyValue type. 
+ */ + public static boolean isDelete(final Cell cell) { + return KeyValue.isDelete(cell.getTypeByte()); + } + + /** + * @param cell + * @return Estimate of the cell size. + */ + public static int estimatedSizeOf(final Cell cell) { + // TODO: Should we add to Cell a sizeOf? Would it help? Does it make sense if Cell is + // prefix encoded or compressed? + return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength() + + cell.getValueLength() + + // Use the KeyValue's infrastructure size presuming that another implementation would have + // same basic cost. + KeyValue.KEY_INFRASTRUCTURE_SIZE + + // Add a few more arbitrary bytes -- 8 in this case. + 8; + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index a589cce..e32163f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -37,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java index 10ff57c..23b172d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java @@ -46,5 +46,4 @@ public interface HeapSize { * count of payload and hosting object sizings. */ public long heapSize(); - -} +} \ No newline at end of file diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index da5d121..a3bd954 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -9894,6 +9894,10 @@ public final class AdminProtos { java.util.List getKeyValueBytesList(); int getKeyValueBytesCount(); com.google.protobuf.ByteString getKeyValueBytes(int index); + + // optional int32 associatedCellCount = 3; + boolean hasAssociatedCellCount(); + int getAssociatedCellCount(); } public static final class WALEntry extends com.google.protobuf.GeneratedMessage @@ -9951,9 +9955,20 @@ public final class AdminProtos { return keyValueBytes_.get(index); } + // optional int32 associatedCellCount = 3; + public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3; + private int associatedCellCount_; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + private void initFields() { key_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance(); keyValueBytes_ = java.util.Collections.emptyList();; + associatedCellCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -9981,6 +9996,9 @@ public final class AdminProtos { for (int i = 0; i < keyValueBytes_.size(); i++) { output.writeBytes(2, keyValueBytes_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(3, associatedCellCount_); + } 
getUnknownFields().writeTo(output); } @@ -10003,6 +10021,10 @@ public final class AdminProtos { size += dataSize; size += 1 * getKeyValueBytesList().size(); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, associatedCellCount_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10033,6 +10055,11 @@ public final class AdminProtos { } result = result && getKeyValueBytesList() .equals(other.getKeyValueBytesList()); + result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount()); + if (hasAssociatedCellCount()) { + result = result && (getAssociatedCellCount() + == other.getAssociatedCellCount()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10050,6 +10077,10 @@ public final class AdminProtos { hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER; hash = (53 * hash) + getKeyValueBytesList().hashCode(); } + if (hasAssociatedCellCount()) { + hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getAssociatedCellCount(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10175,6 +10206,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); keyValueBytes_ = java.util.Collections.emptyList();; bitField0_ = (bitField0_ & ~0x00000002); + associatedCellCount_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -10226,6 +10259,10 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); } result.keyValueBytes_ = keyValueBytes_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.associatedCellCount_ = associatedCellCount_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10255,6 +10292,9 @@ public final class AdminProtos { } onChanged(); } + if (other.hasAssociatedCellCount()) { + 
setAssociatedCellCount(other.getAssociatedCellCount()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10308,6 +10348,11 @@ public final class AdminProtos { keyValueBytes_.add(input.readBytes()); break; } + case 24: { + bitField0_ |= 0x00000004; + associatedCellCount_ = input.readInt32(); + break; + } } } } @@ -10455,6 +10500,27 @@ public final class AdminProtos { return this; } + // optional int32 associatedCellCount = 3; + private int associatedCellCount_ ; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + public Builder setAssociatedCellCount(int value) { + bitField0_ |= 0x00000004; + associatedCellCount_ = value; + onChanged(); + return this; + } + public Builder clearAssociatedCellCount() { + bitField0_ = (bitField0_ & ~0x00000004); + associatedCellCount_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALEntry) } @@ -15359,41 +15425,41 @@ public final class AdminProtos { "gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" + "ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" + "er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" + - "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", - "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" + - "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" + - "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" + - "ALWriterRequest\".\n\025RollWALWriterResponse" + - "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" + - "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + - "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" + - "\022\037\n\nserverName\030\001 \002(\0132\013.ServerName\022\021\n\tweb" + - "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" + - "\nserverInfo\030\001 
\002(\0132\013.ServerInfo2\337\006\n\014Admin", - "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" + - "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" + - "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" + - "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" + - "ineRegionRequest\032\030.GetOnlineRegionRespon" + - "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" + - "penRegionResponse\0228\n\013closeRegion\022\023.Close" + - "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" + - "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" + - "egionResponse\0228\n\013splitRegion\022\023.SplitRegi", - "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" + - "ctRegion\022\025.CompactRegionRequest\032\026.Compac" + - "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" + - "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" + - "replicateWALEntry\022\031.ReplicateWALEntryReq" + - "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" + - "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons", - "e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponseBA\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" + - "\240\001\001" + "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", + "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" + + "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" + + "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" + + "cateWALEntryResponse\"\026\n\024RollWALWriterReq" + + "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" + + "oFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006rea" + + "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" + + 
"rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" + + "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" + + "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo", + "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" + + "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" + + "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" + + "etStoreFileRequest\032\025.GetStoreFileRespons" + + "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" + + "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" + + "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" + + "sponse\0228\n\013closeRegion\022\023.CloseRegionReque" + + "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" + + "\023.FlushRegionRequest\032\024.FlushRegionRespon", + "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" + + ".SplitRegionResponse\022>\n\rcompactRegion\022\025." + + "CompactRegionRequest\032\026.CompactRegionResp" + + "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" + + "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" + + "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" + + "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" + + "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR", + "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes" + + "ponseBA\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15557,7 +15623,7 @@ public final class AdminProtos { internal_static_WALEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALEntry_descriptor, - new 
java.lang.String[] { "Key", "KeyValueBytes", }, + new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class); internal_static_ReplicateWALEntryRequest_descriptor = diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 733eadf..75b11dd 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -161,14 +161,18 @@ message MergeRegionsResponse { // Protocol buffer version of WAL for replication message WALEntry { required WALKey key = 1; + // Following may be null if the KVs/Cells are carried along the side in a cellblock (See + // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null + // and associatedCellCount has count of Cells associated w/ this WALEntry repeated bytes keyValueBytes = 2; + // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock. + optional int32 associatedCellCount = 3; } /** * Replicates the given entries. The guarantee is that the given entries * will be durable on the slave cluster if this method returns without - * any exception. - * hbase.replication has to be set to true for this to work. + * any exception. hbase.replication has to be set to true for this to work. 
*/ message ReplicateWALEntryRequest { repeated WALEntry entry = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index dbafafb..e79ab17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -349,7 +349,7 @@ public class RpcServer implements RpcServerInterface { headerBuilder.setException(exceptionBuilder.build()); } ByteBuffer cellBlock = - ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells); + ipcUtil.jailer(this.connection.codec, this.connection.compressionCodec, cells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 29f0154..752cd4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,26 +20,33 @@ package org.apache.hadoop.hbase.protobuf; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.mortbay.log.Log; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; public class ReplicationProtbufUtil { /** @@ -81,10 +88,11 @@ public class ReplicationProtbufUtil { */ public static void replicateWALEntry(final AdminService.BlockingInterface admin, final HLog.Entry[] entries) throws IOException { - AdminProtos.ReplicateWALEntryRequest request = + Pair p = buildReplicateWALEntryRequest(entries); try { - admin.replicateWALEntry(null, request); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -94,10 +102,14 @@ public class ReplicationProtbufUtil { * Create a new ReplicateWALEntryRequest from a list of HLog entries * * @param entries the HLog entries to be replicated - * @return a ReplicateWALEntryRequest + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values + * found. */ - public static AdminProtos.ReplicateWALEntryRequest + public static Pair buildReplicateWALEntryRequest(final HLog.Entry[] entries) { + // Accumulate all the KVs seen in here. 
+ List> allkvs = new ArrayList>(entries.length); + int size = 0; WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder(); AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = @@ -128,13 +140,58 @@ public class ReplicationProtbufUtil { keyBuilder.addScopes(scopeBuilder.build()); } } - List keyValues = edit.getKeyValues(); - for (KeyValue value: keyValues) { - entryBuilder.addKeyValueBytes(ByteString.copyFrom( - value.getBuffer(), value.getOffset(), value.getLength())); - } + List kvs = edit.getKeyValues(); + // Add up the size. It is used later serializing out the kvs. + for (KeyValue kv: kvs) size += kv.getLength(); + // Collect up the kvs + allkvs.add(kvs); + // Write out how many kvs associated with this entry. + entryBuilder.setAssociatedCellCount(kvs.size()); builder.addEntry(entryBuilder.build()); } - return builder.build(); + return new Pair(builder.build(), + getCellScanner(allkvs, size)); + } + + /** + * Define this new Interface here so can use it below. 
+ */ + interface SizedCellScanner extends CellScanner, HeapSize {} + + /** + * @param cells + * @return cells packaged as a CellScanner + */ + static CellScanner getCellScanner(final List> cells, final int size) { + return new SizedCellScanner() { + private final Iterator> entries = cells.iterator(); + private Iterator currentIterator = null; + private Cell currentCell; + + @Override + public Cell current() { + return this.currentCell; + } + + @Override + public boolean advance() { + if (this.currentIterator == null) { + if (!this.entries.hasNext()) return false; + this.currentIterator = this.entries.next().iterator(); + } + if (this.currentIterator.hasNext()) { + this.currentCell = this.currentIterator.next(); + return true; + } + this.currentCell = null; + this.currentIterator = null; + return advance(); + } + + @Override + public long heapSize() { + return size; + } + }; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 334754a..ffe88a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2214,8 +2214,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa conf, server, fs, logDir, oldLogDir); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; - } - else { + } else { server.replicationSourceHandler = (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, fs, logDir, oldLogDir); @@ -3710,15 +3709,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, - final ReplicateWALEntryRequest request) 
throws ServiceException { + final ReplicateWALEntryRequest request) + throws ServiceException { try { if (replicationSinkHandler != null) { checkOpen(); requestCount.increment(); - HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList()); - if (entries != null && entries.length > 0) { - replicationSinkHandler.replicateLogEntries(entries); - } + this.replicationSinkHandler.replicateLogEntries(request.getEntryList(), + ((PayloadCarryingRpcController)controller).cellScanner()); } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java index 754cff0..2b99616 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.regionserver.wal.HLog; /** @@ -30,11 +33,17 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; */ @InterfaceAudience.Private public interface ReplicationSinkService extends ReplicationService { - - /** + /** * Carry on the list of log entries down to the sink * @param entries list of entries to replicate * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException; -} + /** + * Carry on the list of log entries down to the sink + * @param entries list of WALEntries to replicate + * @param cells Cells that the WALEntries refer to (if cells is non-null) + * @throws IOException + */ + public void 
replicateLogEntries(List entries, CellScanner cells) throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index b908323..edc5c6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -30,10 +28,9 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; */ @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { - /** * Returns a WALObserver for the service. This is needed to * observe log rolls and log archival events. 
*/ public WALActionsListener getWALActionsListener(); -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 7b84ccd..794421b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.Executors; @@ -33,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -172,6 +175,18 @@ public class Replication implements WALActionsListener, } /** + * Carry on the list of log entries down to the sink + * @param entries list of entries to replicate + * @param cells + * @throws IOException + */ + public void replicateLogEntries(List entries, CellScanner cells) throws IOException { + if (this.replication) { + this.replicationSink.replicateEntries(entries, cells); + } + } + + /** * If replication is enabled and this cluster is a master, * it starts * @throws IOException diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 67d921c..481d3ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -34,17 +34,24 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -108,17 +115,84 @@ public class ReplicationSink { } /** + * Replicate this array of entries directly into the local cluster using the native client. + * Like {@link #replicateEntries(org.apache.hadoop.hbase.regionserver.wal.HLog.Entry[])} only + * operates against raw protobuf type saving on a convertion from pb to pojo. 
+ * + * @param entries + * @param cells + * @throws IOException + */ + public void replicateEntries(List entries, final CellScanner cells) throws IOException { + if (entries.isEmpty()) return; + if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner"); + // Very simple optimization where we batch sequences of rows going + // to the same table. + try { + long totalReplicated = 0; + // Map of table => list of Rows, we only want to flushCommits once per + // invocation of this method per table. + Map> rows = new TreeMap>(Bytes.BYTES_COMPARATOR); + for (WALEntry entry : entries) { + byte[] table = entry.getKey().getTableName().toByteArray(); + Cell previousCell = null; + Mutation m = null; + java.util.UUID uuid = toUUID(entry.getKey().getClusterId()); + int count = entry.getAssociatedCellCount(); + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + Cell cell = cells.current(); + if (isNewRowOrType(previousCell, cell)) { + // Create new mutation + m = CellUtil.isDelete(cell)? 
+ new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): + new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + m.setClusterId(uuid); + addToMultiMap(rows, table, m); + } + if (CellUtil.isDelete(cell)) { + ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); + } else { + ((Put)m).add(KeyValueUtil.ensureKeyValue(cell)); + } + previousCell = cell; + } + totalReplicated++; + } + for (Entry> entry : rows.entrySet()) { + batch(entry.getKey(), entry.getValue()); + } + int size = entries.size(); + this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime()); + this.metrics.applyBatch(size); + this.totalReplicatedEdits.addAndGet(totalReplicated); + } catch (IOException ex) { + LOG.error("Unable to accept edit because:", ex); + throw ex; + } + } + + private boolean isNewRowOrType(final Cell previousCell, final Cell cell) { + return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() || + !CellUtil.matchingRow(previousCell, cell); + } + + private java.util.UUID toUUID(final HBaseProtos.UUID uuid) { + return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); + } + + /** * Replicate this array of entries directly into the local cluster * using the native client. * * @param entries * @throws IOException */ - public void replicateEntries(HLog.Entry[] entries) - throws IOException { - if (entries.length == 0) { - return; - } + public void replicateEntries(HLog.Entry[] entries) throws IOException { + if (entries.length == 0) return; // Very simple optimization where we batch sequences of rows going // to the same table. 
try { @@ -134,7 +208,7 @@ public class ReplicationSink { KeyValue lastKV = null; List kvs = edit.getKeyValues(); for (KeyValue kv : kvs) { - if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { + if (isNewRowOrType(lastKV, kv)) { if (kv.isDelete()) { del = new Delete(kv.getRow()); del.setClusterId(entry.getKey().getClusterId()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..bf9266b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -177,6 +177,9 @@ public class ReplicationSource extends Thread new PriorityBlockingQueue( conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); + // TODO: This connection is replication specific or we should make it particular to + // replication and make replication specific settings such as compression or codec to use + // passing Cells. 
this.conn = HConnectionManager.getConnection(conf); this.zkHelper = manager.getRepZkWrapper(); this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); @@ -456,7 +459,6 @@ public class ReplicationSource extends Thread // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { - chooseSinks(); if (this.isActive() && this.currentPeers.size() == 0) { if (sleepForRetries("Waiting for peers", sleepMultiplier)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java new file mode 100644 index 0000000..6aea022 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.protobuf; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category(SmallTests.class) +public class TestReplicationProtobuf { + /** + * Little test to check we can basically convert list of a list of KVs into a CellScanner + * @throws IOException + */ + @Test + public void testGetCellScanner() throws IOException { + List a = new ArrayList(); + KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L); + a.add(akv); + // Add a few just to make it less regular. + a.add(new KeyValue(Bytes.toBytes("aa"), -1L)); + a.add(new KeyValue(Bytes.toBytes("aaa"), -1L)); + List b = new ArrayList(); + KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L); + a.add(bkv); + List c = new ArrayList(); + KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L); + c.add(ckv); + List> all = new ArrayList>(); + all.add(a); + all.add(b); + all.add(c); + CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all, 0); + testAdvancetHasSameRow(scanner, akv); + // Skip over aa + scanner.advance(); + // Skip over aaa + scanner.advance(); + testAdvancetHasSameRow(scanner, bkv); + testAdvancetHasSameRow(scanner, ckv); + assertFalse(scanner.advance()); + } + + private void testAdvancetHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException { + scanner.advance(); + assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(), + scanner.current().getRowLength(), + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 61ae5b5..ddbd56d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -449,13 +449,14 @@ public class TestReplicationSmallTests extends TestReplicationBase { scan = new Scan(); + long start = System.currentTimeMillis(); for (int i = 0; i < NB_RETRIES; i++) { scanner = htable2.getScanner(scan); res = scanner.next(NB_ROWS_IN_BIG_BATCH); scanner.close(); if (res.length != NB_ROWS_IN_BIG_BATCH) { - if (i == NB_RETRIES-1) { + if (i == NB_RETRIES - 1) { int lastRow = -1; for (Result result : res) { int currentRow = Bytes.toInt(result.getRow()); @@ -465,8 +466,9 @@ public class TestReplicationSmallTests extends TestReplicationBase { lastRow = currentRow; } LOG.error("Last row: " + lastRow); - fail("Waited too much time for normal batch replication, " - + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH); + fail("Waited too much time for normal batch replication, " + + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" + + (System.currentTimeMillis() - start) + "ms"); } else { LOG.info("Only got " + res.length + " rows"); Thread.sleep(SLEEP_TIME);