Index: src/test/java/org/apache/hadoop/hbase/failuredetection/TestFailureDetection.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/failuredetection/TestFailureDetection.java +++ src/test/java/org/apache/hadoop/hbase/failuredetection/TestFailureDetection.java @@ -0,0 +1,108 @@ +package org.apache.hadoop.hbase.failuredetection; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.monitor.RegionServerFailureDetection; +import org.apache.hadoop.hbase.monitor.RegionServerFailureDetection.RegionServerProcess; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFailureDetection { + private static final Log LOG = LogFactory + .getLog(TestRegionServerForFailureDetection.class); + private static RegionServerFailureDetection monitor; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeAllTests() throws Exception { + // Start a cluster of two regionservers. + TEST_UTIL.startMiniCluster(1); + Throwable t = null; + try { + monitor = new RegionServerFailureDetection(); + monitor.start(); + } catch (IOException e) { + t = e; + } catch (InterruptedException e) { + t = e; + } catch (KeeperException e) { + t = e; + } + if (t != null) { + Assert.assertFalse("Should not get exception " + + t.getClass().getSimpleName() + ": " + t.getMessage(), true); + } + } + + @AfterClass + public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testFetchProcessFromRegionServer() throws InterruptedException { + try { + Method method = monitor.getClass().getDeclaredMethod( + "fetchProcessFromRegionServer", new Class[] {}); + method.setAccessible(true); + method.invoke(monitor, new Object[] {}); + // assert + Assert.assertEquals(1, monitor.getRsSet().size()); + HRegionServer rserver = TEST_UTIL.getHBaseCluster().getRegionServer(0); + String processName = java.lang.management.ManagementFactory + .getRuntimeMXBean().getName(); + long pid = Long.parseLong(processName.split("@")[0]); + String rsZkNode = ZKUtil.getNodeName(rserver.getHServerInfo()); + // assert + RegionServerProcess expectedRSP = monitor.new RegionServerProcess(pid, + rsZkNode); + RegionServerProcess actualRSP = monitor.getRsSet().getIterator() + .iterator().next(); + Assert.assertEquals(expectedRSP, actualRSP); + } catch (Throwable e) { + Assert.fail("Should not get exception " + e.getClass().getSimpleName() + + ": " + e); + } + } + + @Test + public void testCheckStateWetherExsits() { + Method method; + Object result_true = null; + Object result_pid_wrong = null; + Object result_pid_add_wrong = null; + InetSocketAddress add; + long pid; + try { + method = monitor.getClass().getDeclaredMethod("checkStateWetherExsits", + new Class[] { long.class, InetSocketAddress.class }); + method.setAccessible(true); + HRegionServer rserver = TEST_UTIL.getHBaseCluster().getRegionServer(0); + String processName = java.lang.management.ManagementFactory + .getRuntimeMXBean().getName(); + pid = Long.parseLong(processName.split("@")[0]); + add = rserver.getHServerInfo().getServerAddress().getInetSocketAddress(); + result_true = method.invoke(monitor, new Object[] { pid, add }); + result_pid_wrong = method.invoke(monitor, new Object[] { 65536, add }); + result_pid_add_wrong = method.invoke(monitor, new Object[] { 65536, + new InetSocketAddress("wrong.host.com", 6000) }); + } catch (Throwable e) { + Assert.fail("Should not get exception " + e.getClass().getSimpleName() + + ": " + e); + } + Assert.assertTrue((Boolean) result_true); + Assert.assertTrue((Boolean) result_pid_wrong); + Assert.assertFalse((Boolean) result_pid_add_wrong); + } +} Index: src/test/java/org/apache/hadoop/hbase/failuredetection/TestRegionServerForFailureDetection.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/failuredetection/TestRegionServerForFailureDetection.java +++ src/test/java/org/apache/hadoop/hbase/failuredetection/TestRegionServerForFailureDetection.java @@ -0,0 +1,94 @@ +package org.apache.hadoop.hbase.failuredetection; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.monitor.RegionServerFailureDetection; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRegionServerForFailureDetection { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Log LOG = LogFactory + .getLog(TestRegionServerForFailureDetection.class); + + // private static final byte[] TABLENAME = + // Bytes.toBytes("TestFailureDetection"); + // private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); + + @BeforeClass + public static void beforeAllTests() throws Exception { + // Start a cluster of two regionservers. + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * test method of getRSPidAndRsZknode, + */ + @Test + public void testRemoteGetRSPidAndRsZknode() { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + try { + HServerAddress rsAddress = regionServer.getHServerInfo() + .getServerAddress(); + HConnection connection = HConnectionManager.getConnection(TEST_UTIL + .getConfiguration()); + HServerAddress rsAdd = new HServerAddress(rsAddress); + HRegionInterface server = connection.getHRegionConnection(rsAdd); + String pidAndRsZknode = server.getRSPidAndRsZknode(); + LOG.info("get [pidAndRsZknode:" + pidAndRsZknode + "] from rs!"); + HBaseRPC.stopProxy(server); + String actualPidAndRsZknode = pidAndRsZknode; + String expectedPidAndRsZknode = regionServer.getRSPidAndRsZknode(); + Assert.assertEquals(expectedPidAndRsZknode, actualPidAndRsZknode); + } catch (IOException e) { + Assert.fail("Should not get exception " + e.getClass().getSimpleName() + + ": " + e); + } + } + + @Test + public void testdoMonitorLogic() { + Throwable t = null; + try { + RegionServerFailureDetection monitor = new RegionServerFailureDetection(); + monitor.start(); + } catch (IOException e) { + t = e; + } catch (InterruptedException e) { + t = e; + } catch (KeeperException e) { + t = e; + } + if (t != null) { + Assert.assertFalse("Should not get exception " + + t.getClass().getSimpleName() + ": " + t.getMessage(), true); + } + try { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + regionServer.doMonitorLogic(); + } catch (Throwable th) { + Assert.fail("Should not get exception " + th.getClass().getSimpleName() + + ": " + th); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java +++ src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -456,6 +456,11 @@ * Minimum percentage of free heap necessary for a successful cluster startup. */ public static final float HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD = 0.2f; + + /** monitor's znode*/ + public static final String MONITOR_ZK_NODE = "monitor"; + + public static final String REGION_MONITOR_IMPL = "hbase.regionserver.monitor"; private HConstants() { // Can't be instantiated with this ctor. Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.MonitorInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; @@ -567,6 +568,7 @@ while (!this.stopped) { if (tryReportForDuty()) break; } + doMonitorLogic(); long lastMsg = 0; List outboundMessages = new ArrayList(); // The main run loop. @@ -2789,6 +2791,77 @@ } + @Override + public String getRSPidAndRsZknode() { + String processName = java.lang.management.ManagementFactory + .getRuntimeMXBean().getName(); + long pid = Long.parseLong(processName.split("@")[0]); + String rsZkNode = ZKUtil.getNodeName(serverInfo); + LOG.info("call from monitor,pid:" + pid + ",rsZkNode:" + rsZkNode); + return pid + "@" + rsZkNode; + } + + /** + * after registration of RS,do Monitor Logic.if monitor for this machine has + * registed in zk,push pid to monitor + * + */ + public void doMonitorLogic() { + String machineName = this.getServerInfo().getHostname(); + List monitors; + String monitorBaseNode = ZKUtil.joinZNode(zooKeeper.baseZNode, + HConstants.MONITOR_ZK_NODE);// '/hbase/monitor' + try { + ZKUtil.createNodeIfNotExistsAndWatch(zooKeeper, monitorBaseNode, + "failure monitors".getBytes()); + String monitorZNode = ZKUtil.joinZNode(zooKeeper.baseZNode, + HConstants.MONITOR_ZK_NODE); + monitors = ZKUtil.listChildrenNoWatch(zooKeeper, monitorZNode); + if (monitors == null) { + throw new Exception(); + } + for (String monitor : monitors) { + if (monitor.split(":")[0].equals(machineName)) { + // monitor has started, push pid to monitor + int port = Integer.parseInt(monitor.split(":")[1]); + pushPidToMonitor(monitor.split(":")[0], port); + } + } + } catch (KeeperException e) { + LOG.error("ZK throws unExpected Exception", e); + } catch (Exception e) { + LOG.error("no /hbase/monitor node,this is UNExpectedException", e); + } + } + + private void pushPidToMonitor(String machineName, int port) { + String processName = java.lang.management.ManagementFactory + .getRuntimeMXBean().getName(); + long pid = Long.parseLong(processName.split("@")[0]); + try { + int maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + MonitorInterface monitorServer = (MonitorInterface) HBaseRPC + .waitForProxy(MonitorInterface.class, + HBaseRPCProtocolVersion.versionID, new InetSocketAddress( + machineName, port), conf, maxRPCAttempts, rpcTimeout, + rpcTimeout); + String rsZkNode = ZKUtil.getNodeName(serverInfo); + boolean res = monitorServer.notifyProcessID(pid, rsZkNode); + LOG.info("push [pid:" + pid + ",rsZkNode:" + rsZkNode + " to monitor:[" + + machineName + ":" + port + "],sucessfull:" + res); + if (!res) + throw new IOException(); + HBaseRPC.stopProxy(monitorServer); + } catch (RemoteException e) { + LOG.error("RemoteException connecting to Monitor", e); + // Throw what the RemoteException was carrying. + } catch (IOException e) { + LOG.error("push pid failed,UNExpected Exception!", e); + } + } + /** * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ Index: src/main/java/org/apache/hadoop/hbase/monitor/RegionServerFailureDetection.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitor/RegionServerFailureDetection.java +++ src/main/java/org/apache/hadoop/hbase/monitor/RegionServerFailureDetection.java @@ -0,0 +1,497 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitor; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.MasterAddressTracker; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; +import org.apache.hadoop.hbase.ipc.HBaseServer; +import org.apache.hadoop.hbase.ipc.HMasterMonitorInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.MonitorInterface; +import org.apache.hadoop.hbase.ipc.ServerNotRunningException; +import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.DNS; +import org.apache.zookeeper.KeeperException; + +/** + * Within milliseconds of perception regionserver's Downtime + * + */ +public class RegionServerFailureDetection extends Thread implements + MonitorInterface { + + public static final Log LOG = LogFactory + .getLog(RegionServerFailureDetection.class); + // Remote HMaster + private HMasterMonitorInterface hbaseMaster; + // master address manager and watcher + private MasterAddressTracker masterAddressManager; + // zookeeper connection and watcher + private ZooKeeperWatcher zWatcher; + + // A sleeper that sleeps for msgInterval. + private final Sleeper sleeper; + + private final Configuration conf; + + private final int msgInterval; + + private final int rpcTimeout; + + private final String machineName; + + private final int port = 12020; + + private HBaseServer monitorRPCServer; + + private DetectionHearbeatChore chore; + + private final RSSet rsSet = new RSSet(); + + public RegionServerFailureDetection() throws IOException, + InterruptedException, KeeperException { + this.conf = HBaseConfiguration.create(); + + // Server to handle client requests + this.machineName = DNS.getDefaultHost(conf.get( + "hbase.regionserver.dns.interface", "default"), conf.get( + "hbase.regionserver.dns.nameserver", "default")); + this.msgInterval = conf.getInt("hbase.monitor.msginterval", 3 * 1000); + this.sleeper = new Sleeper(this.msgInterval, this); + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + initializeZooKeeper(); + initializeMaster(); + startService(); + register(); + } + + private void initializeZooKeeper() throws IOException, InterruptedException { + // Open connection to zookeeper and set primary watcher + zWatcher = new ZooKeeperWatcher(conf, "RegionServerFailureDetection" + ":" + + machineName, this); + // Create the master address manager, register with zk, and start it. + // Then block until a master is available. No point in starting up if no + // master running. + this.masterAddressManager = new MasterAddressTracker(this.zWatcher, this); + this.masterAddressManager.start(); + blockAndCheckIfStopped(this.masterAddressManager); + } + + private void initializeMaster() { + HServerAddress masterAddress = null; + HMasterMonitorInterface master = null; + while (master == null) { + masterAddress = getMasterAddress(); + LOG.info("Attempting connect to Master server at " + masterAddress); + try { + // Do initial RPC setup. The final argument indicates that the + // RPC should retry indefinitely. + master = (HMasterMonitorInterface) HBaseRPC.waitForProxy( + HMasterMonitorInterface.class, HBaseRPCProtocolVersion.versionID, + masterAddress.getInetSocketAddress(), this.conf, -1, + this.rpcTimeout, this.rpcTimeout); + } catch (IOException e) { + e = e instanceof RemoteException ? ((RemoteException) e) + .unwrapRemoteException() : e; + if (e instanceof ServerNotRunningException) { + LOG.info("Master isn't available yet, retrying"); + } else { + LOG.warn("Unable to connect to master. Retrying. Error was:", e); + } + sleeper.sleep(); + } + } + LOG.info("Connected to master at " + masterAddress); + this.hbaseMaster = master; + + int period = conf.getInt("heartbeat.failure.detect.master", 3000); + chore = new DetectionHearbeatChore(this.hbaseMaster, period, this); + chore.setMachineName(machineName); + Threads.setDaemonThreadRunning(chore, "DetectionHearbeatChore"); + } + + /** + * Utilty method to wait indefinitely on a znode availability while checking + * if the region server is shut down + * + * @param tracker + * znode tracker to use + * @throws IOException + * any IO exception, plus if the RS is stopped + * @throws InterruptedException + */ + private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker) + throws IOException, InterruptedException { + while (tracker.blockUntilAvailable(this.msgInterval) == null) { + throw new IOException("Received the shutdown message while waiting."); + } + } + + private HServerAddress getMasterAddress() { + HServerAddress masterAddress = null; + while ((masterAddress = masterAddressManager.getMasterAddress()) == null) { + LOG.debug("No master found, retrying..."); + sleeper.sleep(); + } + return masterAddress; + } + + /** + * start rpc server called for rs + * + * @throws IOException + * @throws InterruptedException + */ + private void startService() throws IOException, InterruptedException { + int QOS_THRESHOLD = 10; + monitorRPCServer = HBaseRPC.getServer(this, + new Class[] { MonitorInterface.class }, machineName, port, conf + .getInt("hbase.monitor.handler.count", 2), conf.getInt( + "hbase.monitor.metahandler.count", 2), false, conf, QOS_THRESHOLD); + monitorRPCServer.start(); + } + + /** + * monitor register to zk '/hbase/monitor' + * + * @throws KeeperException + * @throws IOException + */ + private void register() throws KeeperException, IOException { + String monitorBaseNode = ZKUtil.joinZNode(zWatcher.baseZNode, + HConstants.MONITOR_ZK_NODE);// '/hbase/monitor' + ZKUtil.createNodeIfNotExistsAndWatch(zWatcher, monitorBaseNode, + "failure monitors".getBytes()); + String monitorNode = ZKUtil.joinZNode(monitorBaseNode, this.machineName + + ":" + this.port); + boolean res = ZKUtil.createEphemeralNodeAndWatch(zWatcher, monitorNode, "" + .getBytes()); + if (res) { + LOG.info("register zk sucessfully"); + } else { + throw new IOException( + "register monitor to zk failed, UNExpectedException"); + } + } + + /** + * + * close this znode of rs + * + * @param sessionid + * @throws Throwable + */ + private void reportRSShutDownToZookeeper(String rsZkNode) { + Throwable t = null; + try { + zWatcher.getZooKeeper().delete( + ZKUtil.joinZNode(zWatcher.rsZNode, rsZkNode), -1); + LOG.info("zknode with [regionServerName:" + rsZkNode + + "] has bean deleted"); + } catch (InterruptedException e) { + t = e; + } catch (KeeperException e) { + t = e; + } + if (t != null) { + LOG.warn("delete [zknode:" + rsZkNode + + "] throws Exception! Maybe,rsZkNode has bean deleted by zk", t); + } + } + + /** + * watch pid has existed whether or not.if existed return true,if not return + * false + * + * @param pid + * @return true of false + * @throws IOException + */ + private boolean checkStateWetherExsits(long pid, InetSocketAddress rsAdd) { + try { + BufferedReader br = new BufferedReader(new InputStreamReader(Runtime + .getRuntime().exec("ps -p " + pid).getInputStream())); + String line = br.readLine(); + while (line != null) { + if (line.contains(pid + "")) { + return true; + } + line = br.readLine(); + } + boolean pingOk = pingRS(rsAdd); + if (pingOk) { + return true; + } + return false; + } catch (IOException e) { + LOG.warn("run 'ps' command throws Exception", e); + return false; + } + } + + /** + * connect RegionServer to confirm RegionServer has been crushed. + * + * @param rsAdd + * @return + */ + private boolean pingRS(InetSocketAddress rsAdd) { + try { + Socket scoket = new Socket(); + scoket.connect(rsAdd); + scoket.close(); + return true; + } catch (IOException e) { + if ((e instanceof java.net.ConnectException) + && e.getMessage().contains("Connection refused")) { + LOG.warn("Expected Exception,[RegionServer:" + rsAdd + + "] has been crushed", e); + return false; + } else { + LOG.warn("UNExpected Exception", e); + return true; + } + } + } + + private void fetchProcessFromRegionServer() { + try { + List regionServers = ZKUtil.listChildrenNoWatch(zWatcher, + zWatcher.rsZNode); + if (regionServers == null) { + throw new Exception("regionServers == null"); + } + for (String rs : regionServers) { + if (rs.startsWith(machineName)) { + // rs has started ,but maybe rs's remote service has not been + // compeleted. + HServerAddress rsAddress = ZKUtil.getDataAsAddress(zWatcher, ZKUtil + .joinZNode(zWatcher.rsZNode, rs)); + HConnection connection = HConnectionManager.getConnection(conf); + HServerAddress rsAdd = new HServerAddress(rsAddress); + HRegionInterface server = connection.getHRegionConnection(rsAdd); + String pidAndRsZknode = server.getRSPidAndRsZknode(); + LOG.info("get [pidAndRsZknode:" + pidAndRsZknode + "] from rs!"); + String[] res = pidAndRsZknode.split("@"); + rsSet.add(new RegionServerProcess(Long.parseLong(res[0]), res[1])); + HBaseRPC.stopProxy(server); + } + } + } catch (Throwable e) { + if ((e instanceof ServerNotRunningException) + && e.getMessage().contains("Server is not running yet")) { + LOG + .warn( + "Expected Expection! rs has register ok,but remote service has not been compeleted", + e); + fetchProcessFromRegionServer(); + } else { + LOG.warn("UnExpected Expection! NEVER GOES HERE!", e); + } + } + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + while (true) { + if (System.currentTimeMillis() - startTime > 60000) { + LOG.info("Now this monitor is monitoring : " + rsSet); + startTime = System.currentTimeMillis(); + } + for (RegionServerProcess rsInstance : rsSet.getIterator()) { + boolean exsits = checkStateWetherExsits(rsInstance.pid, + rsInstance.rsAdd); + if (!exsits) { + // releaseHDFS(); + reportRSShutDownToZookeeper(rsInstance.zkNode); + rsSet.remove(rsInstance); + break; + } + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + LOG.warn("UnExpected Expection! Thread.sleep", e); + } + } + } + + /** + * @param args + * @throws InterruptedException + * @throws IOException + * @throws KeeperException + */ + public static void main(String[] args) throws IOException, + InterruptedException, KeeperException { + RegionServerFailureDetection monitor = new RegionServerFailureDetection(); + monitor.fetchProcessFromRegionServer(); + monitor.start(); + } + + @Override + public void abort(String why, Throwable e) { + LOG.info("aborted."); + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void stop(String why) { + LOG.info("because " + why + " ,stoped..."); + // Threads.shutdown(chore); + monitorRPCServer.stop(); + } + + /** + * rs push pid to monitor with rpc + * + * @see org.apache.hadoop.hbase.ipc.MonitorInterface#pushPidToMonitor(long) + */ + @Override + public boolean notifyProcessID(long pid, String rsZkNode) { + LOG.info("received pid:" + pid + ",rs's znode:" + rsZkNode); + rsSet.add(new RegionServerProcess(pid, rsZkNode)); + return true; + } + + @Override + public long getProtocolVersion(final String protocol, final long clientVersion) + throws IOException { + if (protocol.equals(MonitorInterface.class.getName())) { + return HBaseRPCProtocolVersion.versionID; + } + throw new IOException("Unknown protocol to name node: " + protocol); + } + + public RSSet getRsSet() { + return rsSet; + } + + public class RSSet { + private final ConcurrentHashMap map = new ConcurrentHashMap(); + + public void add(RegionServerProcess rSInstance) { + this.map.put(rSInstance.rsAdd, rSInstance); + } + + public void remove(RegionServerProcess rSInstance) { + this.map.remove(rSInstance.rsAdd, rSInstance); + } + + public Iterable getIterator() { + return map.values(); + } + + @Override + public String toString() { + Iterator> iterator = map + .entrySet().iterator(); + if (!iterator.hasNext()) + return "{}"; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append('{'); + do { + Map.Entry entry = iterator + .next(); + Object obj = entry.getValue(); + stringBuilder.append(obj != this ? obj : "(this Map)"); + if (!iterator.hasNext()) { + return stringBuilder.append('}').toString(); + } + stringBuilder.append(", "); + } while (true); + } + + public int size() { + return map.size(); + } + } + + public class RegionServerProcess { + private final Long pid; + private final String zkNode; + public final InetSocketAddress rsAdd; + + public RegionServerProcess(Long pid, String zkNode) { + this.pid = pid; + this.zkNode = zkNode; + String[] strArr = zkNode.split(","); + rsAdd = new InetSocketAddress(strArr[0], Integer.parseInt(strArr[1])); + } + + public Long getPid() { + return pid; + } + + public String getZkNode() { + return zkNode; + } + + public InetSocketAddress getRsAdd() { + return rsAdd; + } + + @Override + public String toString() { + return "[pid:" + pid + ",rsZknode:" + zkNode + "]"; + } + + @Override + public boolean equals(Object obj) { + RegionServerProcess otherObj = (RegionServerProcess) obj; + if (this.pid.equals(otherObj.pid) && this.rsAdd.equals(otherObj.rsAdd) + && this.zkNode.equals(otherObj.zkNode)) { + return true; + } else { + return false; + } + } + } +} Index: src/main/java/org/apache/hadoop/hbase/monitor/DetectionHearbeatChore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/monitor/DetectionHearbeatChore.java +++ src/main/java/org/apache/hadoop/hbase/monitor/DetectionHearbeatChore.java @@ -0,0 +1,59 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.monitor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.ipc.HMasterMonitorInterface; + +/** + * RegionServerFailure detection hearbeat to master + * + */ +public class DetectionHearbeatChore extends Chore { + private static final Log LOG = LogFactory + .getLog(DetectionHearbeatChore.class); + /** local machineName */ + private String machineName; + + // Remote HMaster + private final HMasterMonitorInterface hbaseMaster; + + public DetectionHearbeatChore(HMasterMonitorInterface master, int p, + Stoppable stopper) { + super("DetectionHearbeatChore", p, stopper); + this.hbaseMaster = master; + } + + public void setMachineName(String machineName) { + this.machineName = machineName; + } + + @Override + protected void chore() { + try { + this.hbaseMaster.heartbeatMonitor(machineName); + } catch (Throwable t) { + LOG.warn("HBase Master has crushed!", t); + } + } +} Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -25,8 +25,10 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HMasterMonitorInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; @@ -108,7 +111,7 @@ * @see Watcher */ public class HMaster extends Thread -implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { +implements HMasterInterface, HMasterRegionInterface, HMasterMonitorInterface, MasterServices, Server { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); // MASTER is name of the webapp and the attribute name used stuffing this @@ -163,6 +166,9 @@ private Thread balancerChore; // If 'true', the balancer is 'on'. If 'false', the balancer will not run. private volatile boolean balanceSwitch = true; + + private Thread monitorHeartbeatChore; + protected HashMap heartbeatMonitor = new HashMap(); private Thread catalogJanitorChore; private LogCleaner logCleaner; @@ -399,6 +405,7 @@ // Start balancer and meta catalog janitor after meta and regions have // been assigned. this.balancerChore = getAndStartBalancerChore(this); + this.monitorHeartbeatChore = getAndStartMonitorHeartbeatChore(this); this.catalogJanitorChore = Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); @@ -584,6 +591,34 @@ }; return Threads.setDaemonThreadRunning(chore); } + + private static Thread getAndStartMonitorHeartbeatChore(final HMaster master) { + String name = master.getServerName() + "-MonitorHeartbeat"; + int clientHeartbeat = master.getConfiguration().getInt( + "heartbeat.failure.detect.master", 3000); + int period = Math.max(300000, clientHeartbeat); + final long heartbeat = period == clientHeartbeat ? period * 2 : period; + // Start up the load balancer chore + Chore chore = new Chore(name, period, master) { + @Override + protected void chore() { + synchronized (master.heartbeatMonitor) { + List needToDelete = new ArrayList(); + for (Entry entry : master.heartbeatMonitor.entrySet()) { + long time = System.currentTimeMillis() - entry.getValue(); + if (time > heartbeat) { + needToDelete.add(entry.getKey()); + } + } + for (String machine : needToDelete) { + master.heartbeatMonitor.remove(machine); + LOG.error("the monitor in " + machine + " may be crashed."); + } + } + } + }; + return Threads.setDaemonThreadRunning(chore); + } private void stopChores() { if (this.balancerChore != null) { @@ -639,6 +674,13 @@ this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions)); } + @Override + public void heartbeatMonitor(String machine) { + synchronized (heartbeatMonitor) { + heartbeatMonitor.put(machine, System.currentTimeMillis()); + } + } + /** * Override if you'd add messages to return to regionserver hsi * or to send an exception. Index: src/main/java/org/apache/hadoop/hbase/ipc/MonitorInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/MonitorInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/MonitorInterface.java @@ -0,0 +1,40 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; + +/** + * regionserver notify monitor pid. + * + */ +public interface MonitorInterface extends HBaseRPCProtocolVersion, Stoppable, + Abortable { + + /** + * called by regionserver,notify pid + * + * @param pid + * @param rsZkNode + * @return + */ + public boolean notifyProcessID(long pid, String rsZkNode); +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HMasterMonitorInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HMasterMonitorInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterMonitorInterface.java @@ -0,0 +1,30 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +/** + * + * monitor process heartbeat to master + * + */ +public interface HMasterMonitorInterface extends HBaseRPCProtocolVersion { + + public void heartbeatMonitor(String machine); +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.ConnectException; import java.util.List; -import java.util.NavigableSet; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; @@ -433,4 +433,11 @@ /* TODO: Move into place above master operations after deprecation cycle */ public boolean bulkLoadHFiles(List> familyPaths, byte[] regionName) throws IOException; + + /** + * called by monitor. monitor can get RS pid whth this method + * + * @return pid@znode + */ + public String getRSPidAndRsZknode(); }