diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index fdc3d82677..d9d23bdd24 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -202,6 +202,12 @@ public final class HConstants { public static final int ZK_CFG_PROPERTY_PREFIX_LEN = ZK_CFG_PROPERTY_PREFIX.length(); + + /** Maximum time between the explicit refreshes of the RS list from ZK in RegionServerTracker */ + public static final String MAX_RST_REFRESH_PERIOD_MS = "hbase.max.zk.rs.refresh.period.ms"; + + public static final int DEFAULT_MAX_RST_REFRESH_PERIOD_MS = 120000; + /** * The ZK client port key in the ZK properties map. The name reflects the * fact that this is not an HBase configuration key. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index f419732eb8..4953544bb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; @@ -66,6 +68,8 @@ public class RegionServerTracker extends ZKListener { // move the operation to a single threaded thread pool in order to not block the zk event // processing since all the zk listener across HMaster will be called in one thread sequentially. private final ExecutorService executor; + private final Object refreshNotifier = new Object(); + private boolean needsRefresh = false; public RegionServerTracker(ZKWatcher watcher, MasterServices server, ServerManager serverManager) { @@ -74,6 +78,7 @@ public class RegionServerTracker extends ZKListener { this.serverManager = serverManager; this.executor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); + this.executor.execute(new RefreshRunnable()); } private Pair getServerInfo(String name) @@ -150,7 +155,8 @@ public class RegionServerTracker extends ZKListener { executor.shutdownNow(); } - private synchronized void refresh() { + /** Only called on the refresh thread. */ + private void refresh() { List names; try { names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); @@ -190,7 +196,61 @@ public class RegionServerTracker extends ZKListener { public void nodeChildrenChanged(String path) { if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() && !server.isStopped()) { - executor.execute(this::refresh); + synchronized (refreshNotifier) { + needsRefresh = true; + refreshNotifier.notifyAll(); + } } } + + + private class RefreshRunnable implements Runnable { + private final long maxIntervalNs; + + public RefreshRunnable() { + this.maxIntervalNs = TimeUnit.MILLISECONDS.toNanos(server.getConfiguration().getInt( + HConstants.MAX_RST_REFRESH_PERIOD_MS, HConstants.DEFAULT_MAX_RST_REFRESH_PERIOD_MS)); + } + + @Override + public void run() { + long nextRefreshNs = System.nanoTime() + maxIntervalNs; + // Loop forever. + while (!server.isStopped()) { + // Loop until someone triggers a refresh, or timeout expires. + while (true) { + if (server.isStopped()) { + return; + } + synchronized (refreshNotifier) { + if (needsRefresh) { + needsRefresh = false; + break; + } else { + long remainingNs = nextRefreshNs - System.nanoTime(); + if (remainingNs <= 0) { + LOG.debug("Refresh thread is calling refresh based on the timer"); + break; + } + try { + refreshNotifier.wait(TimeUnit.NANOSECONDS.toMillis(remainingNs)); + } catch (InterruptedException ex) { + LOG.info("Refresh thread was interrupted and will now exit"); + return; + } + } + } + } + + // Perform the refresh. + nextRefreshNs = System.nanoTime() + maxIntervalNs; + try { + refresh(); + } catch (Exception ex) { + LOG.warn("Failed to refresh the servers from ZK", ex); + } + } + } + } + }