diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index dbf00700b0..749ff05d47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -429,6 +429,11 @@ public class ServerLoad implements ServerMetrics { return metrics.getLastReportTimestamp(); } + @Override + public long getReceivedTimestampNs() { + return metrics.getReceivedTimestampNs(); + } + /** * Originally, this method factored in the effect of requests going to the * server as well. However, this does not interact very well with the current diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 1e1d395e59..2bb4b55e78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -103,4 +103,8 @@ public interface ServerMetrics { */ long getLastReportTimestamp(); + /** + * @return The master-side timestamp of when the report was actually received + */ + long getReceivedTimestampNs(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 333344ba52..4cae7a698d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -235,6 +235,7 @@ public final class ServerMetricsBuilder { private final Set coprocessorNames; private final long reportTimestamp; private final long lastReportTimestamp; + private final long receivedTimestampNs; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize, @@ -255,6 +256,12 @@ public final class ServerMetricsBuilder { this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames); this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; + this.receivedTimestampNs = System.nanoTime(); + } + + @Override + public long getReceivedTimestampNs() { + return receivedTimestampNs; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 9d33a21208..5e4dcff64b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -25,11 +25,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -59,6 +62,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe @InterfaceAudience.Private public class RegionServerTracker extends ZKListener { private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class); + private final Set regionServers = new HashSet<>(); private final ServerManager serverManager; private final MasterServices server; @@ -66,6 +70,8 @@ public class RegionServerTracker extends ZKListener { // move the operation to a single threaded thread pool in order to not block the zk event // processing since all the zk listener across HMaster will be called in one thread sequentially. private final ExecutorService executor; + private final Object refreshNotifier = new Object(); + private boolean needsRefresh = false; public RegionServerTracker(ZKWatcher watcher, MasterServices server, ServerManager serverManager) { @@ -74,6 +80,7 @@ public class RegionServerTracker extends ZKListener { this.serverManager = serverManager; this.executor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); + this.executor.execute(new RefreshRunnable()); } private Pair getServerInfo(String name) @@ -153,7 +160,8 @@ public class RegionServerTracker extends ZKListener { executor.shutdownNow(); } - private synchronized void refresh() { + /** Only called on the refresh thread. */ + private void refresh() { List names; try { names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); @@ -193,7 +201,67 @@ public class RegionServerTracker extends ZKListener { public void nodeChildrenChanged(String path) { if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() && !server.isStopped()) { - executor.execute(this::refresh); + synchronized (refreshNotifier) { + needsRefresh = true; + refreshNotifier.notifyAll(); + } + } + } + + + private class RefreshRunnable implements Runnable { + private final long maxStaleNs; + + public RefreshRunnable() { + int zkTimeout = server.getConfiguration().getInt( + HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + int rsMsgInterval = HRegionServer.getHeartbeatIntervalConfig(server.getConfiguration()); + this.maxStaleNs = TimeUnit.MILLISECONDS.toNanos(Math.max(zkTimeout, rsMsgInterval) * 2); + } + + @Override + public void run() { + // Make sure we don't refresh too often if multiple servers become stale in quick succession. + long nextRefreshNs = System.nanoTime() + maxStaleNs; + // Loop forever. + while (!server.isStopped()) { + // Loop until someone triggers a refresh, or timeout expires. + while (true) { + if (server.isStopped()) { + return; + } + synchronized (refreshNotifier) { + if (needsRefresh) { + needsRefresh = false; + break; + } else { + long minRemainingNs = nextRefreshNs - System.nanoTime(); + long rsStaleInNs = maxStaleNs - serverManager.getMaxServerReportAgeNs(); + if (minRemainingNs <= 0 && rsStaleInNs <= 0) { + LOG.debug("Refreshing znodes; a server didn't heartbeat in " + maxStaleNs + "ns"); + break; + } + // No refresh is needed - wait for the refresh time to pass before forcing it. + long sleepMs = TimeUnit.NANOSECONDS.toMillis(Math.max(rsStaleInNs, minRemainingNs)); + try { + refreshNotifier.wait(sleepMs); + } catch (InterruptedException ex) { + LOG.info("Refresh thread was interrupted and will now exit"); + return; + } + } + } + } + + // Perform the refresh. + nextRefreshNs = System.nanoTime() + maxStaleNs; + try { + refresh(); + } catch (Exception ex) { + LOG.warn("Failed to refresh the servers from ZK", ex); + } + } } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 86d72d161b..e41b9377ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; @@ -283,6 +284,15 @@ public class ServerManager { } } + long getMaxServerReportAgeNs() { + long now = System.nanoTime(); + long minReceivedNs = now; + for (ServerMetrics sl : this.onlineServers.values()) { + minReceivedNs = Math.min(minReceivedNs, sl.getReceivedTimestampNs()); + } + return now - minReceivedNs; + } + @VisibleForTesting public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 34a6c13924..ffd73c1e16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -556,7 +556,7 @@ public class HRegionServer extends HasThread implements this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + this.msgInterval = getHeartbeatIntervalConfig(conf); this.sleeper = new Sleeper(this.msgInterval, this); @@ -659,6 +659,10 @@ public class HRegionServer extends HasThread implements } } + public static int getHeartbeatIntervalConfig(Configuration conf) { + return conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + } + // HMaster should override this method to load the specific config for master protected String getUseThisHostnameInstead(Configuration conf) throws IOException { String hostname = conf.get(RS_HOSTNAME_KEY);