From ea0f7deb5243fe492ab405f35c696a40502233e0 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 16 May 2018 18:43:00 +0800 Subject: [PATCH] HBASE-20561 The way we stop a ReplicationSource may cause the RS down --- .../regionserver/ReplicationSourceManager.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..0ea1836 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -453,6 +454,7 @@ public class ReplicationSourceManager implements ReplicationListener { try { op.exec(); } catch (ReplicationException e) { + handleSystemErrorException(e.getCause()); server.abort("Failed to operate on replication queue", e); } } @@ -469,12 +471,29 @@ public class ReplicationSourceManager implements ReplicationListener { try { op.exec(); } catch (ReplicationException e) { + handleSystemErrorException(e.getCause()); server.abort("Failed to operate on replication queue", e); throw new IOException(e); } } /** + * When a replication source is terminated, its thread is interrupted. The zk client + * receives the resulting InterruptedException and rethrows it as a SystemErrorException, + * so a SystemErrorException seen on an interrupted thread needs to be handled here.
+ * + * @param t the cause of the ReplicationException being handled, may be null + */ + private void handleSystemErrorException(Throwable t) { + if (t != null && t instanceof KeeperException.SystemErrorException) { + if (Thread.currentThread().isInterrupted()) { + throw new RuntimeException( + "Thread is interrupted, the replication source may be terminated"); + } + } + } + + /** * This method will log the current position to storage. And also clean old logs from the * replication queue. * @param queueId id of the replication queue -- 2.7.4