diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java index 5fd4e18..ae9c971 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClusterStatusProtos.java @@ -6914,6 +6914,21 @@ public final class ClusterStatusProtos { * required uint64 replicationLag = 5; */ long getReplicationLag(); + + // optional string walGroupId = 6; + /** + * optional string walGroupId = 6; + */ + boolean hasWalGroupId(); + /** + * optional string walGroupId = 6; + */ + java.lang.String getWalGroupId(); + /** + * optional string walGroupId = 6; + */ + com.google.protobuf.ByteString + getWalGroupIdBytes(); } /** * Protobuf type {@code hbase.pb.ReplicationLoadSource} @@ -6991,6 +7006,11 @@ public final class ClusterStatusProtos { replicationLag_ = input.readUInt64(); break; } + case 50: { + bitField0_ |= 0x00000020; + walGroupId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -7138,12 +7158,56 @@ public final class ClusterStatusProtos { return replicationLag_; } + // optional string walGroupId = 6; + public static final int WALGROUPID_FIELD_NUMBER = 6; + private java.lang.Object walGroupId_; + /** + * optional string walGroupId = 6; + */ + public boolean hasWalGroupId() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string walGroupId = 6; + */ + public java.lang.String getWalGroupId() { + java.lang.Object ref = walGroupId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + walGroupId_ = s; + } + return s; + } + } + /** + * optional string 
walGroupId = 6; + */ + public com.google.protobuf.ByteString + getWalGroupIdBytes() { + java.lang.Object ref = walGroupId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + walGroupId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { peerID_ = ""; ageOfLastShippedOp_ = 0L; sizeOfLogQueue_ = 0; timeStampOfLastShippedOp_ = 0L; replicationLag_ = 0L; + walGroupId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7192,6 +7256,9 @@ public final class ClusterStatusProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeUInt64(5, replicationLag_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBytes(6, getWalGroupIdBytes()); + } getUnknownFields().writeTo(output); } @@ -7221,6 +7288,10 @@ public final class ClusterStatusProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(5, replicationLag_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(6, getWalGroupIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -7269,6 +7340,11 @@ public final class ClusterStatusProtos { result = result && (getReplicationLag() == other.getReplicationLag()); } + result = result && (hasWalGroupId() == other.hasWalGroupId()); + if (hasWalGroupId()) { + result = result && getWalGroupId() + .equals(other.getWalGroupId()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -7302,6 +7378,10 @@ public final class ClusterStatusProtos { hash = (37 * hash) + REPLICATIONLAG_FIELD_NUMBER; hash = (53 * hash) + hashLong(getReplicationLag()); } + if (hasWalGroupId()) { + hash = (37 * hash) + WALGROUPID_FIELD_NUMBER; + hash = (53 * hash) + getWalGroupId().hashCode(); + } hash = 
(29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -7421,6 +7501,8 @@ public final class ClusterStatusProtos { bitField0_ = (bitField0_ & ~0x00000008); replicationLag_ = 0L; bitField0_ = (bitField0_ & ~0x00000010); + walGroupId_ = ""; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -7469,6 +7551,10 @@ public final class ClusterStatusProtos { to_bitField0_ |= 0x00000010; } result.replicationLag_ = replicationLag_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.walGroupId_ = walGroupId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -7502,6 +7588,11 @@ public final class ClusterStatusProtos { if (other.hasReplicationLag()) { setReplicationLag(other.getReplicationLag()); } + if (other.hasWalGroupId()) { + bitField0_ |= 0x00000020; + walGroupId_ = other.walGroupId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -7755,6 +7846,80 @@ public final class ClusterStatusProtos { return this; } + // optional string walGroupId = 6; + private java.lang.Object walGroupId_ = ""; + /** + * optional string walGroupId = 6; + */ + public boolean hasWalGroupId() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional string walGroupId = 6; + */ + public java.lang.String getWalGroupId() { + java.lang.Object ref = walGroupId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + walGroupId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string walGroupId = 6; + */ + public com.google.protobuf.ByteString + getWalGroupIdBytes() { + java.lang.Object ref = walGroupId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + walGroupId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + 
} + /** + * optional string walGroupId = 6; + */ + public Builder setWalGroupId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + walGroupId_ = value; + onChanged(); + return this; + } + /** + * optional string walGroupId = 6; + */ + public Builder clearWalGroupId() { + bitField0_ = (bitField0_ & ~0x00000020); + walGroupId_ = getDefaultInstance().getWalGroupId(); + onChanged(); + return this; + } + /** + * optional string walGroupId = 6; + */ + public Builder setWalGroupIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000020; + walGroupId_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationLoadSource) } @@ -14740,36 +14905,36 @@ public final class ClusterStatusProtos { "\n\032store_complete_sequence_id\030\022 \003(\0132\031.hba" + "se.pb.StoreSequenceId\"T\n\023ReplicationLoad" + "Sink\022\032\n\022ageOfLastAppliedOp\030\001 \002(\004\022!\n\031time" + - "StampsOfLastAppliedOp\030\002 \002(\004\"\225\001\n\025Replicat" + + "StampsOfLastAppliedOp\030\002 \002(\004\"\251\001\n\025Replicat" + "ionLoadSource\022\016\n\006peerID\030\001 \002(\t\022\032\n\022ageOfLa" + "stShippedOp\030\002 \002(\004\022\026\n\016sizeOfLogQueue\030\003 \002(" + "\r\022 \n\030timeStampOfLastShippedOp\030\004 \002(\004\022\026\n\016r" + - "eplicationLag\030\005 \002(\004\"\212\003\n\nServerLoad\022\032\n\022nu" + - "mber_of_requests\030\001 \001(\004\022 \n\030total_number_o", - "f_requests\030\002 \001(\004\022\024\n\014used_heap_MB\030\003 \001(\r\022\023" + - "\n\013max_heap_MB\030\004 \001(\r\022*\n\014region_loads\030\005 \003(" + - "\0132\024.hbase.pb.RegionLoad\022+\n\014coprocessors\030" + - "\006 \003(\0132\025.hbase.pb.Coprocessor\022\031\n\021report_s" + - "tart_time\030\007 \001(\004\022\027\n\017report_end_time\030\010 \001(\004" + - 
"\022\030\n\020info_server_port\030\t \001(\r\0227\n\016replLoadSo" + - "urce\030\n \003(\0132\037.hbase.pb.ReplicationLoadSou" + - "rce\0223\n\014replLoadSink\030\013 \001(\0132\035.hbase.pb.Rep" + - "licationLoadSink\"a\n\016LiveServerInfo\022$\n\006se" + - "rver\030\001 \002(\0132\024.hbase.pb.ServerName\022)\n\013serv", - "er_load\030\002 \002(\0132\024.hbase.pb.ServerLoad\"\250\003\n\r" + - "ClusterStatus\0228\n\rhbase_version\030\001 \001(\0132!.h" + - "base.pb.HBaseVersionFileContent\022.\n\014live_" + - "servers\030\002 \003(\0132\030.hbase.pb.LiveServerInfo\022" + - "*\n\014dead_servers\030\003 \003(\0132\024.hbase.pb.ServerN" + - "ame\022;\n\025regions_in_transition\030\004 \003(\0132\034.hba" + - "se.pb.RegionInTransition\022\'\n\ncluster_id\030\005" + - " \001(\0132\023.hbase.pb.ClusterId\0222\n\023master_copr" + - "ocessors\030\006 \003(\0132\025.hbase.pb.Coprocessor\022$\n" + - "\006master\030\007 \001(\0132\024.hbase.pb.ServerName\022,\n\016b", - "ackup_masters\030\010 \003(\0132\024.hbase.pb.ServerNam" + - "e\022\023\n\013balancer_on\030\t \001(\010BF\n*org.apache.had" + - "oop.hbase.protobuf.generatedB\023ClusterSta" + - "tusProtosH\001\240\001\001" + "eplicationLag\030\005 \002(\004\022\022\n\nwalGroupId\030\006 \001(\t\"" + + "\212\003\n\nServerLoad\022\032\n\022number_of_requests\030\001 \001", + "(\004\022 \n\030total_number_of_requests\030\002 \001(\004\022\024\n\014" + + "used_heap_MB\030\003 \001(\r\022\023\n\013max_heap_MB\030\004 \001(\r\022" + + "*\n\014region_loads\030\005 \003(\0132\024.hbase.pb.RegionL" + + "oad\022+\n\014coprocessors\030\006 \003(\0132\025.hbase.pb.Cop" + + "rocessor\022\031\n\021report_start_time\030\007 \001(\004\022\027\n\017r" + + "eport_end_time\030\010 \001(\004\022\030\n\020info_server_port" + + "\030\t \001(\r\0227\n\016replLoadSource\030\n \003(\0132\037.hbase.p" + + "b.ReplicationLoadSource\0223\n\014replLoadSink\030" + + "\013 \001(\0132\035.hbase.pb.ReplicationLoadSink\"a\n\016" + + 
"LiveServerInfo\022$\n\006server\030\001 \002(\0132\024.hbase.p", + "b.ServerName\022)\n\013server_load\030\002 \002(\0132\024.hbas" + + "e.pb.ServerLoad\"\250\003\n\rClusterStatus\0228\n\rhba" + + "se_version\030\001 \001(\0132!.hbase.pb.HBaseVersion" + + "FileContent\022.\n\014live_servers\030\002 \003(\0132\030.hbas" + + "e.pb.LiveServerInfo\022*\n\014dead_servers\030\003 \003(" + + "\0132\024.hbase.pb.ServerName\022;\n\025regions_in_tr" + + "ansition\030\004 \003(\0132\034.hbase.pb.RegionInTransi" + + "tion\022\'\n\ncluster_id\030\005 \001(\0132\023.hbase.pb.Clus" + + "terId\0222\n\023master_coprocessors\030\006 \003(\0132\025.hba" + + "se.pb.Coprocessor\022$\n\006master\030\007 \001(\0132\024.hbas", + "e.pb.ServerName\022,\n\016backup_masters\030\010 \003(\0132" + + "\024.hbase.pb.ServerName\022\023\n\013balancer_on\030\t \001" + + "(\010BF\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\023ClusterStatusProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14817,7 +14982,7 @@ public final class ClusterStatusProtos { internal_static_hbase_pb_ReplicationLoadSource_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ReplicationLoadSource_descriptor, - new java.lang.String[] { "PeerID", "AgeOfLastShippedOp", "SizeOfLogQueue", "TimeStampOfLastShippedOp", "ReplicationLag", }); + new java.lang.String[] { "PeerID", "AgeOfLastShippedOp", "SizeOfLogQueue", "TimeStampOfLastShippedOp", "ReplicationLag", "WalGroupId", }); internal_static_hbase_pb_ServerLoad_descriptor = getDescriptor().getMessageTypes().get(7); internal_static_hbase_pb_ServerLoad_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index 228be7e..7c5361e 100644 --- 
a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -151,6 +151,7 @@ message ReplicationLoadSource { required uint32 sizeOfLogQueue = 3; required uint64 timeStampOfLastShippedOp = 4; required uint64 replicationLag = 5; + optional string walGroupId = 6; } message ServerLoad { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index cf08787..19e1a93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -175,6 +175,10 @@ public class MetricsSource { * @return peerID */ public String getPeerID() { - return id; + return id.split(ReplicationSourceManager.PEER_GROUP_DELIMITER)[0]; + } + + public String getWALGroupId() { + return id.split(ReplicationSourceManager.PEER_GROUP_DELIMITER)[1]; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 8dd42bc..3b1bd21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -88,6 +88,7 @@ public class ReplicationLoad { ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setWalGroupId(sm.getWALGroupId()); rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2126f6d..242b80a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -86,6 +86,8 @@ public class ReplicationSource extends Thread private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; + // id of the wal group this source is handling + private String groupId; // The manager of all sources to which we ping back our progress private ReplicationSourceManager manager; // Should we stop everything? @@ -145,6 +147,7 @@ public class ReplicationSource extends Thread * @param clusterId unique UUID for the cluster * @param replicationEndpoint the replication endpoint implementation * @param metrics metrics for replication source + * @param walGroupId wal group this source is handling * @throws IOException */ @Override @@ -152,7 +155,7 @@ public class ReplicationSource extends Thread final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) + final MetricsSource metrics, final String walGroupId) throws IOException { this.stopper = stopper; this.conf = HBaseConfiguration.create(conf); @@ -183,6 +186,7 @@ public class ReplicationSource extends Thread this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); + this.groupId = walGroupId; this.logQueueWarnThreshold = 
this.conf.getInt("replication.source.log.queue.warn", 2); this.replicationEndpoint = replicationEndpoint; } @@ -196,6 +200,10 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { + if (LOG.isTraceEnabled()) { + LOG.trace("ReplicationSource for peer and group [" + peerId + + ReplicationSourceManager.PEER_GROUP_DELIMITER + groupId + "] now enqueue log: " + log); + } this.queue.put(log); int queueSize = queue.size(); this.metrics.setSizeOfLogQueue(queueSize); @@ -880,4 +888,9 @@ public class ReplicationSource extends Thread public MetricsSource getSourceMetrics() { return this.metrics; } + + @Override + public String getGroupId() { + return this.groupId; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 1e9c714..0e252d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -46,13 +46,14 @@ public interface ReplicationSourceInterface { * @param stopper the stopper object for this region server * @param peerClusterZnode * @param clusterId + * @param groupId * @throws IOException */ public void init(final Configuration conf, final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Stoppable stopper, final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint, - final MetricsSource metrics) throws IOException; + final MetricsSource metrics, String groupId) throws IOException; /** * Add a log to the list of logs to replicate @@ -105,4 +106,6 @@ public interface ReplicationSourceInterface { */ String getStats(); + String getGroupId(); + } 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 0c8f6f9..eb17e4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -63,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * This class is responsible to manage all the replication * sources. 
There are two classes of sources: * @@ -91,13 +94,14 @@ public class ReplicationSourceManager implements ReplicationListener { // All about stopping private final Server server; // All logs we are currently tracking - private final Map> walsById; + // Index structure of the map is: peer_id->logPrefix/logGroup->logs + private final Map>> walsById; // Logs for recovered sources we are currently tracking private final Map> walsByIdRecoveredQueues; private final Configuration conf; private final FileSystem fs; - // The path to the latest log we saw, for new coming sources - private Path latestPath; + // The paths to the latest log of each wal group, for new coming peers + private Set latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -109,6 +113,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; + public static final String PEER_GROUP_DELIMITER = "@"; + /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -133,7 +139,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationPeers = replicationPeers; this.replicationTracker = replicationTracker; this.server = server; - this.walsById = new HashMap>(); + this.walsById = new HashMap>>(); this.walsByIdRecoveredQueues = new ConcurrentHashMap>(); this.oldsources = new CopyOnWriteArrayList(); this.conf = conf; @@ -158,6 +164,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); this.rand = new Random(); + this.latestPaths = Collections.synchronizedSet(new HashSet()); } /** @@ -196,8 +203,9 @@ public class ReplicationSourceManager implements ReplicationListener { } } else { synchronized (this.walsById) { - SortedSet wals = walsById.get(id); - if (!wals.first().equals(key)) { + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key); + SortedSet wals 
= walsById.get(id).get(logPrefix); + if (wals != null && !wals.first().equals(key)) { cleanOldLogs(wals, key, id); } } @@ -218,8 +226,12 @@ public class ReplicationSourceManager implements ReplicationListener { * old region server wal queues */ protected void init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getPeerIds()) { - addSource(id); + synchronized (this.walsById) { + for (String id : this.replicationPeers.getPeerIds()) { + if (walsById.get(id) == null) { + this.walsById.put(id, new HashMap>()); + } + } } List currentReplicators = this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { @@ -238,40 +250,61 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Add a new normal source to this region server + * Add sources for the given peer cluster on this region server. For the newly added peer, we only + * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster - * @return the source that was created * @throws IOException */ - protected ReplicationSourceInterface addSource(String id) throws IOException, + protected void addSource(String id) throws IOException, ReplicationException { - ReplicationPeerConfig peerConfig - = replicationPeers.getReplicationPeerConfig(id); - ReplicationPeer peer = replicationPeers.getPeer(id); - ReplicationSourceInterface src = - getReplicationSource(this.conf, this.fs, this, this.replicationQueues, - this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { - this.sources.add(src); - this.walsById.put(id, new TreeSet()); + if (walsById.get(id) == null) { + this.walsById.put(id, new HashMap>()); + } // Add the latest wal to that source's queue - if (this.latestPath != null) { - String name = this.latestPath.getName(); - this.walsById.get(id).add(name); - try { - 
this.replicationQueues.addLog(src.getPeerClusterZnode(), name); - } catch (ReplicationException e) { - String message = - "Cannot add log to queue when creating a new source, queueId=" - + src.getPeerClusterZnode() + ", filename=" + name; - server.stop(message); - throw e; + synchronized (latestPaths) { + if (this.latestPaths.size() > 0) { + for (Path logPath : latestPaths) { + String name = logPath.getName(); + String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); + SortedSet logs = new TreeSet(); + logs.add(name); + this.walsById.get(id).put(walPrefix, logs); + try { + this.replicationQueues.addLog(id, name); + } catch (ReplicationException e) { + String message = + "Cannot add log to queue when creating a new source, queueId=" + + id + ", filename=" + name; + server.stop(message); + throw e; + } + ReplicationSourceInterface src = getReplicationSource(id, walPrefix); + src.enqueueLog(logPath); + } } - src.enqueueLog(this.latestPath); } } - src.startup(); - return src; + } + + private ReplicationSourceInterface getReplicationSource(String id, String logPrefix) + throws IOException, ReplicationException { + for (ReplicationSourceInterface src : sources) { + if (src.getPeerClusterZnode().equals(id) && src.getGroupId().equals(logPrefix)) { + return src; + } + } + synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for + // the to-be-removed peer + ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); + ReplicationPeer peer = replicationPeers.getPeer(id); + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, this.replicationQueues, + this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix); + sources.add(src); + src.startup(); + return src; + } } /** @@ -302,7 +335,7 @@ public class ReplicationSourceManager implements ReplicationListener { * Get a copy of the wals of the first source on this rs * @return a sorted set of wal names */ - 
protected Map> getWALs() { + protected Map>> getWALs() { return Collections.unmodifiableMap(walsById); } @@ -331,33 +364,104 @@ public class ReplicationSourceManager implements ReplicationListener { } void preLogRoll(Path newLog) throws IOException { - synchronized (this.walsById) { - String name = newLog.getName(); - for (ReplicationSourceInterface source : this.sources) { + checkAndAddSource(newLog); + String logName = newLog.getName(); + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName); + Path toRemove = null; + synchronized (latestPaths) { + for (Path path : latestPaths) { + if (path.getName().contains(logPrefix)) { + toRemove = path; + break; + } + } + if (toRemove != null) { + latestPaths.remove(toRemove); + } + this.latestPaths.add(newLog); + } + } + + /** + * Check and enqueue the given log to the correct source. If there's still no source for the + * group to which the given log belongs, create one + * @param logPath the log path to check and enqueue + * @throws IOException + */ + private void checkAndAddSource(Path logPath) throws IOException { + String logName = logPath.getName(); + String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName); + synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for + // the to-be-removed peer + for (String id : replicationPeers.getPeerIds()) { + // update replication queues try { - this.replicationQueues.addLog(source.getPeerClusterZnode(), name); + this.replicationQueues.addLog(id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue with id=" - + source.getPeerClusterZnode() + ", filename=" + name, e); + String message = + "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + + logName; + server.stop(message); + throw new IOException(e); + } + // update sources + boolean sourceExists = false; + for (ReplicationSourceInterface src : sources) { + if 
(src.getPeerClusterZnode().equals(id) && src.getGroupId().equals(logPrefix)) { + sourceExists = true; + break; + } + } + if (!sourceExists) { + ReplicationPeerConfig peerConfig; + try { + peerConfig = replicationPeers.getReplicationPeerConfig(id); + } catch (ReplicationException e) { + LOG.error("Failed to get replication peer config for peer " + id, e); + throw new IOException(e); + } + ReplicationPeer peer = replicationPeers.getPeer(id); + ReplicationSourceInterface src = + getReplicationSource(this.conf, this.fs, this, this.replicationQueues, + this.replicationPeers, server, id, this.clusterId, peerConfig, peer, logPrefix); + sources.add(src); + src.startup(); } } - for (SortedSet wals : this.walsById.values()) { - if (this.sources.isEmpty()) { - // If there's no slaves, don't need to keep the old wals since - // we only consider the last one when a new slave comes in - wals.clear(); + } + // update walsById map + synchronized (walsById) { + for (Map> walsByPrefix : this.walsById.values()) { + boolean existingPrefix = false; + for (Map.Entry> entry : walsByPrefix.entrySet()) { + SortedSet wals = entry.getValue(); + if (this.sources.isEmpty()) { + // If there's no slaves, don't need to keep the old wals since + // we only consider the last one when a new slave comes in + wals.clear(); + } + if (logPrefix.equals(entry.getKey())) { + wals.add(logName); + existingPrefix = true; + } + } + if (!existingPrefix) { + // The new log belongs to a new group, add it into this peer + SortedSet wals = new TreeSet(); + wals.add(logName); + walsByPrefix.put(logPrefix, wals); } - wals.add(name); } } - - this.latestPath = newLog; } void postLogRoll(Path newLog) throws IOException { + String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(newLog.getName()); // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { - source.enqueueLog(newLog); + if (source.getGroupId().equals(walPrefix)) { + 
source.enqueueLog(newLog); + } } } @@ -368,6 +472,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param manager the manager to use * @param server the server object for this region server * @param peerId the id of the peer cluster + * @param groupId the group this source is handling * @return the created source * @throws IOException */ @@ -375,8 +480,8 @@ public class ReplicationSourceManager implements ReplicationListener { final FileSystem fs, final ReplicationSourceManager manager, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final Server server, final String peerId, final UUID clusterId, - final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer) - throws IOException { + final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer, + final String groupId) throws IOException { RegionServerCoprocessorHost rsServerHost = null; TableDescriptors tableDescriptors = null; if (server instanceof HRegionServer) { @@ -414,14 +519,16 @@ public class ReplicationSourceManager implements ReplicationListener { } } } catch (Exception e) { - LOG.warn("Passed replication endpoint implementation throws errors", e); + LOG.warn("Passed replication endpoint implementation throws errors" + + " while initializing ReplicationSource for peer: " + peerId, e); throw new IOException(e); } - MetricsSource metrics = new MetricsSource(peerId); + String metricsId = (groupId == null) ? 
peerId : peerId + PEER_GROUP_DELIMITER + groupId; + MetricsSource metrics = new MetricsSource(metricsId); // init replication source src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, - clusterId, replicationEndpoint, metrics); + clusterId, replicationEndpoint, metrics, groupId); // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), @@ -470,7 +577,7 @@ public class ReplicationSourceManager implements ReplicationListener { + sources.size() + " and another " + oldsources.size() + " that were recovered"); String terminateMessage = "Replication stream was removed by a user"; - ReplicationSourceInterface srcToRemove = null; + List srcToRemove = new ArrayList(); List oldSourcesToDelete = new ArrayList(); // First close all the recovered sources for this peer @@ -486,19 +593,23 @@ public class ReplicationSourceManager implements ReplicationListener { LOG.info("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()); // Now look for the one on this cluster - for (ReplicationSourceInterface src : this.sources) { - if (id.equals(src.getPeerClusterId())) { - srcToRemove = src; - break; + synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source + // for the to-be-removed peer + for (ReplicationSourceInterface src : this.sources) { + if (id.equals(src.getPeerClusterId())) { + srcToRemove.add(src); + } } + if (srcToRemove.size() == 0) { + LOG.error("The queue we wanted to close is missing " + id); + return; + } + for (ReplicationSourceInterface toRemove : srcToRemove) { + toRemove.terminate(terminateMessage); + this.sources.remove(toRemove); + } + deleteSource(id, true); } - if (srcToRemove == null) { - LOG.error("The queue we wanted to close is missing " + id); - return; - } - srcToRemove.terminate(terminateMessage); - this.sources.remove(srcToRemove); - deleteSource(id, true); } @Override @@ -599,7 +710,7 @@ public 
class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface src = getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp, - server, peerId, this.clusterId, peerConfig, peer); + server, peerId, this.clusterId, peerConfig, peer, null); if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java index e1417b2..7138d91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java @@ -49,19 +49,24 @@ public class BoundedRegionGroupingProvider extends RegionGroupingProvider { public void init(final WALFactory factory, final Configuration conf, final List<WALActionsListener> listeners, final String providerId) throws IOException { super.init(factory, conf, listeners, providerId); - // no need to check for and close down old providers; our parent class will throw on re-invoke delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS))]; - for (int i = 0; i < delegates.length; i++) { - delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, - providerId + i); - } LOG.info("Configured to run with " + delegates.length + " delegate WAL providers."); } @Override - WALProvider populateCache(final byte[] group) { - final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length]; + WALProvider populateCache(final byte[] group) throws IOException { + int idx = counter.getAndIncrement() % delegates.length; + // there's IO cost to initialize the provider, so we only initialize it when necessary + // no need to check for and close down old providers; our 
parent class will throw on re-invoke + synchronized (delegates) {// synchronize to avoid duplicated initialization + if (delegates[idx] == null) { + delegates[idx] = + factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, providerId + + idx); + } + } + final WALProvider temp = delegates[idx]; final WALProvider extant = cached.putIfAbsent(group, temp); // if someone else beat us to initializing, just take what they set. // note that in such a case we skew load away from the provider we picked at first diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index b41bbfb..b18978e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -363,4 +363,15 @@ public class DefaultWALProvider implements WALProvider { } } + /** + * Get prefix of the log from its name, assuming WAL name in format of + * log_prefix.filenumber.log_suffix @see {@link FSHLog#computeFilename()} + * @param name Name of the WAL to parse + * @return prefix of the log + */ + public static String getWALPrefixFromWALName(String name) { + int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf("."); + return name.substring(0, endIndex); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index eb2c426..1975a96 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -123,9 +123,9 @@ class RegionGroupingProvider implements WALProvider { protected RegionGroupingStrategy strategy = null; - private WALFactory factory = null; - private List<WALActionsListener> listeners = null; - private String 
providerId = null; + protected WALFactory factory = null; + protected List<WALActionsListener> listeners = null; + protected String providerId = null; @Override public void init(final WALFactory factory, final Configuration conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index f463f76..e8b11f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -37,15 +37,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { ReplicationSourceManager manager; String peerClusterId; Path currentPath; + String groupId; @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, - UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics, String walGroupId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; + this.groupId = walGroupId; } @Override @@ -89,4 +91,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public String getStats() { return ""; } + + @Override + public String getGroupId() { + return groupId; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 451c39f..c9cad2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -83,6 +83,8 @@ public class TestReplicationEndpoint extends 
TestReplicationBase { for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) { utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName()); } + // multiple hlog roll requires more time, wait a while for its completion + Threads.sleep(100); } @Test (timeout=120000) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 571be26..c2923ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -25,6 +25,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; @@ -229,7 +230,11 @@ public class TestReplicationSourceManager { } wal.sync(); - assertEquals(6, manager.getWALs().get(slaveId).size()); + int logNumber = 0; + for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) { + logNumber += entry.getValue().size(); + } + assertEquals(6, logNumber); wal.rollWriter(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java index 577f0ba..8586d33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java @@ -39,8 +39,10 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; 
+import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager { private int nbRows; private int walEditKVs; private final AtomicLong sequenceId = new AtomicLong(1); + @Rule public TestName tn = new TestName(); @Parameters public static Collection<Object[]> parameters() { @@ -127,7 +130,7 @@ public class TestReplicationWALReaderManager { List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); pathWatcher = new PathWatcher(); listeners.add(pathWatcher); - final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName()); log = wals.getWAL(info.getEncodedNameAsBytes()); }