Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1554987) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -836,8 +836,6 @@ } try { - // Set our ephemeral znode up in zookeeper now we have a name. - createMyEphemeralNode(); // Try and register with the Master; tell it we are here. Break if // server is stopped or the clusterup flag is down or hdfs went wacky. @@ -1226,6 +1224,9 @@ this.serverNameFromMasterPOV.toString()); } + // Set our ephemeral znode up in zookeeper now we have a name. + createMyEphemeralNode(); + // Save it in a file, this will allow to see if we crash ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1554987) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -37,7 +37,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClockOutOfSyncException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.Server; @@ -60,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Triple; import com.google.protobuf.ServiceException; @@ -132,6 +136,8 @@ private final DeadServer deadservers = new DeadServer(); + private final ServerTimeoutMonitor serverTimeoutMonitor; + private final long maxSkew; private final long warningSkew; @@ -190,6 +196,12 @@ maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); this.connection = connect ? HConnectionManager.getConnection(c) : null; + + int stmPeriod = c.getInt("hbase.master.servertimeoutmonitor.period", 10000); + this.serverTimeoutMonitor = new ServerTimeoutMonitor(stmPeriod, + c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)); + Threads.setDaemonThreadRunning(serverTimeoutMonitor.getThread(), master.getServerName() + + ".serverTimeoutMonitor"); } /** @@ -215,7 +227,7 @@ checkClockSkew(sn, serverCurrentTime); checkIsDead(sn, "STARTUP"); if (!checkAlreadySameHostPortAndRecordNewServer( - sn, ServerLoad.EMPTY_SERVERLOAD)) { + sn, ServerLoad.getEmptyServerload())) { LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn); } @@ -876,7 +888,7 @@ Map getRequeuedDeadServers() { return Collections.unmodifiableMap(this.requeuedDeadServers); } - + public boolean isServerOnline(ServerName serverName) { return serverName != null && onlineServers.containsKey(serverName); } @@ -906,6 +918,7 @@ * Stop the ServerManager. Currently closes the connection to the master. */ public void stop() { + this.serverTimeoutMonitor.interrupt(); if (connection != null) { try { connection.close(); @@ -964,7 +977,7 @@ } } } - + /** * To clear any dead server with same host name and port of any online server */ @@ -973,4 +986,30 @@ deadservers.cleanAllPreviousInstances(serverName); } } + + class ServerTimeoutMonitor extends Chore { + + private int timeout; + + public ServerTimeoutMonitor(int period, int timeout) { + super("ServerTimeoutMonitor", period, master); + this.timeout = timeout; + } + + @Override + protected void chore() { + List deadServers = new ArrayList(); + for (ServerName sn : onlineServers.keySet()) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long elapsed = now - onlineServers.get(sn).getTimeOfLastHearbeat(); + if (elapsed > timeout) { + LOG.warn("Server " + sn + " hasn't checked in since " + elapsed + "ms"); + deadServers.add(sn); + } + } + for (ServerName sn : deadServers) { + expireServer(sn); + } + } + } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java (revision 1554987) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Strings; import java.util.Arrays; @@ -53,7 +54,8 @@ private int totalStaticBloomSizeKB = 0; private long totalCompactingKVs = 0; private long currentCompactedKVs = 0; - + private long timeOfLastHearbeat; + public ServerLoad(ClusterStatusProtos.ServerLoad serverLoad) { this.serverLoad = serverLoad; for (ClusterStatusProtos.RegionLoad rl: serverLoad.getRegionLoadsList()) { @@ -71,7 +73,7 @@ totalCompactingKVs += rl.getTotalCompactingKVs(); currentCompactedKVs += rl.getCurrentCompactedKVs(); } - + timeOfLastHearbeat = EnvironmentEdgeManager.currentTimeMillis(); } // NOTE: Function name cannot start with "get" because then an OpenDataException is thrown because @@ -178,6 +180,10 @@ return serverLoad.getInfoServerPort(); } + public long getTimeOfLastHearbeat() { + return timeOfLastHearbeat; + } + /** * Originally, this method factored in the effect of requests going to the * server as well. However, this does not interact very well with the current @@ -301,4 +307,8 @@ public static final ServerLoad EMPTY_SERVERLOAD = new ServerLoad(ClusterStatusProtos.ServerLoad.newBuilder().build()); + + public static ServerLoad getEmptyServerload() { + return new ServerLoad(ClusterStatusProtos.ServerLoad.newBuilder().build()); + } }