diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java
index 5fd4e18..ae9c971 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java
@@ -6914,6 +6914,21 @@ public final class ClusterStatusProtos {
* required uint64 replicationLag = 5;
*/
long getReplicationLag();
+
+ // optional string walGroupId = 6;
+ /**
+ * optional string walGroupId = 6;
+ */
+ boolean hasWalGroupId();
+ /**
+ * optional string walGroupId = 6;
+ */
+ java.lang.String getWalGroupId();
+ /**
+ * optional string walGroupId = 6;
+ */
+ com.google.protobuf.ByteString
+ getWalGroupIdBytes();
}
/**
* Protobuf type {@code hbase.pb.ReplicationLoadSource}
@@ -6991,6 +7006,11 @@ public final class ClusterStatusProtos {
replicationLag_ = input.readUInt64();
break;
}
+ case 50: {
+ bitField0_ |= 0x00000020;
+ walGroupId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -7138,12 +7158,56 @@ public final class ClusterStatusProtos {
return replicationLag_;
}
+ // optional string walGroupId = 6;
+ public static final int WALGROUPID_FIELD_NUMBER = 6;
+ private java.lang.Object walGroupId_;
+ /**
+ * optional string walGroupId = 6;
+ */
+ public boolean hasWalGroupId() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public java.lang.String getWalGroupId() {
+ java.lang.Object ref = walGroupId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ walGroupId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public com.google.protobuf.ByteString
+ getWalGroupIdBytes() {
+ java.lang.Object ref = walGroupId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ walGroupId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
peerID_ = "";
ageOfLastShippedOp_ = 0L;
sizeOfLogQueue_ = 0;
timeStampOfLastShippedOp_ = 0L;
replicationLag_ = 0L;
+ walGroupId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7192,6 +7256,9 @@ public final class ClusterStatusProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeUInt64(5, replicationLag_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBytes(6, getWalGroupIdBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -7221,6 +7288,10 @@ public final class ClusterStatusProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(5, replicationLag_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, getWalGroupIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7269,6 +7340,11 @@ public final class ClusterStatusProtos {
result = result && (getReplicationLag()
== other.getReplicationLag());
}
+ result = result && (hasWalGroupId() == other.hasWalGroupId());
+ if (hasWalGroupId()) {
+ result = result && getWalGroupId()
+ .equals(other.getWalGroupId());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -7302,6 +7378,10 @@ public final class ClusterStatusProtos {
hash = (37 * hash) + REPLICATIONLAG_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getReplicationLag());
}
+ if (hasWalGroupId()) {
+ hash = (37 * hash) + WALGROUPID_FIELD_NUMBER;
+ hash = (53 * hash) + getWalGroupId().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -7421,6 +7501,8 @@ public final class ClusterStatusProtos {
bitField0_ = (bitField0_ & ~0x00000008);
replicationLag_ = 0L;
bitField0_ = (bitField0_ & ~0x00000010);
+ walGroupId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -7469,6 +7551,10 @@ public final class ClusterStatusProtos {
to_bitField0_ |= 0x00000010;
}
result.replicationLag_ = replicationLag_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.walGroupId_ = walGroupId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7502,6 +7588,11 @@ public final class ClusterStatusProtos {
if (other.hasReplicationLag()) {
setReplicationLag(other.getReplicationLag());
}
+ if (other.hasWalGroupId()) {
+ bitField0_ |= 0x00000020;
+ walGroupId_ = other.walGroupId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -7755,6 +7846,80 @@ public final class ClusterStatusProtos {
return this;
}
+ // optional string walGroupId = 6;
+ private java.lang.Object walGroupId_ = "";
+ /**
+ * optional string walGroupId = 6;
+ */
+ public boolean hasWalGroupId() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public java.lang.String getWalGroupId() {
+ java.lang.Object ref = walGroupId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ walGroupId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public com.google.protobuf.ByteString
+ getWalGroupIdBytes() {
+ java.lang.Object ref = walGroupId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ walGroupId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public Builder setWalGroupId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ walGroupId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public Builder clearWalGroupId() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ walGroupId_ = getDefaultInstance().getWalGroupId();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string walGroupId = 6;
+ */
+ public Builder setWalGroupIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ walGroupId_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationLoadSource)
}
@@ -14740,36 +14905,36 @@ public final class ClusterStatusProtos {
"\n\032store_complete_sequence_id\030\022 \003(\0132\031.hba" +
"se.pb.StoreSequenceId\"T\n\023ReplicationLoad" +
"Sink\022\032\n\022ageOfLastAppliedOp\030\001 \002(\004\022!\n\031time" +
- "StampsOfLastAppliedOp\030\002 \002(\004\"\225\001\n\025Replicat" +
+ "StampsOfLastAppliedOp\030\002 \002(\004\"\251\001\n\025Replicat" +
"ionLoadSource\022\016\n\006peerID\030\001 \002(\t\022\032\n\022ageOfLa" +
"stShippedOp\030\002 \002(\004\022\026\n\016sizeOfLogQueue\030\003 \002(" +
"\r\022 \n\030timeStampOfLastShippedOp\030\004 \002(\004\022\026\n\016r" +
- "eplicationLag\030\005 \002(\004\"\212\003\n\nServerLoad\022\032\n\022nu" +
- "mber_of_requests\030\001 \001(\004\022 \n\030total_number_o",
- "f_requests\030\002 \001(\004\022\024\n\014used_heap_MB\030\003 \001(\r\022\023" +
- "\n\013max_heap_MB\030\004 \001(\r\022*\n\014region_loads\030\005 \003(" +
- "\0132\024.hbase.pb.RegionLoad\022+\n\014coprocessors\030" +
- "\006 \003(\0132\025.hbase.pb.Coprocessor\022\031\n\021report_s" +
- "tart_time\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004" +
- "\022\030\n\020info_server_port\030\t \001(\r\0227\n\016replLoadSo" +
- "urce\030\n \003(\0132\037.hbase.pb.ReplicationLoadSou" +
- "rce\0223\n\014replLoadSink\030\013 \001(\0132\035.hbase.pb.Rep" +
- "licationLoadSink\"a\n\016LiveServerInfo\022$\n\006se" +
- "rver\030\001 \002(\0132\024.hbase.pb.ServerName\022)\n\013serv",
- "er_load\030\002 \002(\0132\024.hbase.pb.ServerLoad\"\250\003\n\r" +
- "ClusterStatus\0228\n\rhbase_version\030\001 \001(\0132!.h" +
- "base.pb.HBaseVersionFileContent\022.\n\014live_" +
- "servers\030\002 \003(\0132\030.hbase.pb.LiveServerInfo\022" +
- "*\n\014dead_servers\030\003 \003(\0132\024.hbase.pb.ServerN" +
- "ame\022;\n\025regions_in_transition\030\004 \003(\0132\034.hba" +
- "se.pb.RegionInTransition\022\'\n\ncluster_id\030\005" +
- " \001(\0132\023.hbase.pb.ClusterId\0222\n\023master_copr" +
- "ocessors\030\006 \003(\0132\025.hbase.pb.Coprocessor\022$\n" +
- "\006master\030\007 \001(\0132\024.hbase.pb.ServerName\022,\n\016b",
- "ackup_masters\030\010 \003(\0132\024.hbase.pb.ServerNam" +
- "e\022\023\n\013balancer_on\030\t \001(\010BF\n*org.apache.had" +
- "oop.hbase.protobuf.generatedB\023ClusterSta" +
- "tusProtosH\001\240\001\001"
+ "eplicationLag\030\005 \002(\004\022\022\n\nwalGroupId\030\006 \001(\t\"" +
+ "\212\003\n\nServerLoad\022\032\n\022number_of_requests\030\001 \001",
+ "(\004\022 \n\030total_number_of_requests\030\002 \001(\004\022\024\n\014" +
+ "used_heap_MB\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022" +
+ "*\n\014region_loads\030\005 \003(\0132\024.hbase.pb.RegionL" +
+ "oad\022+\n\014coprocessors\030\006 \003(\0132\025.hbase.pb.Cop" +
+ "rocessor\022\031\n\021report_start_time\030\007 \001(\004\022\027\n\017r" +
+ "eport_end_time\030\010 \001(\004\022\030\n\020info_server_port" +
+ "\030\t \001(\r\0227\n\016replLoadSource\030\n \003(\0132\037.hbase.p" +
+ "b.ReplicationLoadSource\0223\n\014replLoadSink\030" +
+ "\013 \001(\0132\035.hbase.pb.ReplicationLoadSink\"a\n\016" +
+ "LiveServerInfo\022$\n\006server\030\001 \002(\0132\024.hbase.p",
+ "b.ServerName\022)\n\013server_load\030\002 \002(\0132\024.hbas" +
+ "e.pb.ServerLoad\"\250\003\n\rClusterStatus\0228\n\rhba" +
+ "se_version\030\001 \001(\0132!.hbase.pb.HBaseVersion" +
+ "FileContent\022.\n\014live_servers\030\002 \003(\0132\030.hbas" +
+ "e.pb.LiveServerInfo\022*\n\014dead_servers\030\003 \003(" +
+ "\0132\024.hbase.pb.ServerName\022;\n\025regions_in_tr" +
+ "ansition\030\004 \003(\0132\034.hbase.pb.RegionInTransi" +
+ "tion\022\'\n\ncluster_id\030\005 \001(\0132\023.hbase.pb.Clus" +
+ "terId\0222\n\023master_coprocessors\030\006 \003(\0132\025.hba" +
+ "se.pb.Coprocessor\022$\n\006master\030\007 \001(\0132\024.hbas",
+ "e.pb.ServerName\022,\n\016backup_masters\030\010 \003(\0132" +
+ "\024.hbase.pb.ServerName\022\023\n\013balancer_on\030\t \001" +
+ "(\010BF\n*org.apache.hadoop.hbase.protobuf.g" +
+ "eneratedB\023ClusterStatusProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14817,7 +14982,7 @@ public final class ClusterStatusProtos {
internal_static_hbase_pb_ReplicationLoadSource_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicationLoadSource_descriptor,
- new java.lang.String[] { "PeerID", "AgeOfLastShippedOp", "SizeOfLogQueue", "TimeStampOfLastShippedOp", "ReplicationLag", });
+ new java.lang.String[] { "PeerID", "AgeOfLastShippedOp", "SizeOfLogQueue", "TimeStampOfLastShippedOp", "ReplicationLag", "WalGroupId", });
internal_static_hbase_pb_ServerLoad_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_hbase_pb_ServerLoad_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 228be7e..7c5361e 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -151,6 +151,7 @@ message ReplicationLoadSource {
required uint32 sizeOfLogQueue = 3;
required uint64 timeStampOfLastShippedOp = 4;
required uint64 replicationLag = 5;
+ optional string walGroupId = 6;
}
message ServerLoad {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index cf08787..19e1a93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -175,6 +175,19 @@ public class MetricsSource {
    * @return peerID
    */
   public String getPeerID() {
-    return id;
+    return id.split(ReplicationSourceManager.PEER_GROUP_DELIMITER)[0];
+  }
+
+  /**
+   * Get the id of the wal group this source is handling. Returns an empty
+   * string when the source id carries no group suffix (delimiter absent),
+   * instead of throwing ArrayIndexOutOfBoundsException.
+   * @return wal group id, or "" if the id has no group part
+   */
+  public String getWALGroupId() {
+    // NOTE(review): String#split takes a regex; this assumes the delimiter
+    // holds no regex metacharacters -- otherwise quote via Pattern.quote().
+    String[] parts = id.split(ReplicationSourceManager.PEER_GROUP_DELIMITER);
+    return parts.length > 1 ? parts[1] : "";
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 8dd42bc..3b1bd21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -88,6 +88,7 @@ public class ReplicationLoad {
ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
ClusterStatusProtos.ReplicationLoadSource.newBuilder();
rLoadSourceBuild.setPeerID(sm.getPeerID());
+ rLoadSourceBuild.setWalGroupId(sm.getWALGroupId());
rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2126f6d..ee2177a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -86,6 +86,8 @@ public class ReplicationSource extends Thread
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
+ // id of the wal group this source is handling
+ private String walGroupId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@@ -145,6 +147,7 @@ public class ReplicationSource extends Thread
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
+ * @param walGroupId wal group this source is handling
* @throws IOException
*/
@Override
@@ -152,7 +155,7 @@ public class ReplicationSource extends Thread
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics)
+ final MetricsSource metrics, final String walGroupId)
throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
@@ -183,6 +186,7 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
+ this.walGroupId = walGroupId;
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
}
@@ -196,6 +200,11 @@ public class ReplicationSource extends Thread
@Override
public void enqueueLog(Path log) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ReplicationSource for peer and group [" + peerId
+ + ReplicationSourceManager.PEER_GROUP_DELIMITER + walGroupId + "] now enqueue log: "
+ + log);
+ }
this.queue.put(log);
int queueSize = queue.size();
this.metrics.setSizeOfLogQueue(queueSize);
@@ -880,4 +889,9 @@ public class ReplicationSource extends Thread
public MetricsSource getSourceMetrics() {
return this.metrics;
}
+
+ @Override
+ public String getWALGroupId() {
+ return this.walGroupId;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 1e9c714..fa25628 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -46,13 +46,14 @@ public interface ReplicationSourceInterface {
* @param stopper the stopper object for this region server
* @param peerClusterZnode
* @param clusterId
+ * @param groupId
* @throws IOException
*/
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
- final MetricsSource metrics) throws IOException;
+ final MetricsSource metrics, String groupId) throws IOException;
/**
* Add a log to the list of logs to replicate
@@ -105,4 +106,6 @@ public interface ReplicationSourceInterface {
*/
String getStats();
+ String getWALGroupId();
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0c8f6f9..2fb5c19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -63,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
*