diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index c677a0d..0bdcd4a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -242,4 +242,4 @@ public final class CellUtil { } }; } -} +} \ No newline at end of file diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java index da5d121..a3bd954 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java @@ -9894,6 +9894,10 @@ public final class AdminProtos { java.util.List getKeyValueBytesList(); int getKeyValueBytesCount(); com.google.protobuf.ByteString getKeyValueBytes(int index); + + // optional int32 associatedCellCount = 3; + boolean hasAssociatedCellCount(); + int getAssociatedCellCount(); } public static final class WALEntry extends com.google.protobuf.GeneratedMessage @@ -9951,9 +9955,20 @@ public final class AdminProtos { return keyValueBytes_.get(index); } + // optional int32 associatedCellCount = 3; + public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3; + private int associatedCellCount_; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + private void initFields() { key_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance(); keyValueBytes_ = java.util.Collections.emptyList();; + associatedCellCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -9981,6 +9996,9 @@ public final class AdminProtos { for (int i = 0; i < keyValueBytes_.size(); i++) 
{ output.writeBytes(2, keyValueBytes_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt32(3, associatedCellCount_); + } getUnknownFields().writeTo(output); } @@ -10003,6 +10021,10 @@ public final class AdminProtos { size += dataSize; size += 1 * getKeyValueBytesList().size(); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, associatedCellCount_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10033,6 +10055,11 @@ public final class AdminProtos { } result = result && getKeyValueBytesList() .equals(other.getKeyValueBytesList()); + result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount()); + if (hasAssociatedCellCount()) { + result = result && (getAssociatedCellCount() + == other.getAssociatedCellCount()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10050,6 +10077,10 @@ public final class AdminProtos { hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER; hash = (53 * hash) + getKeyValueBytesList().hashCode(); } + if (hasAssociatedCellCount()) { + hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getAssociatedCellCount(); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10175,6 +10206,8 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000001); keyValueBytes_ = java.util.Collections.emptyList();; bitField0_ = (bitField0_ & ~0x00000002); + associatedCellCount_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -10226,6 +10259,10 @@ public final class AdminProtos { bitField0_ = (bitField0_ & ~0x00000002); } result.keyValueBytes_ = keyValueBytes_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.associatedCellCount_ = associatedCellCount_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10255,6 
+10292,9 @@ public final class AdminProtos { } onChanged(); } + if (other.hasAssociatedCellCount()) { + setAssociatedCellCount(other.getAssociatedCellCount()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -10308,6 +10348,11 @@ public final class AdminProtos { keyValueBytes_.add(input.readBytes()); break; } + case 24: { + bitField0_ |= 0x00000004; + associatedCellCount_ = input.readInt32(); + break; + } } } } @@ -10455,6 +10500,27 @@ public final class AdminProtos { return this; } + // optional int32 associatedCellCount = 3; + private int associatedCellCount_ ; + public boolean hasAssociatedCellCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getAssociatedCellCount() { + return associatedCellCount_; + } + public Builder setAssociatedCellCount(int value) { + bitField0_ |= 0x00000004; + associatedCellCount_ = value; + onChanged(); + return this; + } + public Builder clearAssociatedCellCount() { + bitField0_ = (bitField0_ & ~0x00000004); + associatedCellCount_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALEntry) } @@ -15359,41 +15425,41 @@ public final class AdminProtos { "gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" + "ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" + "er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" + - "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", - "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" + - "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" + - "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" + - "ALWriterRequest\".\n\025RollWALWriterResponse" + - "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" + - "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" + - "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" + - "\022\037\n\nserverName\030\001 
\002(\0132\013.ServerName\022\021\n\tweb" + - "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" + - "\nserverInfo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014Admin", - "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" + - "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" + - "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" + - "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" + - "ineRegionRequest\032\030.GetOnlineRegionRespon" + - "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" + - "penRegionResponse\0228\n\013closeRegion\022\023.Close" + - "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" + - "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" + - "egionResponse\0228\n\013splitRegion\022\023.SplitRegi", - "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" + - "ctRegion\022\025.CompactRegionRequest\032\026.Compac" + - "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" + - "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" + - "replicateWALEntry\022\031.ReplicateWALEntryReq" + - "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" + - "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" + - "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" + - "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" + - "ServerInfoRequest\032\026.GetServerInfoRespons", - "e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" + - "opServerResponseBA\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" + - "\240\001\001" + "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W", + "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" + + "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" + + "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" + + "cateWALEntryResponse\"\026\n\024RollWALWriterReq" + + "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" + + "oFlush\030\001 
\003(\014\"#\n\021StopServerRequest\022\016\n\006rea" + + "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" + + "rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" + + "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" + + "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo", + "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" + + "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" + + "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" + + "etStoreFileRequest\032\025.GetStoreFileRespons" + + "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" + + "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" + + "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" + + "sponse\0228\n\013closeRegion\022\023.CloseRegionReque" + + "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" + + "\023.FlushRegionRequest\032\024.FlushRegionRespon", + "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" + + ".SplitRegionResponse\022>\n\rcompactRegion\022\025." 
+ + "CompactRegionRequest\032\026.CompactRegionResp" + + "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" + + "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" + + "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" + + "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" + + "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" + + "\025.RollWALWriterRequest\032\026.RollWALWriterRe" + + "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR", + "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" + + "rver\022\022.StopServerRequest\032\023.StopServerRes" + + "ponseBA\n*org.apache.hadoop.hbase.protobu" + + "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -15557,7 +15623,7 @@ public final class AdminProtos { internal_static_WALEntry_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALEntry_descriptor, - new java.lang.String[] { "Key", "KeyValueBytes", }, + new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", }, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class, org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class); internal_static_ReplicateWALEntryRequest_descriptor = diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 733eadf..75b11dd 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -161,14 +161,18 @@ message MergeRegionsResponse { // Protocol buffer version of WAL for replication message WALEntry { required WALKey key = 1; + // Following may be null if the KVs/Cells are carried along the side in a cellblock (See + // RPC for more on cellblocks). 
If Cells/KVs are in a cellblock, this next field is null + // and associatedCellCount has count of Cells associated w/ this WALEntry repeated bytes keyValueBytes = 2; + // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock. + optional int32 associatedCellCount = 3; } /** * Replicates the given entries. The guarantee is that the given entries * will be durable on the slave cluster if this method returns without - * any exception. - * hbase.replication has to be set to true for this to work. + * any exception. hbase.replication has to be set to true for this to work. */ message ReplicateWALEntryRequest { repeated WALEntry entry = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 29f0154..ea37fbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,26 +20,31 @@ package org.apache.hadoop.hbase.protobuf; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import 
org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; public class ReplicationProtbufUtil { /** @@ -81,10 +86,11 @@ public class ReplicationProtbufUtil { */ public static void replicateWALEntry(final AdminService.BlockingInterface admin, final HLog.Entry[] entries) throws IOException { - AdminProtos.ReplicateWALEntryRequest request = + Pair p = buildReplicateWALEntryRequest(entries); try { - admin.replicateWALEntry(null, request); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -94,10 +100,13 @@ public class ReplicationProtbufUtil { * Create a new ReplicateWALEntryRequest from a list of HLog entries * * @param entries the HLog entries to be replicated - * @return a ReplicateWALEntryRequest + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values + * found. */ - public static AdminProtos.ReplicateWALEntryRequest + public static Pair buildReplicateWALEntryRequest(final HLog.Entry[] entries) { + // Accumulate all the KVs seen in here. 
+ List> allkvs = new ArrayList>(entries.length); WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder(); AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = @@ -128,13 +137,41 @@ public class ReplicationProtbufUtil { keyBuilder.addScopes(scopeBuilder.build()); } } - List keyValues = edit.getKeyValues(); - for (KeyValue value: keyValues) { - entryBuilder.addKeyValueBytes(ByteString.copyFrom( - value.getBuffer(), value.getOffset(), value.getLength())); - } + List kvs = edit.getKeyValues(); + allkvs.add(kvs); + // Write out how many kvs associated with this entry. + entryBuilder.setAssociatedCellCount(kvs.size()); builder.addEntry(entryBuilder.build()); } - return builder.build(); + return new Pair(builder.build(), + getCellScanner(allkvs)); + } + + static CellScanner getCellScanner(final List> cells) { + return new CellScanner() { + private final Iterator> entries = cells.iterator(); + private Iterator currentIterator = null; + private Cell currentCell; + + @Override + public Cell current() { + return this.currentCell; + } + + @Override + public boolean advance() { + if (this.currentIterator == null) { + if (!this.entries.hasNext()) return false; + this.currentIterator = this.entries.next().iterator(); + } + if (this.currentIterator.hasNext()) { + this.currentCell = this.currentIterator.next(); + return true; + } + this.currentCell = null; + this.currentIterator = null; + return advance(); + } + }; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ed8f709..d193f46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3714,7 +3714,8 @@ public class 
HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (replicationSinkHandler != null) { checkOpen(); requestCount.increment(); - HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList()); + HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList(), + (controller != null)? ((PayloadCarryingRpcController)controller).cellScanner(): null); if (entries != null && entries.length > 0) { replicationSinkHandler.replicateLogEntries(entries); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..b51ab74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -177,6 +177,9 @@ public class ReplicationSource extends Thread new PriorityBlockingQueue( conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator()); + // TODO: This connection is replication specific or we should make it particular to + // replication and make replication specific settings such as compression or codec to use + // when passing Cells. this.conn = HConnectionManager.getConnection(conf); this.zkHelper = manager.getRepZkWrapper(); this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java new file mode 100644 index 0000000..932e45c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.protobuf; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestReplicationProtobuf { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + /** + * Little test to check we can basically convert list of a list of KVs into a CellScanner + * @throws IOException + */ + @Test + public void testGetCellScanner() throws IOException { + List a = new ArrayList(); + KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L); + a.add(akv); + // Add a few just to make it less regular. 
+ a.add(new KeyValue(Bytes.toBytes("aa"), -1L)); + a.add(new KeyValue(Bytes.toBytes("aaa"), -1L)); + List b = new ArrayList(); + KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L); + a.add(bkv); + List c = new ArrayList(); + KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L); + c.add(ckv); + List> all = new ArrayList>(); + all.add(a); + all.add(b); + all.add(c); + CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all); + testAdvancetHasSameRow(scanner, akv); + // Skip over aa + scanner.advance(); + // Skip over aaa + scanner.advance(); + testAdvancetHasSameRow(scanner, bkv); + testAdvancetHasSameRow(scanner, ckv); + assertFalse(scanner.advance()); + } + + private void testAdvancetHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException { + scanner.advance(); + assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(), + scanner.current().getRowLength(), + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + } +} \ No newline at end of file