diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index dbf00700b0..749ff05d47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -429,6 +429,11 @@ public class ServerLoad implements ServerMetrics { return metrics.getLastReportTimestamp(); } + @Override + public long getReceivedTimestampNs() { + return metrics.getReceivedTimestampNs(); + } + /** * Originally, this method factored in the effect of requests going to the * server as well. However, this does not interact very well with the current diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 1e1d395e59..2bb4b55e78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -103,4 +103,8 @@ public interface ServerMetrics { */ long getLastReportTimestamp(); + /** + * @return The master-side timestamp of when the report was actually received + */ + long getReceivedTimestampNs(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 333344ba52..4cae7a698d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -235,6 +235,7 @@ public final class ServerMetricsBuilder { private final Set coprocessorNames; private final long reportTimestamp; private final long lastReportTimestamp; + private final long receivedTimestampNs; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize, @@ -255,6 +256,12 @@ public final class ServerMetricsBuilder { this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames); this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; + this.receivedTimestampNs = System.nanoTime(); + } + + @Override + public long getReceivedTimestampNs() { + return receivedTimestampNs; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 9d33a21208..a2869e32f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -23,13 +23,14 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.VersionInfoUtil; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -40,8 +41,6 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; @@ -59,21 +58,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe @InterfaceAudience.Private public class RegionServerTracker extends ZKListener { private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class); + private final Set regionServers = new HashSet<>(); private final ServerManager serverManager; private final MasterServices server; // As we need to send request to zk when processing the nodeChildrenChanged event, we'd better - // move the operation to a single threaded thread pool in order to not block the zk event + // move the operation to a single thread in order to not block the zk event // processing since all the zk listener across HMaster will be called in one thread sequentially. - private final ExecutorService executor; + private final Thread thread; + private final Object refreshNotifier = new Object(); + private boolean needsRefresh = false; public RegionServerTracker(ZKWatcher watcher, MasterServices server, ServerManager serverManager) { super(watcher); this.server = server; this.serverManager = serverManager; - this.executor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); + this.thread = new Thread(new RefreshRunnable(), "RegionServerTracker-0"); + this.thread.setDaemon(true); + this.thread.start(); } private Pair getServerInfo(String name) @@ -150,10 +153,11 @@ public class RegionServerTracker extends ZKListener { } public void stop() { - executor.shutdownNow(); + thread.interrupt(); } - private synchronized void refresh() { + /** Only called on the refresh thread. */ + private void refresh() { List names; try { names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); @@ -193,7 +197,68 @@ public class RegionServerTracker extends ZKListener { public void nodeChildrenChanged(String path) { if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() && !server.isStopped()) { - executor.execute(this::refresh); + synchronized (refreshNotifier) { + needsRefresh = true; + refreshNotifier.notifyAll(); + } } } + + + private class RefreshRunnable implements Runnable { + private final long maxStaleNs; + + public RefreshRunnable() { + int zkTimeout = server.getConfiguration().getInt( + HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + int rsMsgInterval = HRegionServer.getHeartbeatIntervalConfig(server.getConfiguration()); + this.maxStaleNs = TimeUnit.MILLISECONDS.toNanos(Math.max(zkTimeout, rsMsgInterval) * 2); + } + + @Override + public void run() { + // Make sure we don't refresh too often if multiple servers become stale in quick succession. + long nextRefreshNs = System.nanoTime() + maxStaleNs; + // Loop forever. + while (!server.isStopped()) { + // Loop until someone triggers a refresh, or timeout expires. + while (true) { + if (server.isStopped()) { + LOG.info("Server has stopped; refresh thread will now exit"); + return; + } + synchronized (refreshNotifier) { + if (needsRefresh) { + needsRefresh = false; + break; + } else { + long minRemainingNs = nextRefreshNs - System.nanoTime(); + long rsStaleInNs = maxStaleNs - serverManager.getMaxServerReportAgeNs(); + if (minRemainingNs <= 0 && rsStaleInNs <= 0) { + LOG.debug("Refreshing znodes; a server didn't heartbeat in " + maxStaleNs + "ns"); + break; + } + // No refresh is needed - wait for the refresh time to pass before forcing it. + long sleepMs = TimeUnit.NANOSECONDS.toMillis(Math.max(rsStaleInNs, minRemainingNs)); + try { + refreshNotifier.wait(sleepMs); + } catch (InterruptedException ex) { + LOG.info("Refresh thread was interrupted and will now exit"); + return; + } + } + } + } + + // Perform the refresh. + nextRefreshNs = System.nanoTime() + maxStaleNs; + try { + refresh(); + } catch (Exception ex) { + LOG.warn("Failed to refresh the servers from ZK", ex); + } + } + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 0fb1551792..fb2c2738b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -283,6 +283,15 @@ public class ServerManager { } } + long getMaxServerReportAgeNs() { + long now = System.nanoTime(); + long minReceivedNs = now; + for (ServerMetrics sl : this.onlineServers.values()) { + minReceivedNs = Math.min(minReceivedNs, sl.getReceivedTimestampNs()); + } + return now - minReceivedNs; + } + @VisibleForTesting public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c897502753..e4fb293755 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -557,7 +557,7 @@ public class HRegionServer extends HasThread implements this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + this.msgInterval = getHeartbeatIntervalConfig(conf); this.sleeper = new Sleeper(this.msgInterval, this); @@ -660,6 +660,10 @@ public class HRegionServer extends HasThread implements } } + public static int getHeartbeatIntervalConfig(Configuration conf) { + return conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + } + // HMaster should override this method to load the specific config for master protected String getUseThisHostnameInstead(Configuration conf) throws IOException { String hostname = conf.get(RS_HOSTNAME_KEY);