From 8740161a448cb87dedad60d03fa62f964c3f1bf9 Mon Sep 17 00:00:00 2001
From: "St.Ack"
Date: Wed, 26 Jun 2013 10:14:43 -0700
Subject: [PATCH] HBASE-8737 [replication] Change replication RPC to use cell
 blocks

Add constructors to Delete and Put so I can pass in row array, offset, and
length; helps when working with Cells.

In IPCUtil, add calculating an estimate of the needed buffer size. Means
double iteration of all Cells: once to get the size and then again to add
them to the cell block. Supposition is that it's better doing this than
letting the buffer resize itself; to be verified.

Add some util to CellUtil to help slinging Cells.

Minor cleanup of imports in KV.

Add count of associated cells carried in cellblock to AdminProtos.

Change replication sink so it can take the raw pb version rather than expect
the pojo version; saves a transform.
---
 .../org/apache/hadoop/hbase/client/Delete.java     |  17 +++
 .../java/org/apache/hadoop/hbase/client/Put.java   |  11 ++
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |  39 +++---
 .../hadoop/hbase/protobuf/RequestConverter.java    |   2 +-
 .../java/org/apache/hadoop/hbase/CellUtil.java     |  39 +++++-
 .../java/org/apache/hadoop/hbase/KeyValue.java     |   3 -
 .../hbase/protobuf/generated/AdminProtos.java      | 138 +++++++++++++++------
 hbase-protocol/src/main/protobuf/Admin.proto       |   8 +-
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  84 +++++++++----
 .../hadoop/hbase/regionserver/HRegionServer.java   |  12 +-
 .../hbase/regionserver/ReplicationSinkService.java |  15 ++-
 .../replication/regionserver/Replication.java      |  15 +++
 .../replication/regionserver/ReplicationSink.java  |  86 ++++++++++++-
 .../regionserver/ReplicationSource.java            |   3 +
 .../hbase/protobuf/TestReplicationProtobuf.java    |  78 ++++++++++++
 15 files changed, 450 insertions(+), 100 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 79eae20..f7562af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -106,6 +106,23 @@ public class Delete extends Mutation implements Comparable<Row> {
    * @param rowArray We make a local copy of this passed in row.
    * @param rowOffset
    * @param rowLength
+   */
+  public Delete(final byte [] rowArray, final int rowOffset, final int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Create a Delete operation for the specified row and timestamp.<p>
+   *
+   * If no further operations are done, this will delete all columns in all
+   * families of the specified row with a timestamp less than or equal to the
+   * specified timestamp.<p>
+   *
+   * This timestamp is ONLY used for a delete row operation. If specifying
+   * families or columns, you must specify each timestamp individually.
+   * @param rowArray We make a local copy of this passed in row.
+   * @param rowOffset
+   * @param rowLength
    * @param ts maximum version timestamp (only for delete row)
    */
   public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index be028c6..1e388a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -70,6 +70,17 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
    * @param rowLength
    * @param ts
    */
+  public Put(byte [] rowArray, int rowOffset, int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * We make a copy of the passed in row key to keep local.
+   * @param rowArray
+   * @param rowOffset
+   * @param rowLength
+   * @param ts
+   */
   public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) {
     checkRow(rowArray, rowOffset, rowLength);
     this.row = Bytes.copy(rowArray, rowOffset, rowLength);
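An aside on what the new constructors above buy us: a Cell's row can be handed straight to a Put or Delete without copying it out to a standalone array first (the mutation still makes its own copy of the slice internally). Roughly, assuming a Cell in hand -- the class and method names below are illustrative only, not part of the patch:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;

public class RowSliceExample {
  // Before this patch the row had to be materialized first, e.g. via
  // Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).
  static Put putFor(Cell cell) {
    return new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  }

  static Delete deleteFor(Cell cell) {
    return new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
  }
}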
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 13b55a1..dc86433 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -24,18 +24,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -47,14 +48,12 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
-import com.google.protobuf.TextFormat;
 
 /**
  * Utility to help ipc'ing.
 */
 class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
-  private final int cellBlockBuildingInitialBufferSize;
   /**
    * How much we think the decompressor will expand the original compressed content.
    */
@@ -64,8 +63,6 @@ class IPCUtil {
   IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
-    this.cellBlockBuildingInitialBufferSize =
-      conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
     this.cellBlockDecompressionMultiplier =
       conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
   }
@@ -82,14 +79,19 @@
    */
   @SuppressWarnings("resource")
   ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-      final CellScanner cells)
+      final CellScanner cellScanner)
   throws IOException {
-    if (cells == null) return null;
-    // TOOD: Reuse buffers?
-    // Presizing doesn't work because can't tell what size will be when serialized.
-    // BBOS will resize itself.
-    ByteBufferOutputStream baos =
-      new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+    if (cellScanner == null) return null;
+    // Estimate size of needed buffer. Notion is that doing this sizing is less costly than
+    // having to grow the buffer repeatedly if we have sized it wrong.
+    int bufferSize = 0;
+    // Need to keep cells seen; can't redo a cellScanner
+    List<Cell> cells = new ArrayList<Cell>();
+    while (cellScanner.advance()) {
+      bufferSize += CellUtil.estimatedSizeOf(cellScanner.current());
+      cells.add(cellScanner.current());
+    }
+    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
@@ -99,18 +101,15 @@
         os = compressor.createOutputStream(os, poolCompressor);
       }
       Codec.Encoder encoder = codec.getEncoder(os);
-      while (cells.advance()) {
-        encoder.write(cells.current());
-      }
+      for (Cell cell: cells) encoder.write(cell);
       encoder.flush();
     } finally {
       os.close();
       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
     }
     if (LOG.isTraceEnabled()) {
      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from " + this.cellBlockBuildingInitialBufferSize +
-          " to " + baos.size());
+        LOG.trace("Buffer grew from estimated bufferSize=" + bufferSize + " to " + baos.size());
       }
     }
     return baos.getByteBuffer();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index bb25111..6b7d62a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -1277,4 +1277,4 @@ public final class RequestConverter {
     }
     return builder.build();
   }
-}
+}
\ No newline at end of file
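The shape of the buildCellBlock change, pulled out of IPCUtil: a CellScanner is one-shot, so the sizing pass has to cache every Cell it sees, and the write pass then replays the cache into the presized buffer. A standalone sketch against the CellUtil/CellScanner API in this patch -- the method name is made up, and the value-only copy stands in for the real Codec.Encoder write:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;

public class CellBlockSizingSketch {
  // Pass one sizes and caches; pass two fills a buffer allocated exactly once.
  static ByteBuffer estimateThenEncode(CellScanner scanner) throws IOException {
    int bufferSize = 0;
    List<Cell> cells = new ArrayList<Cell>();
    while (scanner.advance()) {
      Cell cell = scanner.current();
      bufferSize += CellUtil.estimatedSizeOf(cell); // first pass: just count
      cells.add(cell);                              // cache; the scanner cannot be rewound
    }
    ByteBuffer bb = ByteBuffer.allocate(bufferSize);
    for (Cell cell : cells) {
      // Second pass: the real code hands each cell to a Codec.Encoder;
      // copying only the value keeps this sketch self-contained.
      bb.put(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    }
    bb.flip();
    return bb;
  }
}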
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index c677a0d..8fe9a20 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -27,7 +27,9 @@ import java.util.NavigableMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Utility methods helpful slinging {@link Cell} instances.
@@ -242,4 +244,39 @@ public final class CellUtil {
       }
     };
   }
-}
+
+  /**
+   * @param left
+   * @param right
+   * @return True if the rows in left and right Cells match
+   */
+  public static boolean matchingRow(final Cell left, final Cell right) {
+    return Bytes.equals(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
+      right.getRowArray(), right.getRowOffset(), right.getRowLength());
+  }
+
+  /**
+   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+   * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
+   * KeyValue type.
+   */
+  public static boolean isDelete(final Cell cell) {
+    return KeyValue.isDelete(cell.getTypeByte());
+  }
+
+  /**
+   * @param cell
+   * @return Estimate of the cell size.
+   */
+  public static int estimatedSizeOf(final Cell cell) {
+    // TODO: Should we add to Cell a sizeOf?  Would it help? Does it make sense if Cell is
+    // prefix encoded or compressed?
+    return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength() +
+      cell.getValueLength() +
+      // Use the KeyValue's infrastructure size presuming that another implementation would have
+      // the same basic cost.
+      KeyValue.KEY_INFRASTRUCTURE_SIZE +
+      // Add a few more arbitrary bytes -- 8 in this case.
+      8;
+  }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index a589cce..e32163f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -20,9 +20,7 @@
 package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index da5d121..a3bd954 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -9894,6 +9894,10 @@
     java.util.List<com.google.protobuf.ByteString> getKeyValueBytesList();
     int getKeyValueBytesCount();
     com.google.protobuf.ByteString getKeyValueBytes(int index);
+
+    // optional int32 associatedCellCount = 3;
+    boolean hasAssociatedCellCount();
+    int getAssociatedCellCount();
   }
   public static final class WALEntry extends com.google.protobuf.GeneratedMessage
@@ -9951,9 +9955,20 @@
       return keyValueBytes_.get(index);
     }
 
+    // optional int32 associatedCellCount = 3;
+    public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3;
+    private int associatedCellCount_;
+    public boolean hasAssociatedCellCount() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public int getAssociatedCellCount() {
+      return associatedCellCount_;
+    }
+
    private void initFields() {
      key_ =
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance(); keyValueBytes_ = java.util.Collections.emptyList();; + associatedCellCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -9981,6 +9996,9 @@ public final class AdminProtos { for (int i = 0; i < keyValueBytes_.size(); i++) { output.writeBytes(2, keyValueBytes_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(3, associatedCellCount_); + } getUnknownFields().writeTo(output); } @@ -10003,6 +10021,10 @@ public final class AdminProtos { size += dataSize; size += 1 * getKeyValueBytesList().size(); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, associatedCellCount_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10033,6 +10055,11 @@ public final class AdminProtos { } result = result && getKeyValueBytesList() .equals(other.getKeyValueBytesList()); + result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount()); + if (hasAssociatedCellCount()) { + result = result && (getAssociatedCellCount() + == other.getAssociatedCellCount()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10050,6 +10077,10 @@ public final class AdminProtos { hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER; hash = (53 * hash) + getKeyValueBytesList().hashCode(); } + if (hasAssociatedCellCount()) { + hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getAssociatedCellCount(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10175,6 +10206,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); keyValueBytes_ = java.util.Collections.emptyList();; bitField0_ = (bitField0_ & ~0x00000002); + associatedCellCount_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -10226,6 +10259,10 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); } result.keyValueBytes_ = keyValueBytes_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.associatedCellCount_ = associatedCellCount_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10255,6 +10292,9 @@ public final class AdminProtos { } onChanged(); } + if (other.hasAssociatedCellCount()) { + setAssociatedCellCount(other.getAssociatedCellCount()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10308,6 +10348,11 @@ public final class AdminProtos { keyValueBytes_.add(input.readBytes()); break; } + case 24: { + bitField0_ |= 0x00000004; + associatedCellCount_ = input.readInt32(); + break; + } } } } @@ -10455,6 +10500,27 @@ public final class AdminProtos { return this; } + // optional int32 associatedCellCount = 3; + private int associatedCellCount_ ; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + public Builder setAssociatedCellCount(int value) { + bitField0_ |= 0x00000004; + associatedCellCount_ = value; + onChanged(); + return this; + } + public Builder clearAssociatedCellCount() { + bitField0_ = (bitField0_ & ~0x00000004); + associatedCellCount_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALEntry) } @@ -15359,41 +15425,41 @@ public final class AdminProtos { 
"gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" + "ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" + "er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" + - "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", - "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" + - "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" + - "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" + - "ALWriterRequest\".\n\025RollWALWriterResponse" + - "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" + - "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + - "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" + - "\022\037\n\nserverName\030\001 \002(\0132\013.ServerName\022\021\n\tweb" + - "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" + - "\nserverInfo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014Admin", - "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" + - "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" + - "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" + - "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" + - "ineRegionRequest\032\030.GetOnlineRegionRespon" + - "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" + - "penRegionResponse\0228\n\013closeRegion\022\023.Close" + - "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" + - "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" + - "egionResponse\0228\n\013splitRegion\022\023.SplitRegi", - "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" + - "ctRegion\022\025.CompactRegionRequest\032\026.Compac" + - "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" + - "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" + - "replicateWALEntry\022\031.ReplicateWALEntryReq" + - "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" + - "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons", - "e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponseBA\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" + - "\240\001\001" + "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", + "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" + + "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" + + "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" + + "cateWALEntryResponse\"\026\n\024RollWALWriterReq" + + "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" + + "oFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006rea" + + "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" + + "rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" + + "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" + + "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo", + "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" + + "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" + + "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" + + "etStoreFileRequest\032\025.GetStoreFileRespons" + + "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" + + "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" + + "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" + + 
"sponse\0228\n\013closeRegion\022\023.CloseRegionReque" + + "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" + + "\023.FlushRegionRequest\032\024.FlushRegionRespon", + "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" + + ".SplitRegionResponse\022>\n\rcompactRegion\022\025." + + "CompactRegionRequest\032\026.CompactRegionResp" + + "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" + + "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" + + "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" + + "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" + + "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR", + "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes" + + "ponseBA\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15557,7 +15623,7 @@ public final class AdminProtos { internal_static_WALEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALEntry_descriptor, - new java.lang.String[] { "Key", "KeyValueBytes", }, + new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class); internal_static_ReplicateWALEntryRequest_descriptor = diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 733eadf..75b11dd 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -161,14 +161,18 @@ message MergeRegionsResponse { // Protocol buffer version of WAL for replication message WALEntry { required WALKey key = 1; + // Following may be null if the KVs/Cells are carried along the side in a cellblock (See + // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null + // and associatedCellCount has count of Cells associated w/ this WALEntry repeated bytes keyValueBytes = 2; + // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock. + optional int32 associatedCellCount = 3; } /** * Replicates the given entries. The guarantee is that the given entries * will be durable on the slave cluster if this method returns without - * any exception. - * hbase.replication has to be set to true for this to work. + * any exception. hbase.replication has to be set to true for this to work. 
  */
 message ReplicateWALEntryRequest {
   repeated WALEntry entry = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 29f0154..cb11688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -20,26 +20,31 @@
 package org.apache.hadoop.hbase.protobuf;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.UUID;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
 
 public class ReplicationProtbufUtil {
   /**
@@ -81,10 +86,11 @@
    */
   public static void replicateWALEntry(final AdminService.BlockingInterface admin,
       final HLog.Entry[] entries) throws IOException {
-    AdminProtos.ReplicateWALEntryRequest request =
+    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
       buildReplicateWALEntryRequest(entries);
     try {
-      admin.replicateWALEntry(null, request);
+      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+      admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {
       throw ProtobufUtil.getRemoteException(se);
     }
@@ -94,10 +100,13 @@
   /**
    * Create a new ReplicateWALEntryRequest from a list of HLog entries
    *
    * @param entries the HLog entries to be replicated
-   * @return a ReplicateWALEntryRequest
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
+   * found.
    */
-  public static AdminProtos.ReplicateWALEntryRequest
+  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
       buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+    // Accumulate all the KVs seen in here.
+    List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
     WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
     AdminProtos.ReplicateWALEntryRequest.Builder builder =
       AdminProtos.ReplicateWALEntryRequest.newBuilder();
@@ -128,13 +137,46 @@
           keyBuilder.addScopes(scopeBuilder.build());
         }
       }
-      List<KeyValue> keyValues = edit.getKeyValues();
-      for (KeyValue value: keyValues) {
-        entryBuilder.addKeyValueBytes(ByteString.copyFrom(
-          value.getBuffer(), value.getOffset(), value.getLength()));
-      }
+      List<KeyValue> kvs = edit.getKeyValues();
+      // Collect up the kvs
+      allkvs.add(kvs);
+      // Write out how many kvs associated with this entry.
+      entryBuilder.setAssociatedCellCount(kvs.size());
       builder.addEntry(entryBuilder.build());
     }
-    return builder.build();
+    return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
+      getCellScanner(allkvs));
   }
+
+  /**
+   * @param cells
+   * @return cells packaged as a CellScanner
+   */
+  static CellScanner getCellScanner(final List<List<? extends Cell>> cells) {
+    return new CellScanner() {
+      private final Iterator<List<? extends Cell>> entries = cells.iterator();
+      private Iterator<? extends Cell> currentIterator = null;
+      private Cell currentCell;
+
+      @Override
+      public Cell current() {
+        return this.currentCell;
+      }
+
+      @Override
+      public boolean advance() {
+        if (this.currentIterator == null) {
+          if (!this.entries.hasNext()) return false;
+          this.currentIterator = this.entries.next().iterator();
+        }
+        if (this.currentIterator.hasNext()) {
+          this.currentCell = this.currentIterator.next();
+          return true;
+        }
+        this.currentCell = null;
+        this.currentIterator = null;
+        return advance();
+      }
+    };
+  }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 334754a..ffe88a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2214,8 +2214,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         conf, server, fs, logDir, oldLogDir);
       server.replicationSinkHandler = (ReplicationSinkService)
         server.replicationSourceHandler;
-    }
-    else {
+    } else {
       server.replicationSourceHandler = (ReplicationSourceService)
          newReplicationInstance(sourceClassname,
          conf, server, fs, logDir, oldLogDir);
@@ -3710,15 +3709,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   @Override
   @QosPriority(priority=HConstants.REPLICATION_QOS)
   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
-      final ReplicateWALEntryRequest request) throws ServiceException {
+      final ReplicateWALEntryRequest request)
+  throws ServiceException {
     try {
       if (replicationSinkHandler != null) {
         checkOpen();
         requestCount.increment();
-        HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList());
-        if (entries != null && entries.length > 0) {
-          replicationSinkHandler.replicateLogEntries(entries);
-        }
+        this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
+          ((PayloadCarryingRpcController)controller).cellScanner());
       }
       return ReplicateWALEntryResponse.newBuilder().build();
     } catch (IOException ie) {
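On consuming the new framing: WALEntries arrive in order, and each one claims the next associatedCellCount Cells from the single flat CellScanner carried in the cellblock. Sketched below in isolation (walk and its exception text are illustrative; the real consumer is ReplicationSink.replicateEntries further down):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;

public class CellCountFramingSketch {
  // Entries are ordered; each claims the next associatedCellCount cells
  // from the one flat scanner that rode alongside in the cellblock.
  static void walk(List<WALEntry> entries, CellScanner cells) throws IOException {
    for (WALEntry entry : entries) {
      int count = entry.getAssociatedCellCount();
      for (int i = 0; i < count; i++) {
        if (!cells.advance()) {
          throw new IOException("Cellblock ran short: expected " + count + " cells, got " + i);
        }
        // cells.current() now belongs to this entry; apply it against the
        // table named in entry.getKey(), as ReplicationSink does.
      }
    }
  }
}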
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 754cff0..2b99616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
 /**
@@ -30,11 +33,17 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
  */
 @InterfaceAudience.Private
 public interface ReplicationSinkService extends ReplicationService {
-
-  /**
+  /**
    * Carry on the list of log entries down to the sink
    * @param entries list of entries to replicate
    * @throws IOException
    */
   public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
-}
+  /**
+   * Carry on the list of log entries down to the sink
+   * @param entries list of WALEntries to replicate
+   * @param cells Cells that the WALEntries refer to (if cells is non-null)
+   * @throws IOException
+   */
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 7b84ccd..794421b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
@@ -33,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -172,6 +175,18 @@ public class Replication implements WALActionsListener,
   }
 
   /**
+   * Carry on the list of log entries down to the sink
+   * @param entries list of WALEntries to replicate
+   * @param cells Cells the WALEntries refer to, carried alongside in a cellblock
+   * @throws IOException
+   */
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
+    if (this.replication) {
+      this.replicationSink.replicateEntries(entries, cells);
+    }
+  }
+
+  /**
    * If replication is enabled and this cluster is a master,
    * it starts
    * @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 67d921c..a5dabeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -34,17 +34,24 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -108,17 +115,84 @@ public class ReplicationSink {
   }
 
   /**
+   * Replicate this array of entries directly into the local cluster using the native client.
+   * Like {@link #replicateEntries(org.apache.hadoop.hbase.regionserver.wal.HLog.Entry[])} only
+   * it operates against the raw protobuf type, saving a conversion from pb to pojo.
+   *
+   * @param entries
+   * @param cells
+   * @throws IOException
+   */
+  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+    if (entries.isEmpty()) return;
+    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
+    // Very simple optimization where we batch sequences of rows going
+    // to the same table.
+    try {
+      long totalReplicated = 0;
+      // Map of table => list of Rows, we only want to flushCommits once per
+      // invocation of this method per table.
+      Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
+      for (WALEntry entry : entries) {
+        byte[] table = entry.getKey().getTableName().toByteArray();
+        Cell previousCell = null;
+        Mutation m = null;
+        java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
+        int index = 0;
+        int count = entry.getAssociatedCellCount();
+        while (index < count) {
+          // Throw index out of bounds if our cell count is off
+          if (!cells.advance()) throw new ArrayIndexOutOfBoundsException();
+          Cell cell = cells.current();
+          if (isNewRowOrType(previousCell, cell)) {
+            // Create new mutation
+            m = CellUtil.isDelete(cell)?
+              new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
+              new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            m.setClusterId(uuid);
+            addToMultiMap(rows, table, m);
+          }
+          if (CellUtil.isDelete(cell)) {
+            ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+          } else {
+            ((Put)m).add(KeyValueUtil.ensureKeyValue(cell));
+          }
+          previousCell = cell;
+          index++;
+        }
+        totalReplicated++;
+      }
+      for (Entry<byte[], List<Row>> entry : rows.entrySet()) {
+        batch(entry.getKey(), entry.getValue());
+      }
+      int size = entries.size();
+      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
+      this.metrics.applyBatch(size);
+      this.totalReplicatedEdits.addAndGet(totalReplicated);
+    } catch (IOException ex) {
+      LOG.error("Unable to accept edit because:", ex);
+      throw ex;
+    }
+  }
+
+  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
+    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
+      !CellUtil.matchingRow(previousCell, cell);
+  }
+
+  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
+    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+  }
+
   /**
    * Replicate this array of entries directly into the local cluster
    * using the native client.
    *
    * @param entries
    * @throws IOException
    */
-  public void replicateEntries(HLog.Entry[] entries)
-      throws IOException {
-    if (entries.length == 0) {
-      return;
-    }
+  public void replicateEntries(HLog.Entry[] entries) throws IOException {
+    if (entries.length == 0) return;
     // Very simple optimization where we batch sequences of rows going
     // to the same table.
     try {
@@ -134,7 +208,7 @@ public class ReplicationSink {
       KeyValue lastKV = null;
       List<KeyValue> kvs = edit.getKeyValues();
       for (KeyValue kv : kvs) {
-        if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+        if (isNewRowOrType(lastKV, kv)) {
           if (kv.isDelete()) {
             del = new Delete(kv.getRow());
             del.setClusterId(entry.getKey().getClusterId());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index fb15ce4..92ed5cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -177,6 +177,9 @@ public class ReplicationSource extends Thread
         new PriorityBlockingQueue<Path>(
             conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
+    // TODO: This connection is replication specific, or we should make it particular to
+    // replication and add replication-specific settings, such as the compression or codec
+    // to use when passing Cells.
     this.conn = HConnectionManager.getConnection(conf);
     this.zkHelper = manager.getRepZkWrapper();
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
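Note the grouping rule the new pb path shares with the old pojo path in the sink: a new Mutation starts whenever the row or the type byte changes between adjacent cells. The same predicate, in isolation (countMutations is illustrative, not in the patch):

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

public class MutationGroupingSketch {
  // Same predicate as ReplicationSink#isNewRowOrType above.
  static boolean isNewRowOrType(Cell previous, Cell current) {
    return previous == null || previous.getTypeByte() != current.getTypeByte() ||
      !CellUtil.matchingRow(previous, current);
  }

  // How many Mutations a run of adjacent cells collapses into under that rule.
  static int countMutations(List<Cell> cells) {
    int mutations = 0;
    Cell previous = null;
    for (Cell cell : cells) {
      if (isNewRowOrType(previous, cell)) mutations++;
      previous = cell;
    }
    return mutations;
  }
}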
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
new file mode 100644
index 0000000..932e45c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestReplicationProtobuf {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  /**
+   * Little test to check we can basically convert a list of lists of KVs into a CellScanner
+   * @throws IOException
+   */
+  @Test
+  public void testGetCellScanner() throws IOException {
+    List<KeyValue> a = new ArrayList<KeyValue>();
+    KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L);
+    a.add(akv);
+    // Add a few just to make it less regular.
+    a.add(new KeyValue(Bytes.toBytes("aa"), -1L));
+    a.add(new KeyValue(Bytes.toBytes("aaa"), -1L));
+    List<KeyValue> b = new ArrayList<KeyValue>();
+    KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L);
+    b.add(bkv);
+    List<KeyValue> c = new ArrayList<KeyValue>();
+    KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L);
+    c.add(ckv);
+    List<List<? extends Cell>> all = new ArrayList<List<? extends Cell>>();
+    all.add(a);
+    all.add(b);
+    all.add(c);
+    CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all);
+    testAdvanceHasSameRow(scanner, akv);
+    // Skip over aa
+    scanner.advance();
+    // Skip over aaa
+    scanner.advance();
+    testAdvanceHasSameRow(scanner, bkv);
+    testAdvanceHasSameRow(scanner, ckv);
+    assertFalse(scanner.advance());
+  }
+
+  private void testAdvanceHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException {
+    assertTrue(scanner.advance());
+    assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(),
+      scanner.current().getRowLength(),
+      kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+  }
+}
\ No newline at end of file
-- 
1.8.1.2
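Postscript: a throwaway main for anyone wanting to eyeball the "arbitrary bytes" fudge in CellUtil.estimatedSizeOf against a real KeyValue's serialized length (not part of the patch):

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class EstimatedSizeCheck {
  public static void main(String[] args) {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
      Bytes.toBytes("q"), Bytes.toBytes("value"));
    // The estimate feeds the cell-block presizing in IPCUtil#buildCellBlock;
    // it should land at or a little above the KeyValue's serialized length.
    System.out.println("estimate=" + CellUtil.estimatedSizeOf(kv) +
      " serialized=" + kv.getLength());
  }
}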