From fe752d1a66d9c5bcc6929be33114750eb344883f Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Tue, 12 Jun 2018 22:19:39 +0800 Subject: [PATCH] HBASE-20561 The way we stop a ReplicationSource may cause the RS down --- .../regionserver/ReplicationSource.java | 24 ++++++++++++++++++++-- .../regionserver/ReplicationSourceManager.java | 24 ++++++++++++++++++---- .../apache/hadoop/hbase/zookeeper/ZKWatcher.java | 4 +++- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index d21d83c..b63712b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface { Collection workers = workerThreads.values(); for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); - worker.entryReader.interrupt(); - worker.interrupt(); + worker.entryReader.setReaderRunning(false); } + + for (ReplicationSourceShipper worker : workers) { + if (worker.isAlive() || worker.entryReader.isAlive()) { + try { + // Wait worker to stop + Thread.sleep(this.sleepForRetries); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting " + worker.getName() + " to stop"); + Thread.currentThread().interrupt(); + } + // If worker still is alive after waiting, interrupt it + if (worker.isAlive()) { + worker.interrupt(); + } + // If entry reader is alive after waiting, interrupt it + if (worker.entryReader.isAlive()) { + worker.entryReader.interrupt(); + } + } + } + if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9b4a22c..cbd48b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,6 +450,20 @@ public class ReplicationSourceManager implements ReplicationListener { void exec() throws ReplicationException; } + private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + // When replication source was terminated, the thread will be interrupted. So need to + // handle it and not abort the region server + if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException && e.getCause().getCause() != null && e.getCause().getCause() instanceof InterruptedException) { + throw new RuntimeException( + "Thread is interrupted, the replication source may be terminated"); + } + server.abort("Failed to operate on replication queue", e); + } + } + private void abortWhenFail(ReplicationQueueOperation op) { try { op.exec(); @@ -484,8 +499,9 @@ public class ReplicationSourceManager implements ReplicationListener { public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, WALEntryBatch entryBatch) { String fileName = entryBatch.getLastWalPath().getName(); - abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, - entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); + interruptOrAbortWhenFail(() -> this.queueStorage + .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(), + entryBatch.getLastSeqIds())); cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered); } @@ -523,7 +539,7 @@ public class ReplicationSourceManager implements ReplicationListener { } LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet); for (String wal : walSet) { - abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); + interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); } walSet.clear(); } @@ -886,7 +902,7 @@ public class ReplicationSourceManager implements ReplicationListener { } public void cleanUpHFileRefs(String peerId, List files) { - abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); + interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); } int activeFailoverTaskCount() { diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index cceedfd..ce00af4 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -607,7 +607,9 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { public void interruptedException(InterruptedException ie) throws KeeperException { interruptedExceptionNoThrow(ie, true); // Throw a system error exception to let upper level handle it - throw new KeeperException.SystemErrorException(); + KeeperException keeperException = new KeeperException.SystemErrorException(); + keeperException.initCause(ie); + throw keeperException; } /** -- 2.7.4