From aa02c27086ed5b46e17af346a2353cf78ab81833 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 14 Apr 2018 15:40:13 +0800 Subject: [PATCH] HBASE-20417 Do not read wal entries when peer is disabled --- .../regionserver/ReplicationSourceShipper.java | 15 ++++++--------- .../regionserver/ReplicationSourceWALReader.java | 4 ++++ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 2097d00..11fd660 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread { setWorkerState(WorkerState.RUNNING); // Loop until we close down while (isActive()) { - int sleepMultiplier = 1; // Sleep until replication is enabled again if (!source.isPeerEnabled()) { - if (sleepForRetries("Replication is disabled", sleepMultiplier)) { - sleepMultiplier++; - } + // The peer enabled check is in memory, not expensive, so do not need to increase the + // sleep interval as it may cause a long lag when we enable the peer. + sleepForRetries("Replication is disabled", 1); continue; } try { @@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread { } break; } catch (Exception ex) { - LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" - + org.apache.hadoop.util.StringUtils.stringifyException(ex)); + LOG.warn("{} threw unknown exception:", + source.getReplicationEndpoint().getClass().getName(), ex); if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { sleepMultiplier++; } @@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread { */ public boolean sleepForRetries(String msg, int sleepMultiplier) { try { - if (LOG.isTraceEnabled()) { - LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); - } + LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier); Thread.sleep(this.sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { LOG.debug("Interrupted while sleeping between retries"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7ba347f..64fd48d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread { source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } if (!checkQuota()) { continue; } -- 2.7.4