From ba27f41eae643c7d9e53bda2238aef321691084c Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 16 May 2018 18:43:00 +0800 Subject: [PATCH] HBASE-20561 The way we stop a ReplicationSource may cause the RS down --- .../replication/ZKReplicationQueueStorage.java | 97 ++++++++++++---------- .../regionserver/ReplicationSourceManager.java | 15 ++++ 2 files changed, 69 insertions(+), 43 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index b9ebfb9..fa3eda5 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -174,8 +174,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); } catch (KeeperException e) { - throw new ReplicationException( - "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); + throw toReplicationException(e, + "Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")"); } } @@ -185,8 +185,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); } catch (KeeperException e) { - throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName - + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, + "Failed to add wal to queue (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); } } @@ -199,8 +200,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } catch (NoNodeException e) { LOG.warn("{} already deleted when removing log", fileNode); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, + "Failed to remove wal from queue (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); } } @@ -253,8 +255,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } } } catch (KeeperException e) { - throw new ReplicationException("Failed to set log position (serverName=" + serverName - + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + throw toReplicationException(e, "Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")"); } } @@ -287,8 +289,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName=" - + encodedRegionName + ", peerId=" + peerId + ")", e); + throw toReplicationException(e, + "Failed to get last pushed sequence id (encodedRegionName=" + encodedRegionName + + ", peerId=" + peerId + ")"); } } @@ -308,8 +311,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZKUtil.multiOrSequential(zookeeper, listOfOps, true); } } catch (KeeperException e) { - throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId - + ", size of lastSeqIds=" + lastSeqIds.size(), e); + throw toReplicationException(e, + "Failed to set last sequence ids, peerId=" + peerId + ", size of lastSeqIds=" + lastSeqIds + .size()); } } @@ -343,7 +347,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase sb.setLength(regionsZNodeLength); } } catch (KeeperException e) { - throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e); + throw toReplicationException(e, "Failed to remove all last sequence ids, peerId=" + peerId); } } @@ -356,8 +360,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList()); ZKUtil.multiOrSequential(zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId + - ", encodedRegionNames.size=" + encodedRegionNames.size(), e); + throw toReplicationException(e, + "Failed to remove last sequence ids, peerId=" + peerId + ", encodedRegionNames.size=" + + encodedRegionNames.size()); } } @@ -368,8 +373,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); } catch (KeeperException | InterruptedException e) { - throw new ReplicationException("Failed to get log position (serverName=" + serverName + - ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + throw toReplicationException(e, "Failed to get log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")"); } try { return ZKUtil.parseWALPositionFrom(bytes); @@ -388,10 +393,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); } catch (KeeperException e) { - throw new ReplicationException( - "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + - " failed when creating the node for " + destServerName, - e); + throw toReplicationException(e, + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed when creating the node for " + destServerName); } try { String oldQueueNode = getQueueNode(sourceServerName, queueId); @@ -433,8 +437,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase queueId,sourceServerName, destServerName, e.toString()); return null; } catch (KeeperException | InterruptedException e) { - throw new ReplicationException("Claim queue queueId=" + queueId + " from " + - sourceServerName + " to " + destServerName + " failed", e); + throw toReplicationException(e, "Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed"); } } @@ -445,7 +449,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } catch (NotEmptyException e) { // keep silence to avoid logging too much. } catch (KeeperException e) { - throw new ReplicationException("Failed to remove replicator for " + serverName, e); + throw toReplicationException(e, "Failed to remove replicator for " + serverName); } } @@ -462,7 +466,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getListOfReplicators0(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of replicators", e); + throw toReplicationException(e, "Failed to get list of replicators"); } } @@ -479,9 +483,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getWALsInQueue0(serverName, queueId); } catch (KeeperException e) { - throw new ReplicationException( - "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")", - e); + throw toReplicationException(e, + "Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")"); } } @@ -495,7 +498,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getAllQueues0(serverName); } catch (KeeperException e) { - throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); + throw toReplicationException(e, "Failed to get all queues (serverName=" + serverName + ")"); } } @@ -531,7 +534,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase v0, v1, retry); } } catch (KeeperException e) { - throw new ReplicationException("Failed to get all wals", e); + throw toReplicationException(e, "Failed to get all wals"); } } @@ -552,8 +555,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase ZKUtil.createWithParents(zookeeper, peerNode); } } catch (KeeperException e) { - throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", - e); + throw toReplicationException(e, + "Failed to add peer " + peerId + " to hfile reference queue"); } } @@ -562,14 +565,14 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase String peerNode = getHFileRefsPeerNode(peerId); try { if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { - LOG.debug("Peer {} not found in hfile reference queue.", peerNode); + LOG.debug("Peer {} not found in hfile reference queue.", peerNode); } else { LOG.info("Removing peer {} from hfile reference queue.", peerNode); ZKUtil.deleteNodeRecursively(zookeeper, peerNode); } } catch (KeeperException e) { - throw new ReplicationException( - "Failed to remove peer " + peerId + " from hfile reference queue.", e); + throw toReplicationException(e, + "Failed to remove peer " + peerId + " from hfile reference queue"); } } @@ -586,7 +589,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); + throw toReplicationException(e, "Failed to add hfile reference to peer " + peerId); } } @@ -602,7 +605,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); } catch (KeeperException e) { - throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); + throw toReplicationException(e, "Failed to remove hfile reference from peer " + peerId); } } @@ -616,8 +619,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getAllPeersFromHFileRefsQueue0(); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of all peers in hfile references node.", - e); + throw toReplicationException(e, "Failed to get list of all peers in hfile references node"); } } @@ -632,8 +634,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { return getReplicableHFiles0(peerId); } catch (KeeperException e) { - throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, - e); + throw toReplicationException(e, "Failed to get list of hfile references for peer " + peerId); } } @@ -644,7 +645,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase try { ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); } catch (KeeperException e) { - throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + throw toReplicationException(e, "Failed to get stat of replication hfile references node"); } return stat.getCversion(); } @@ -671,7 +672,17 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase v0, v1, retry); } } catch (KeeperException e) { - throw new ReplicationException("Failed to get all hfile refs", e); + throw toReplicationException(e, "Failed to get all hfile refs"); + } + } + + private ReplicationException toReplicationException(Exception e, String message) { + // As the zookeeper implementation will rethrow a SystemErrorException when received + // InterruptedException, here translate it back to InterruptedException + if (e instanceof KeeperException.SystemErrorException) { + return new ReplicationException(message, new InterruptedException()); + } else { + return new ReplicationException(message, e); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..75da8f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -453,6 +454,7 @@ public class ReplicationSourceManager implements ReplicationListener { try { op.exec(); } catch (ReplicationException e) { + handleInterruptedException(e.getCause()); server.abort("Failed to operate on replication queue", e); } } @@ -469,12 +471,25 @@ public class ReplicationSourceManager implements ReplicationListener { try { op.exec(); } catch (ReplicationException e) { + handleInterruptedException(e.getCause()); server.abort("Failed to operate on replication queue", e); throw new IOException(e); } } /** + * When replication source was terminated, the thread will be interrupted. So need to + * handle it and not abort the region server. + * + * @param t exception + */ + private void handleInterruptedException(Throwable t) { + if (t != null && t instanceof InterruptedException) { + throw new RuntimeException("Thread is interrupted, the replication source may be terminated"); + } + } + + /** * This method will log the current position to storage. And also clean old logs from the * replication queue. * @param queueId id of the replication queue -- 2.7.4