From 371a03883cc39f7767ab052a2a1655ace5eb2f34 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 16 May 2018 18:43:00 +0800 Subject: [PATCH] HBASE-20561 The way we stop a ReplicationSource may cause the RS down --- .../replication/ZKReplicationQueueStorage.java | 98 ++++++++++++---------- .../regionserver/ReplicationSource.java | 24 +++++- .../regionserver/ReplicationSourceManager.java | 24 +++++- .../apache/hadoop/hbase/zookeeper/ZKWatcher.java | 4 +- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index b9ebfb9..e530be9 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -174,8 +174,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); } catch (KeeperException e) { - throw new ReplicationException( - "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); + throw toReplicationException(e, + "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")"); } } @@ -185,8 +185,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); } catch (KeeperException e) { - throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName - + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, + "Failed to add wal to queue (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); } } @@ -199,8 +200,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } catch (NoNodeException e) { LOG.warn("{} already deleted when removing log", fileNode); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, + "Failed to remove wal from queue (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); } } @@ -253,8 +255,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } } catch (KeeperException e) { - throw new ReplicationException("Failed to set log position (serverName=" + serverName - + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + throw toReplicationException(e, "Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")"); } } @@ -287,8 +289,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName=" - + encodedRegionName + ", peerId=" + peerId + ")", e); + throw toReplicationException(e, + "Failed to get last pushed sequence id (encodedRegionName=" + encodedRegionName + + ", peerId=" + peerId + ")"); } } @@ -308,8 +311,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZKUtil.multiOrSequential(zookeeper, listOfOps, true); } } catch (KeeperException e) { - throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId - + ", size of lastSeqIds=" + lastSeqIds.size(), e); + throw toReplicationException(e, + "Failed to set last sequence ids, peerId=" + peerId + ", size of lastSeqIds=" + lastSeqIds + .size()); } } @@ -343,7 +347,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase sb.setLength(regionsZNodeLength); } } catch (KeeperException e) { - throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e); + throw toReplicationException(e, "Failed to remove all last sequence ids, peerId=" + peerId); } } @@ -356,8 +360,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList()); ZKUtil.multiOrSequential(zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId + - ", encodedRegionNames.size=" + encodedRegionNames.size(), e); + throw toReplicationException(e, + "Failed to remove last sequence ids, peerId=" + peerId + ", encodedRegionNames.size=" + + encodedRegionNames.size()); } } @@ -368,8 +373,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); } catch (KeeperException | InterruptedException e) { - throw new ReplicationException("Failed to get log position (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, "Failed to get log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")"); } try { return ZKUtil.parseWALPositionFrom(bytes); @@ -388,10 +393,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); } catch (KeeperException e) { - throw new ReplicationException( - "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + - " failed when creating the node for " + destServerName, - e); + throw toReplicationException(e, + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed when creating the node for " + destServerName); } try { String oldQueueNode = getQueueNode(sourceServerName, queueId); @@ -433,8 +437,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase queueId,sourceServerName, destServerName, e.toString()); return null; } catch (KeeperException | InterruptedException e) { - throw new ReplicationException("Claim queue queueId=" + queueId + " from " + - sourceServerName + " to " + destServerName + " failed", e); + throw toReplicationException(e, "Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed"); } } @@ -445,7 +449,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } catch (NotEmptyException e) { // keep silence to avoid logging too much. } catch (KeeperException e) { - throw new ReplicationException("Failed to remove replicator for " + serverName, e); + throw toReplicationException(e, "Failed to remove replicator for " + serverName); } } @@ -462,7 +466,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getListOfReplicators0(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of replicators", e); + throw toReplicationException(e, "Failed to get list of replicators"); } } @@ -479,9 +483,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getWALsInQueue0(serverName, queueId); } catch (KeeperException e) { - throw new ReplicationException( - "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", - e); + throw toReplicationException(e, + "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")"); } } @@ -495,7 +498,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getAllQueues0(serverName); } catch (KeeperException e) { - throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); + throw toReplicationException(e, "Failed to get all queues (serverName=" + serverName + ")"); } } @@ -531,7 +534,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase v0, v1, retry); } } catch (KeeperException e) { - throw new ReplicationException("Failed to get all wals", e); + throw toReplicationException(e, "Failed to get all wals"); } } @@ -552,8 +555,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZKUtil.createWithParents(zookeeper, peerNode); } } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", - e); + throw toReplicationException(e, + "Failed to add peer " + peerId + " to hfile reference queue"); } } @@ -562,14 +565,14 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase String peerNode = getHFileRefsPeerNode(peerId); try { if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { - LOG.debug("Peer {} not found in hfile reference queue.", peerNode); + LOG.debug("Peer {} not found in hfile reference queue.", peerNode); } else { LOG.info("Removing peer {} from hfile reference queue.", peerNode); ZKUtil.deleteNodeRecursively(zookeeper, peerNode); } } catch (KeeperException e) { - throw new ReplicationException( - "Failed to remove peer " + peerId + " from hfile reference queue.", e); + throw toReplicationException(e, + "Failed to remove peer " + peerId + " from hfile reference queue"); } } @@ -586,7 +589,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); + throw toReplicationException(e, "Failed to add hfile reference to peer " + peerId); } } @@ -602,7 +605,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); + throw toReplicationException(e, "Failed to remove hfile reference from peer " + peerId); } } @@ -616,8 +619,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getAllPeersFromHFileRefsQueue0(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of all peers in hfile references node.", - e); + throw toReplicationException(e, "Failed to get list of all peers in hfile references node"); } } @@ -632,8 +634,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getReplicableHFiles0(peerId); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, - e); + throw toReplicationException(e, "Failed to get list of hfile references for peer " + peerId); } } @@ -644,7 +645,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); } catch (KeeperException e) { - throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + throw toReplicationException(e, "Failed to get stat of replication hfile references node"); } return stat.getCversion(); } @@ -671,7 +672,18 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase v0, v1, retry); } } catch (KeeperException e) { - throw new ReplicationException("Failed to get all hfile refs", e); + throw toReplicationException(e, "Failed to get all hfile refs"); + } + } + + private ReplicationException toReplicationException(Exception e, String message) { + // As the zookeeper implementation will rethrow a SystemErrorException when received + // InterruptedException, here translate it back to InterruptedException + if (e instanceof KeeperException.SystemErrorException && e + .getCause() instanceof InterruptedException) { + return new ReplicationException(message, e.getCause()); + } else { + return new ReplicationException(message, e); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b05a673..666146a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -498,9 +498,29 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.interrupt(); - worker.interrupt(); + worker.entryReader.setReaderRunning(false); } + + for (ReplicationSourceShipper worker : workers) { + if (worker.isAlive() || worker.entryReader.isAlive()) { + try { + // Wait worker to stop + Thread.sleep(this.sleepForRetries); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + Thread.currentThread().interrupt(); + } + // If worker still is alive after waiting, interrupt it + if (worker.isAlive()) { + worker.interrupt(); + } + // If entry reader is alive after waiting, interrupt it + if (worker.entryReader.isAlive()) { + worker.entryReader.interrupt(); + } + } + } + if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..11035ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,6 +450,20 @@ public class ReplicationSourceManager implements ReplicationListener { void exec() throws ReplicationException; } + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + // When replication source was terminated, the thread will be interrupted. So need to + // handle it and not abort the region server + if (e.getCause() != null && e.getCause() instanceof InterruptedException) { + throw new RuntimeException( + "Thread is interrupted, the replication source may be terminated"); + } + server.abort("Failed to operate on replication queue", e); + } + } + private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -484,8 +499,9 @@ public class ReplicationSourceManager implements ReplicationListener { public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, WALEntryBatch entryBatch) { String fileName = entryBatch.getLastWalPath().getName(); - abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, - entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); + interruptOrAbortWhenFail(() -> this.queueStorage + .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(), + entryBatch.getLastSeqIds())); cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered); } @@ -523,7 +539,7 @@ public class ReplicationSourceManager implements ReplicationListener { } LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet); for (String wal : walSet) { - abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); + interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); } walSet.clear(); } @@ -886,7 +902,7 @@ public class ReplicationSourceManager implements ReplicationListener { } public void cleanUpHFileRefs(String peerId, List files) { - abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); + interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); } int activeFailoverTaskCount() { diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index c3cac5f..77e1918 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -605,7 +605,9 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { public void interruptedException(InterruptedException ie) throws KeeperException { interruptedExceptionNoThrow(ie, true); // Throw a system error exception to let upper level handle it - throw new KeeperException.SystemErrorException(); + KeeperException keeperException = new KeeperException.SystemErrorException(); + keeperException.initCause(ie); + throw keeperException; } /** -- 2.7.4