diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 22c9b3c..c06a9df 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; @@ -233,6 +234,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, */ Chore majorCompactionChecker; + private Thread loadPublisher; + // HLog and HLog roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected volatile HLog hlog; @@ -641,6 +644,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? 
If OOME could have exited already + if (this.loadPublisher != null) this.loadPublisher.interrupt(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary(); @@ -716,7 +720,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, List tryRegionServerReport(final List outboundMessages) throws IOException { - this.serverInfo.setLoad(buildServerLoad()); this.requestCount.set(0); addOutboundMsgs(outboundMessages); HMsg [] msgs = null; @@ -755,7 +758,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } private HServerLoad buildServerLoad() { - MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + MemoryUsage memory = + ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); HServerLoad hsl = new HServerLoad(requestCount.get(), (int)(memory.getUsed() / 1024 / 1024), (int) (memory.getMax() / 1024 / 1024)); @@ -1028,6 +1032,40 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return this.fsOk; } + /** + * Get and start a Chore that updates server load into this server's ephemeral + * znode. + * @param server Instance of regionserver + * @return The started Chore thread + */ + private static Thread getLoadPublisherChore(final HRegionServer server) { + String name = server.getServerName() + ".LoadPublisher"; + int period = server.getConfiguration(). 
+ getInt("hbase.regionserver.loadpublisher.period", 30000); + final HRegionServer rs = server; + // Start up the load publisher chore + Chore chore = new Chore(name, period, server) { + @Override + protected void chore() { + HServerLoad hsl = rs.buildServerLoad(); + byte [] bytes = null; + try { + bytes = Writables.getBytes(hsl); + } catch (IOException e) { + LOG.warn("Failed serializing load", e); + return; + } + try { + ZKUtil.setData(rs.getZooKeeper(), + rs.getZNodePath(rs.getZooKeeper(), rs.serverInfo), bytes); + } catch (KeeperException e) { + LOG.warn("Failed talking to zookeeper", e); + } + } + }; + return Threads.setDaemonThreadRunning(chore); + } + /* * Inner class that runs on a long period checking if regions need major * compaction. @@ -1246,6 +1284,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, handler); Threads.setDaemonThreadRunning(this.majorCompactionChecker, n + ".majorCompactionChecker", handler); + // Start up our load publishing chore. + this.loadPublisher = getLoadPublisherChore(this); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1301,8 +1341,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } // Verify that all threads are alive if (!(leases.isAlive() && compactSplitThread.isAlive() - && cacheFlusher.isAlive() && hlogRoller.isAlive() - && this.majorCompactionChecker.isAlive())) { + && cacheFlusher.isAlive() && hlogRoller.isAlive() && + this.loadPublisher.isAlive() && + this.majorCompactionChecker.isAlive())) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1413,6 +1454,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, * have already been called. 
*/ protected void join() { + Threads.shutdown(this.loadPublisher); Threads.shutdown(this.majorCompactionChecker); Threads.shutdown(this.cacheFlusher); Threads.shutdown(this.compactSplitThread); @@ -1506,10 +1548,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, try { this.requestCount.set(0); lastMsg = System.currentTimeMillis(); - ZKUtil.setAddressAndWatch(zooKeeper, - ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)), - this.serverInfo.getServerAddress()); - this.serverInfo.setLoad(buildServerLoad()); + // The content of znode is serialized load. When we start we have + // no load so serialize empty byte buffer. + ZKUtil.createEphemeralNodeAndWatch(zooKeeper, + getZNodePath(this.zooKeeper, this.serverInfo), HConstants.EMPTY_BYTE_ARRAY); LOG.info("Telling master at " + masterAddress + " that we are up"); result = this.hbaseMaster.regionServerStartup(this.serverInfo, EnvironmentEdgeManager.currentTimeMillis()); @@ -1534,6 +1576,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return result; } + String getZNodePath(final ZooKeeperWatcher zkw, final HServerInfo hsi) { + return ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(hsi)); + } + /** * Add to the outbound message buffer * diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java index 0437484..269175d 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java @@ -19,14 +19,19 @@ */ package org.apache.hadoop.hbase.zookeeper; +import java.io.IOException; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HServerAddress; import 
org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.zookeeper.KeeperException; /** @@ -41,9 +46,10 @@ import org.apache.zookeeper.KeeperException; */ public class RegionServerTracker extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(RegionServerTracker.class); - private ServerManager serverManager; private Abortable abortable; + private NavigableMap regionServers = + new TreeMap(); public RegionServerTracker(ZooKeeperWatcher watcher, Abortable abortable, ServerManager serverManager) { @@ -61,7 +67,7 @@ public class RegionServerTracker extends ZooKeeperListener { */ public void start() throws KeeperException { watcher.registerListener(this); - ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode); + update(ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode)); } @Override @@ -75,6 +81,9 @@ public class RegionServerTracker extends ZooKeeperListener { LOG.info("No HServerInfo found for " + serverName); return; } + synchronized (this.regionServers) { + this.regionServers.remove(hsi.getServerName()); + } serverManager.expireServer(hsi); } } @@ -83,7 +92,7 @@ public class RegionServerTracker extends ZooKeeperListener { public void nodeChildrenChanged(String path) { if(path.equals(watcher.rsZNode)) { try { - ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode); + update(ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode)); } catch (KeeperException e) { abortable.abort("Unexpected zk exception getting RS nodes", e); } @@ -91,11 +100,34 @@ public class RegionServerTracker extends ZooKeeperListener { } /** - * Gets the online servers. - * @return list of online servers from zk - * @throws KeeperException + * @param nodeAndData Update our Map of regionservers with latest loads. 
+ */ + private void update(final List nodeAndData) { + synchronized (this.regionServers) { + for (NodeAndData nad: nodeAndData) { + String serverName = ZKUtil.getNodeName(nad.getNode()); + HServerLoad hsl = null; + try { + hsl = + (HServerLoad)Writables.getWritable(nad.getData(), new HServerLoad()); + } catch (IOException e) { + LOG.warn("Failed parse of " + serverName + " load"); + continue; + } + this.regionServers.put(serverName, hsl); + } + } + } + + /** + * @return Copy of our map of regionservers to load. */ - public List getOnlineServers() throws KeeperException { - return ZKUtil.listChildrenAndGetAsAddresses(watcher, watcher.rsZNode); + public NavigableMap getLoadings() { + NavigableMap result = + new TreeMap(); + synchronized (this.regionServers) { + result.putAll(this.regionServers); + } + return result; } -} +} \ No newline at end of file