From fb05d014570fc0500511e341e1f801040652fb74 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 3 Jan 2018 15:15:51 +0800 Subject: [PATCH] HBASE-19636 All rs should already start work with the new peer change when replication peer procedure is finished --- .../hbase/replication/ReplicationQueueInfo.java | 17 +- .../regionserver/PeerProcedureHandlerImpl.java | 2 + .../regionserver/RecoveredReplicationSource.java | 6 +- .../RecoveredReplicationSourceShipper.java | 8 +- .../regionserver/ReplicationSource.java | 32 +- .../regionserver/ReplicationSourceFactory.java | 4 +- .../regionserver/ReplicationSourceInterface.java | 8 +- .../regionserver/ReplicationSourceManager.java | 549 ++++++++++++--------- .../regionserver/ReplicationSourceShipper.java | 6 +- .../regionserver/ReplicationSourceWALReader.java | 2 +- .../hbase/replication/ReplicationSourceDummy.java | 2 +- .../regionserver/TestReplicationSourceManager.java | 2 +- 12 files changed, 349 insertions(+), 289 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java index ecd888f..20eb171 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java @@ -38,7 +38,7 @@ public class ReplicationQueueInfo { private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class); private final String peerId; - private final String peerClusterZnode; + private final String queueId; private boolean queueRecovered; // List of all the dead region servers that had this queue (if recovered) private List deadRegionServers = new ArrayList<>(); @@ -47,12 +47,12 @@ public class ReplicationQueueInfo { * The passed znode will be either the id of the peer cluster or * the handling story of that queue in the form of id-servername-* */ - public ReplicationQueueInfo(String znode) { - this.peerClusterZnode = znode; - String[] parts = znode.split("-", 2); + public ReplicationQueueInfo(String queueId) { + this.queueId = queueId; + String[] parts = queueId.split("-", 2); this.queueRecovered = parts.length != 1; this.peerId = this.queueRecovered ? - parts[0] : peerClusterZnode; + parts[0] : queueId; if (parts.length >= 2) { // extract dead servers extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); @@ -119,11 +119,10 @@ public class ReplicationQueueInfo { return this.peerId; } - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } - public boolean isQueueRecovered() { - return queueRecovered; + public boolean isQueueRecovered() { return queueRecovered; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index c09c6a0..af315f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -66,6 +66,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { Lock peerLock = peersLock.acquireLock(peerId); try { newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + replicationSourceManager.refreshSource(peerId); } finally { peerLock.unlock(); } @@ -89,6 +90,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { Lock peerLock = peersLock.acquireLock(peerId); try { replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); + replicationSourceManager.refreshSource(peerId); } finally { peerLock.unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 7bceb78..1be9a88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -81,7 +81,7 @@ public class RecoveredReplicationSource extends ReplicationSource { ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); Threads.setDaemonThreadRunning(walReader, threadName - + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode, + + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, getUncaughtExceptionHandler()); return walReader; } @@ -178,8 +178,8 @@ public class RecoveredReplicationSource extends ReplicationSource { } } if (allTasksDone) { - manager.closeRecoveredQueue(this); - LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: " + manager.removeRecoveredSource(this); + LOG.info("Finished recovering queue " + queueId + " with the following stats: " + getStats()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index fb365bc..1e45496 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -77,7 +77,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper if (entryBatch.getWalEntries().isEmpty() && entryBatch.getLastSeqIds().isEmpty()) { LOG.debug("Finished recovering queue for group " + walGroupId + " of peer " - + source.getPeerClusterZnode()); + + source.getQueueId()); source.getSourceMetrics().incrCompletedRecoveryQueue(); setWorkerState(WorkerState.FINISHED); continue; @@ -114,7 +114,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper // normally has a position (unless the RS failed between 2 logs) private long getRecoveredQueueStartPos() { long startPosition = 0; - String peerClusterZnode = source.getPeerClusterZnode(); + String peerClusterZnode = source.getQueueId(); try { startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(), peerClusterZnode, this.queue.peek().getName()); @@ -130,8 +130,8 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper @Override protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), - lastReadPosition, true, false); + source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), + lastReadPosition, true); lastLoggedPosition = lastReadPosition; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 0b44ba4..2e9d72f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -106,7 +106,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // total number of edits we replicated private AtomicLong totalReplicatedEdits = new AtomicLong(0); // The znode we currently play with - protected String peerClusterZnode; + protected String queueId; // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Indicates if this particular source is running @@ -142,14 +142,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf * @param fs file system to use * @param manager replication manager to ping to * @param server the server for this region server - * @param peerClusterZnode the name of our znode + * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param metrics metrics for replication source */ @Override public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); @@ -168,8 +168,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.metrics = metrics; this.clusterId = clusterId; - this.peerClusterZnode = peerClusterZnode; - this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); + this.queueId = queueId; + this.replicationQueueInfo = new ReplicationQueueInfo(queueId); // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); @@ -179,7 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; - LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId + LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId + ", currentBandwidth=" + this.currentBandwidth); } @@ -217,11 +217,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf @Override public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { - String peerId = peerClusterZnode; + String peerId = queueId; if (peerId.contains("-")) { // peerClusterZnode will be in the form peerId + "-" + rsZNode. // A peerId will not have "-" in its name, see HBASE-11394 - peerId = peerClusterZnode.split("-")[0]; + peerId = queueId.split("-")[0]; } Map> tableCFMap = replicationPeer.getTableCFs(); if (tableCFMap != null) { @@ -311,7 +311,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); - this.manager.closeQueue(this); + this.manager.removeSource(this); return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); @@ -356,7 +356,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode, + threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId, getUncaughtExceptionHandler()); } @@ -450,7 +450,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf LOG.error("Unexpected exception in ReplicationSource", e); } }; - Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, + Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId, handler); } @@ -466,9 +466,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public void terminate(String reason, Exception cause, boolean join) { if (cause == null) { - LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason); + LOG.info("Closing source " + this.queueId + " because: " + reason); } else { - LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, + LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason, cause); } this.sourceRunning = false; @@ -491,7 +491,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" - + this.peerClusterZnode, + + this.queueId, te); } } @@ -499,8 +499,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } @Override - public String getPeerClusterZnode() { - return this.peerClusterZnode; + public String getQueueId() { + return this.queueId; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index 865a202..93e8331 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -32,8 +32,8 @@ public class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); - static ReplicationSourceInterface create(Configuration conf, String peerId) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + static ReplicationSourceInterface create(Configuration conf, String queueId) { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 4f10c73..d7cf9a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -51,7 +51,7 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** @@ -96,11 +96,11 @@ public interface ReplicationSourceInterface { Path getCurrentPath(); /** - * Get the id that the source is replicating to + * Get the queue id that the source is replicating to * - * @return peer cluster id + * @return queue id */ - String getPeerClusterZnode(); + String getQueueId(); /** * Get the id that the source is replicating to. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fc978be..9ae4d79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -100,10 +100,14 @@ public class ReplicationSourceManager implements ReplicationListener { private final UUID clusterId; // All about stopping private final Server server; + // All logs we are currently tracking - // Index structure of the map is: peer_id->logPrefix/logGroup->logs + // Index structure of the map is: queue_id->logPrefix/logGroup->logs + // For normal replication source, the peer id is same with the queue id private final Map>> walsById; // Logs for recovered sources we are currently tracking + // the map is: queue_id->logPrefix/logGroup->logs + // For recovered source, the queue id's format is peer_id-servername-* private final Map>> walsByIdRecoveredQueues; private final Configuration conf; private final FileSystem fs; @@ -181,74 +185,22 @@ public class ReplicationSourceManager implements ReplicationListener { connection = ConnectionFactory.createConnection(conf); } - @FunctionalInterface - private interface ReplicationQueueOperation { - void exec() throws ReplicationException; - } - - private void abortWhenFail(ReplicationQueueOperation op) { - try { - op.exec(); - } catch (ReplicationException e) { - server.abort("Failed to operate on replication queue", e); - } - } - - /** - * Provide the id of the peer and a log key and this method will figure which - * wal it belongs to and will log, for this region server, the current - * position. It will also clean old logs from the queue. - * @param log Path to the log currently being replicated from - * replication status in zookeeper. It will also delete older entries. - * @param id id of the peer cluster - * @param position current location in the log - * @param queueRecovered indicates if this queue comes from another region server - * @param holdLogInZK if true then the log is retained in ZK - */ - public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, - boolean holdLogInZK) { - String fileName = log.getName(); - abortWhenFail( - () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position)); - if (holdLogInZK) { - return; - } - cleanOldLogs(fileName, id, queueRecovered); - } - /** - * Cleans a log file and all older files from ZK. Called when we are sure that a - * log file is closed and has no more entries. - * @param key Path to the log - * @param id id of the peer cluster - * @param queueRecovered Whether this is a recovered queue + * Adds a normal source per registered peer cluster and tries to process all old region server wal + * queues + *

+ * The returned future is for adoptAbandonedQueues task. */ - public void cleanOldLogs(String key, String id, boolean queueRecovered) { - String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key); - if (queueRecovered) { - SortedSet wals = walsByIdRecoveredQueues.get(id).get(logPrefix); - if (wals != null && !wals.first().equals(key)) { - cleanOldLogs(wals, key, id); - } - } else { - synchronized (this.walsById) { - SortedSet wals = walsById.get(id).get(logPrefix); - if (wals != null && !wals.first().equals(key)) { - cleanOldLogs(wals, key, id); - } + Future init() throws IOException, ReplicationException { + for (String id : this.replicationPeers.getAllPeerIds()) { + addSource(id); + if (replicationForBulkLoadDataEnabled) { + // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case + // when a peer was added before replication for bulk loaded data was enabled. + this.queueStorage.addPeerToHFileRefs(id); } } - } - - private void cleanOldLogs(SortedSet wals, String key, String id) { - SortedSet walSet = wals.headSet(key); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); - } - for (String wal : walSet) { - abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); - } - walSet.clear(); + return this.executor.submit(this::adoptAbandonedQueues); } private void adoptAbandonedQueues() { @@ -275,22 +227,83 @@ public class ReplicationSourceManager implements ReplicationListener { } } - /** - * Adds a normal source per registered peer cluster and tries to process all old region server wal - * queues - *

- * The returned future is for adoptAbandonedQueues task. - */ - Future init() throws IOException, ReplicationException { - for (String id : this.replicationPeers.getAllPeerIds()) { + public void addPeer(String id) throws ReplicationException, IOException { + LOG.info("Trying to add peer, peerId: " + id); + boolean added = this.replicationPeers.addPeer(id); + if (added) { + LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); addSource(id); if (replicationForBulkLoadDataEnabled) { - // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case - // when a peer was added before replication for bulk loaded data was enabled. this.queueStorage.addPeerToHFileRefs(id); } } - return this.executor.submit(this::adoptAbandonedQueues); + } + + /** + * Thie method first deletes all the recovered sources for the specified + * id, then deletes the normal source (deleting all related data in ZK). + * @param id The id of the peer cluster + */ + public void removePeer(String id) { + LOG.info("Closing the following queue " + id + ", currently have " + + sources.size() + " and another " + + oldsources.size() + " that were recovered"); + String terminateMessage = "Replication stream was removed by a user"; + List oldSourcesToDelete = new ArrayList<>(); + // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer + // see NodeFailoverWorker.run + synchronized (oldsources) { + // First close all the recovered sources for this peer + for (ReplicationSourceInterface src : oldsources) { + if (id.equals(src.getPeerId())) { + oldSourcesToDelete.add(src); + } + } + for (ReplicationSourceInterface src : oldSourcesToDelete) { + src.terminate(terminateMessage); + removeRecoveredSource(src); + } + } + LOG.info("Number of deleted recovered sources for " + id + ": " + + oldSourcesToDelete.size()); + // Now look for the one on this cluster + List srcToRemove = new ArrayList<>(); + synchronized (this.sources) { + for (ReplicationSourceInterface src : this.sources) { + if (id.equals(src.getPeerId())) { + srcToRemove.add(src); + } + } + if (srcToRemove.isEmpty()) { + LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + + "This could mean that ReplicationSourceInterface initialization failed for this peer " + + "and that replication on this peer may not be caught up. peerId=" + id); + } + for (ReplicationSourceInterface toRemove : srcToRemove) { + toRemove.terminate(terminateMessage); + removeSource(toRemove); + } + } + + this.replicationPeers.removePeer(id); + // Remove HFile Refs znode from zookeeper + abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id)); + } + + /** + * Factory method to create a replication source + * @param queueId the id of the replication queue + * @return the created source + */ + private ReplicationSourceInterface createSource(String queueId, + ReplicationPeer replicationPeer) throws IOException { + ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); + + MetricsSource metrics = new MetricsSource(queueId); + // init replication source + src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, + walFileLengthProvider, metrics); + return src; } /** @@ -302,9 +315,11 @@ public class ReplicationSourceManager implements ReplicationListener { @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeer peer = replicationPeers.getPeer(id); - ReplicationSourceInterface src = getReplicationSource(id, peer); - synchronized (this.walsById) { + ReplicationSourceInterface src = createSource(id, peer); + synchronized (this.sources) { this.sources.add(src); + } + synchronized (this.walsById) { Map> walsByGroup = new HashMap<>(); this.walsById.put(id, walsByGroup); // Add the latest wal to that source's queue @@ -333,80 +348,161 @@ public class ReplicationSourceManager implements ReplicationListener { return src; } - @VisibleForTesting - int getSizeOfLatestPath() { - synchronized (latestPaths) { - return latestPaths.size(); - } - } - /** - * Delete a complete queue of wals associated with a peer cluster - * @param peerId Id of the peer cluster queue of wals to delete + * Close the previous replication source of this peer id and open new one to trigger the + * new replication state changes or new replication config changes + * @param peerId + * @throws IOException */ - public void deleteSource(String peerId, boolean closeConnection) { - abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); - if (closeConnection) { - this.replicationPeers.removePeer(peerId); + public void refreshSource(String peerId) throws IOException { + String terminateMessage = "Peer " + peerId + + " state or config changed. Will close the previous replication source and open a new one"; + List toRemove = new ArrayList<>(); + for (ReplicationSourceInterface src : this.sources) { + if (src.getPeerId().equals(peerId)) { + src.terminate(terminateMessage); + toRemove.add(src); + } } - } + synchronized (this.sources) { + for (ReplicationSourceInterface replicationSource : toRemove) { + this.sources.remove(replicationSource); + } + } + ReplicationPeer peer = replicationPeers.getPeer(peerId); + ReplicationSourceInterface src = createSource(peerId, peer); + // Synchronized on walsById because there may has new log + synchronized (this.walsById) { + this.sources.add(src); + for (SortedSet walsByGroup : walsById.get(peerId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + } + } + src.startup(); - /** - * Terminate the replication on this region server - */ - public void join() { - this.executor.shutdown(); - for (ReplicationSourceInterface source : this.sources) { - source.terminate("Region server is closing"); + List toStartup = new ArrayList<>(); + synchronized (this.oldsources) { + List previousQueueIds = new ArrayList<>(); + for (ReplicationSourceInterface oldSource : this.oldsources) { + if (oldSource.getPeerId().equals(peerId)) { + previousQueueIds.add(oldSource.getQueueId()); + oldSource.terminate(terminateMessage); + this.oldsources.remove(oldSource); + } + } + for (String queueId : previousQueueIds) { + ReplicationSourceInterface replicationSource = createSource(queueId, peer); + this.oldsources.add(replicationSource); + for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + } + toStartup.add(replicationSource); + } + } + for (ReplicationSourceInterface replicationSource : oldsources) { + replicationSource.startup(); } } /** - * Get a copy of the wals of the first source on this rs - * @return a sorted set of wal names + * Clear the references to the specified old source + * @param src source to clear */ - @VisibleForTesting - Map>> getWALs() { - return Collections.unmodifiableMap(walsById); + void removeRecoveredSource(ReplicationSourceInterface src) { + LOG.info("Done with the recovered queue " + src.getQueueId()); + src.getSourceMetrics().clear(); + this.oldsources.remove(src); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + this.walsByIdRecoveredQueues.remove(src.getQueueId()); } /** - * Get a copy of the wals of the recovered sources on this rs - * @return a sorted set of wal names + * Clear the references to the specified old source + * @param src source to clear */ - @VisibleForTesting - Map>> getWalsByIdRecoveredQueues() { - return Collections.unmodifiableMap(walsByIdRecoveredQueues); + void removeSource(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getQueueId()); + src.getSourceMetrics().clear(); + this.sources.remove(src); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + this.walsById.remove(src.getQueueId()); } /** - * Get a list of all the normal sources of this rs - * @return lis of all sources + * Delete a complete queue of wals associated with a peer cluster + * @param peerId Id of the peer cluster queue of wals to delete */ - public List getSources() { - return this.sources; + private void deleteQueue(String peerId) { + abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); + } + + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + server.abort("Failed to operate on replication queue", e); + } } /** - * Get a list of all the old sources of this rs - * @return list of all old sources + * Provide the id of the peer and a log key and this method will figure which + * wal it belongs to and will log, for this region server, the current + * position. It will also clean old logs from the queue. + * @param log Path to the log currently being replicated from + * replication status in zookeeper. It will also delete older entries. + * @param id id of the peer cluster + * @param position current location in the log + * @param queueRecovered indicates if this queue comes from another region server */ - public List getOldSources() { - return this.oldsources; + public void logPositionAndCleanOldLogs(Path log, String id, long position, + boolean queueRecovered) { + String fileName = log.getName(); + abortWhenFail( + () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position)); + cleanOldLogs(fileName, id, queueRecovered); } /** - * Get the normal source for a given peer - * @param peerId - * @return the normal source for the give peer if it exists, otherwise null. + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param key Path to the log + * @param id id of the peer cluster + * @param queueRecovered Whether this is a recovered queue */ - public ReplicationSourceInterface getSource(String peerId) { - return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null); + @VisibleForTesting + void cleanOldLogs(String key, String id, boolean queueRecovered) { + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key); + if (queueRecovered) { + SortedSet wals = walsByIdRecoveredQueues.get(id).get(logPrefix); + if (wals != null && !wals.first().equals(key)) { + cleanOldLogs(wals, key, id); + } + } else { + synchronized (this.walsById) { + SortedSet wals = walsById.get(id).get(logPrefix); + if (wals != null && !wals.first().equals(key)) { + cleanOldLogs(wals, key, id); + } + } + } } - @VisibleForTesting - List getAllQueues() throws ReplicationException { - return queueStorage.getAllQueues(server.getServerName()); + private void cleanOldLogs(SortedSet wals, String key, String id) { + SortedSet walSet = wals.headSet(key); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); + } + for (String wal : walSet) { + abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); + } + walSet.clear(); } void preLogRoll(Path newLog) throws IOException { @@ -483,25 +579,9 @@ public class ReplicationSourceManager implements ReplicationListener { } } - @VisibleForTesting - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; - } - - /** - * Factory method to create a replication source - * @param peerId the id of the peer cluster - * @return the created source - */ - private ReplicationSourceInterface getReplicationSource(String peerId, - ReplicationPeer replicationPeer) throws IOException { - ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId); - - MetricsSource metrics = new MetricsSource(peerId); - // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId, - walFileLengthProvider, metrics); - return src; + @Override + public void regionServerRemoved(String regionserver) { + transferQueues(ServerName.valueOf(regionserver)); } /** @@ -524,100 +604,6 @@ public class ReplicationSourceManager implements ReplicationListener { } /** - * Clear the references to the specified old source - * @param src source to clear - */ - public void closeRecoveredQueue(ReplicationSourceInterface src) { - LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); - if (src instanceof ReplicationSource) { - ((ReplicationSource) src).getSourceMetrics().clear(); - } - this.oldsources.remove(src); - deleteSource(src.getPeerClusterZnode(), false); - this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); - } - - /** - * Clear the references to the specified old source - * @param src source to clear - */ - public void closeQueue(ReplicationSourceInterface src) { - LOG.info("Done with the queue " + src.getPeerClusterZnode()); - src.getSourceMetrics().clear(); - this.sources.remove(src); - deleteSource(src.getPeerClusterZnode(), true); - this.walsById.remove(src.getPeerClusterZnode()); - } - - public void addPeer(String id) throws ReplicationException, IOException { - LOG.info("Trying to add peer, peerId: " + id); - boolean added = this.replicationPeers.addPeer(id); - if (added) { - LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); - addSource(id); - if (replicationForBulkLoadDataEnabled) { - this.queueStorage.addPeerToHFileRefs(id); - } - } - } - - /** - * Thie method first deletes all the recovered sources for the specified - * id, then deletes the normal source (deleting all related data in ZK). - * @param id The id of the peer cluster - */ - public void removePeer(String id) { - LOG.info("Closing the following queue " + id + ", currently have " - + sources.size() + " and another " - + oldsources.size() + " that were recovered"); - String terminateMessage = "Replication stream was removed by a user"; - List oldSourcesToDelete = new ArrayList<>(); - // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer - // see NodeFailoverWorker.run - synchronized (oldsources) { - // First close all the recovered sources for this peer - for (ReplicationSourceInterface src : oldsources) { - if (id.equals(src.getPeerId())) { - oldSourcesToDelete.add(src); - } - } - for (ReplicationSourceInterface src : oldSourcesToDelete) { - src.terminate(terminateMessage); - closeRecoveredQueue(src); - } - } - LOG.info("Number of deleted recovered sources for " + id + ": " - + oldSourcesToDelete.size()); - // Now look for the one on this cluster - List srcToRemove = new ArrayList<>(); - // synchronize on replicationPeers to avoid adding source for the to-be-removed peer - synchronized (this.replicationPeers) { - for (ReplicationSourceInterface src : this.sources) { - if (id.equals(src.getPeerId())) { - srcToRemove.add(src); - } - } - if (srcToRemove.isEmpty()) { - LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + - "This could mean that ReplicationSourceInterface initialization failed for this peer " + - "and that replication on this peer may not be caught up. peerId=" + id); - } - for (ReplicationSourceInterface toRemove : srcToRemove) { - toRemove.terminate(terminateMessage); - closeQueue(toRemove); - } - deleteSource(id, true); - } - // Remove HFile Refs znode from zookeeper - abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id)); - } - - @Override - public void regionServerRemoved(String regionserver) { - transferQueues(ServerName.valueOf(regionserver)); - } - - /** * Class responsible to setup new ReplicationSources to take care of the * queues from dead region servers. */ @@ -681,23 +667,23 @@ public class ReplicationSourceManager implements ReplicationListener { } for (Map.Entry> entry : newQueues.entrySet()) { - String peerId = entry.getKey(); + String queueId = entry.getKey(); Set walsSet = entry.getValue(); try { // there is not an actual peer defined corresponding to peerId for the failover. - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); String actualPeerId = replicationQueueInfo.getPeerId(); ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); if (peer == null) { LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + ", peer is null"); - abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId)); + abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); continue; } // track sources in walsByIdRecoveredQueues Map> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(peerId, walsByGroup); + walsByIdRecoveredQueues.put(queueId, walsByGroup); for (String wal : walsSet) { String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); SortedSet wals = walsByGroup.get(walPrefix); @@ -709,13 +695,13 @@ public class ReplicationSourceManager implements ReplicationListener { } // enqueue sources - ReplicationSourceInterface src = getReplicationSource(peerId, peer); + ReplicationSourceInterface src = createSource(queueId, peer); // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // see removePeer synchronized (oldsources) { if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { src.terminate("Recovered queue doesn't belong to any current peer"); - closeRecoveredQueue(src); + removeRecoveredSource(src); continue; } oldsources.add(src); @@ -733,6 +719,77 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Terminate the replication on this region server + */ + public void join() { + this.executor.shutdown(); + for (ReplicationSourceInterface source : this.sources) { + source.terminate("Region server is closing"); + } + } + + /** + * Get a copy of the wals of the first source on this rs + * @return a sorted set of wal names + */ + @VisibleForTesting + Map>> getWALs() { + return Collections.unmodifiableMap(walsById); + } + + /** + * Get a copy of the wals of the recovered sources on this rs + * @return a sorted set of wal names + */ + @VisibleForTesting + Map>> getWalsByIdRecoveredQueues() { + return Collections.unmodifiableMap(walsByIdRecoveredQueues); + } + + /** + * Get a list of all the normal sources of this rs + * @return lis of all sources + */ + public List getSources() { + return this.sources; + } + + /** + * Get a list of all the old sources of this rs + * @return list of all old sources + */ + public List getOldSources() { + return this.oldsources; + } + + /** + * Get the normal source for a given peer + * @param peerId + * @return the normal source for the give peer if it exists, otherwise null. + */ + @VisibleForTesting + public ReplicationSourceInterface getSource(String peerId) { + return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null); + } + + @VisibleForTesting + List getAllQueues() throws ReplicationException { + return queueStorage.getAllQueues(server.getServerName()); + } + + @VisibleForTesting + int getSizeOfLatestPath() { + synchronized (latestPaths) { + return latestPaths.size(); + } + } + + @VisibleForTesting + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } + + /** * Get the directory where wals are archived * @return the directory where wals are archived */ @@ -764,7 +821,9 @@ public class ReplicationSourceManager implements ReplicationListener { * Get the ReplicationPeers used by this ReplicationSourceManager * @return the ReplicationPeers used by this ReplicationSourceManager */ - public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} + public ReplicationPeers getReplicationPeers() { + return this.replicationPeers; + } /** * Get a string representation of all the sources' metrics diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index ea98cda..808f738 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -277,8 +277,8 @@ public class ReplicationSourceShipper extends Thread { } protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(), - lastReadPosition, false, false); + source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), + lastReadPosition, false); lastLoggedPosition = lastReadPosition; } @@ -295,7 +295,7 @@ public class ReplicationSourceShipper extends Thread { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + "," - + source.getPeerClusterZnode(), handler); + + source.getQueueId(), handler); } public PriorityBlockingQueue getLogQueue() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 1ec797f..e2201e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -115,7 +115,7 @@ public class ReplicationSourceWALReader extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); - LOG.info("peerClusterZnode=" + source.getPeerClusterZnode() + LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 38ec598..ff20ddc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -89,7 +89,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public String getPeerClusterZnode() { + public String getQueueId() { return peerClusterId; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 46b53b5..21559d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -313,7 +313,7 @@ public abstract class TestReplicationSourceManager { wal.rollWriter(); manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, false, false); + "1", 0, false); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), -- 1.9.1