diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index ec41e22..8de2314 100644
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -1606,37 +1606,58 @@ public class AssignmentManager extends ZooKeeperListener {
   RegionPlan getRegionPlan(final RegionState state,
       final ServerName serverToExclude, final boolean forceNewPlan) {
     // Pickup existing plan or make a new one
-    String encodedName = state.getRegion().getEncodedName();
-    List<ServerName> servers = this.serverManager.getOnlineServersList();
-    // The remove below hinges on the fact that the call to
-    // serverManager.getOnlineServersList() returns a copy
+    final String encodedName = state.getRegion().getEncodedName();
+    // Both lists are copies owned by this call, so removing from them is safe.
+    final List<ServerName> servers = this.serverManager.getOnlineServersList();
+    final List<ServerName> drainingServers =
+      this.serverManager.getDrainingServersList();
+
     if (serverToExclude != null) servers.remove(serverToExclude);
+
+    // Draining servers must not receive new regions, so take them out of the
+    // pool the random plan is drawn from.
+    for (final ServerName server: drainingServers) {
+      if (servers.remove(server)) {
+        LOG.debug("Removing draining server: " + server +
+          " from eligible server pool.");
+      }
+    }
+
     if (servers.isEmpty()) return null;
+
     RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
       balancer.randomAssignment(servers));
     boolean newPlan = false;
     RegionPlan existingPlan = null;
+
     synchronized (this.regionPlans) {
       existingPlan = this.regionPlans.get(encodedName);
+
       if (existingPlan != null && existingPlan.getDestination() != null) {
         LOG.debug("Found an existing plan for " +
           state.getRegion().getRegionNameAsString() +
           " destination server is + " + existingPlan.getDestination().toString());
       }
-      if (forceNewPlan || existingPlan == null
-          || existingPlan.getDestination() == null
-          || existingPlan.getDestination().equals(serverToExclude)) {
+
+      // Only reuse an existing plan whose destination is still eligible:
+      // online, not the excluded server and not draining.
+      if (forceNewPlan
+          || existingPlan == null
+          || existingPlan.getDestination() == null
+          || !servers.contains(existingPlan.getDestination())) {
         newPlan = true;
         this.regionPlans.put(encodedName, randomPlan);
       }
     }
+
     if (newPlan) {
       debugLog(state.getRegion(), "No previous transition plan was found (or we are ignoring " +
         "an existing plan) for " + state.getRegion().getRegionNameAsString() +
         " so generated a random one; " + randomPlan + "; " +
         serverManager.countOfRegionServers() +
         " (online=" + serverManager.getOnlineServers().size() +
-        ", exclude=" + serverToExclude + ") available servers");
+        ", exclude=" + serverToExclude +
+        ", draining=" + drainingServers.size() + ") available servers");
       return randomPlan;
     }
     debugLog(state.getRegion(), "Using pre-existing plan for region " +
diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 56f7e33..0d0e4c5 100644
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
+import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -131,6 +132,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
   private ActiveMasterManager activeMasterManager;
   // Region server tracker
   private RegionServerTracker regionServerTracker;
+  // Draining region server tracker
+  private DrainingServerTracker drainingServerTracker;
   // RPC server for the HMaster
   private final RpcServer rpcServer;
 
@@ -370,6 +373,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
         this.serverManager);
     this.regionServerTracker.start();
 
+    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
+      this.serverManager);
+    this.drainingServerTracker.start();
+
     // Set the cluster as up.  If new RSs, they'll be waiting on this before
     // going ahead with their startup.
     this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 41a942e..39c04bf 100644
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -80,6 +80,14 @@ public class ServerManager {
   private final Map<ServerName, HRegionInterface> serverConnections =
     new HashMap<ServerName, HRegionInterface>();
 
+  /**
+   * Region servers that should not be given any new regions.
+   * Written from ZooKeeper callbacks and read during assignment, so every
+   * access synchronizes on the list itself.
+   */
+  private final List<ServerName> drainingServers =
+    new ArrayList<ServerName>();
+
   private final Server master;
   private final MasterServices services;
   private final HConnection connection;
@@ -367,6 +375,46 @@ public class ServerManager {
       carryingRoot + ", meta=" + carryingMeta);
   }
 
+  /**
+   * Removes a server from the draining list, making it eligible for new
+   * regions again. The server does not have to be online.
+   * @param sn server to remove, in {@code hostname,port,startcode} form
+   * @return true if the server was in the draining list
+   */
+  public boolean removeServerFromDrainList(final ServerName sn) {
+    if (!this.isServerOnline(sn)) {
+      LOG.warn("Server " + sn + " is not currently online. " +
+               "Removing from draining list anyway, as requested.");
+    }
+    synchronized (this.drainingServers) {
+      return this.drainingServers.remove(sn);
+    }
+  }
+
+  /**
+   * Adds a server to the draining list so it is not given any new regions.
+   * <p>The server is added even if it is not online yet: the draining
+   * znodes can be read before the matching region servers have reported in
+   * (e.g. while the master is starting up), and refusing them then would
+   * silently drop their draining flag.
+   * @param sn server to add, in {@code hostname,port,startcode} form
+   * @return true if the server was added, false if it was already listed
+   */
+  public boolean addServerToDrainList(final ServerName sn) {
+    if (!this.isServerOnline(sn)) {
+      LOG.warn("Server " + sn + " is not currently online. " +
+               "Adding to draining list anyway, as requested.");
+    }
+    synchronized (this.drainingServers) {
+      if (this.drainingServers.contains(sn)) {
+        LOG.warn("Server " + sn + " is already in the draining server list. " +
+                 "Ignoring request to add it again.");
+        return false;
+      }
+      return this.drainingServers.add(sn);
+    }
+  }
+
   // RPC methods to region servers
 
   /**
@@ -489,6 +537,15 @@ public class ServerManager {
     return new ArrayList<ServerName>(this.onlineServers.keySet());
   }
 
+  /**
+   * @return A copy of the internal list of draining servers.
+   */
+  public List<ServerName> getDrainingServersList() {
+    synchronized (this.drainingServers) {
+      return new ArrayList<ServerName>(this.drainingServers);
+    }
+  }
+
   public boolean isServerOnline(ServerName serverName) {
     return onlineServers.containsKey(serverName);
   }
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java
new file mode 100644
index 0000000..e90edf9
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the list of draining region servers via ZK.
+ *
+ * <p>This class is responsible for watching for changes to the draining
+ * servers list.  It handles adds/deletes in the draining RS list and
+ * watches each node.
+ *
+ * <p>If an RS gets deleted from draining list, we call
+ * {@link ServerManager#removeServerFromDrainList(ServerName)}
+ *
+ * <p>If an RS gets added to the draining list, we add a watcher to it and call
+ * {@link ServerManager#addServerToDrainList(ServerName)}
+ */
+public class DrainingServerTracker extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(DrainingServerTracker.class);
+
+  private final ServerManager serverManager;
+  // Servers currently known to be draining; also the lock for updates.
+  private final NavigableSet<ServerName> drainingServers =
+    new TreeSet<ServerName>();
+  private final Abortable abortable;
+
+  public DrainingServerTracker(ZooKeeperWatcher watcher,
+      Abortable abortable, ServerManager serverManager) {
+    super(watcher);
+    this.abortable = abortable;
+    this.serverManager = serverManager;
+  }
+
+  /**
+   * Starts the tracking of draining RegionServers.
+   *
+   * <p>All Draining RSs will be tracked after this method is called.
+   *
+   * @throws KeeperException if the draining znode cannot be listed/watched
+   * @throws IOException if the initial draining list cannot be processed
+   */
+  public void start() throws KeeperException, IOException {
+    watcher.registerListener(this);
+    List<String> servers =
+      ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
+    refresh(servers);
+  }
+
+  /**
+   * Reconciles the tracked servers with the current children of the draining
+   * znode: new children are handed to the {@link ServerManager}, servers
+   * whose znode has gone are removed from it. Already-known servers are left
+   * alone so a children change does not re-add (and re-warn about) them.
+   * @param servers child names of the draining znode; null is treated as
+   *   an empty list
+   */
+  private void refresh(final List<String> servers) throws IOException {
+    final NavigableSet<ServerName> current = new TreeSet<ServerName>();
+    if (servers != null) {
+      for (String n: servers) {
+        current.add(new ServerName(ZKUtil.getNodeName(n)));
+      }
+    }
+    synchronized(this.drainingServers) {
+      final NavigableSet<ServerName> gone =
+        new TreeSet<ServerName>(this.drainingServers);
+      gone.removeAll(current);
+      for (final ServerName sn: gone) {
+        remove(sn);
+      }
+      for (final ServerName sn: current) {
+        if (this.drainingServers.add(sn)) {
+          this.serverManager.addServerToDrainList(sn);
+          LOG.info("Draining RS node created, adding to list [" +
+              sn + "]");
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops tracking a draining server and removes it from the
+   * {@link ServerManager}; a no-op if an earlier event already did so.
+   */
+  private void remove(final ServerName sn) {
+    synchronized(this.drainingServers) {
+      if (this.drainingServers.remove(sn)) {
+        LOG.info("Draining RS node deleted, removing from list [" +
+            sn + "]");
+        this.serverManager.removeServerFromDrainList(sn);
+      }
+    }
+  }
+
+  @Override
+  public void nodeDeleted(final String path) {
+    // Only children of the draining znode name servers; a bare prefix test
+    // would also match the draining znode itself and siblings sharing its
+    // name as a prefix.
+    if (path.startsWith(watcher.drainingZNode + "/")) {
+      remove(new ServerName(ZKUtil.getNodeName(path)));
+    }
+  }
+
+  @Override
+  public void nodeChildrenChanged(final String path) {
+    if (path.equals(watcher.drainingZNode)) {
+      try {
+        final List<String> newNodes =
+          ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode);
+        refresh(newNodes);
+      } catch (KeeperException e) {
+        abortable.abort("Unexpected zk exception getting RS nodes", e);
+      } catch (IOException e) {
+        abortable.abort("Unexpected zk exception getting RS nodes", e);
+      }
+    }
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index b20d371..a75cf87 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -73,6 +73,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
   public String rootServerZNode;
   // znode containing ephemeral nodes of the regionservers
   public String rsZNode;
+  // znode whose children are the ServerNames of region servers that must not
+  // be given new regions (see DrainingServerTracker)
+  public String drainingZNode;
   // znode of currently active master
   public String masterAddressZNode;
   // znode containing the current cluster state
@@ -138,6 +141,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
       ZKUtil.createAndFailSilent(this, baseZNode);
       ZKUtil.createAndFailSilent(this, assignmentZNode);
       ZKUtil.createAndFailSilent(this, rsZNode);
+      ZKUtil.createAndFailSilent(this, drainingZNode);
       ZKUtil.createAndFailSilent(this, tableZNode);
       ZKUtil.createAndFailSilent(this, splitLogZNode);
     } catch (KeeperException e) {
@@ -175,6 +179,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
         conf.get("zookeeper.znode.rootserver", "root-region-server"));
     rsZNode = ZKUtil.joinZNode(baseZNode,
         conf.get("zookeeper.znode.rs", "rs"));
+    drainingZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.draining.rs", "draining"));
     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
         conf.get("zookeeper.znode.master", "master"));
     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
diff --git src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
new file mode 100644
index 0000000..9e175fe
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the draining servers feature.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-4298">HBASE-4298</a>
+ */
+public class TestDrainingServer {
+  private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
+  private static final HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+  private static final byte [] TABLENAME = Bytes.toBytes("t");
+  private static final byte [] FAMILY = Bytes.toBytes("f");
+  private static final int COUNT_OF_REGIONS = HBaseTestingUtility.KEYS.length;
+
+  /**
+   * Spin up a cluster with a bunch of regions on it.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(5);
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    htd.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.createMultiRegionsInMeta(TEST_UTIL.getConfiguration(), htd,
+        HBaseTestingUtility.KEYS);
+    // Make a mark for the table in the filesystem.
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    FSUtils.createTableDescriptor(fs, FSUtils.getRootDir(TEST_UTIL.getConfiguration()), htd);
+    // Assign out the regions we just created.
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    admin.disableTable(TABLENAME);
+    admin.enableTable(TABLENAME);
+    // Assert that every regionserver has some regions on it.
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = cluster.getRegionServer(i);
+      Assert.assertFalse(hrs.getOnlineRegions().isEmpty());
+    }
+  }
+
+  private static HRegionServer setDrainingServer(final HRegionServer hrs)
+  throws KeeperException {
+    LOG.info("Making " + hrs.getServerName() + " the draining server; " +
+        "it has " + hrs.getNumberOfOnlineRegions() + " online regions");
+    ZooKeeperWatcher zkw = hrs.getZooKeeper();
+    String hrsDrainingZnode =
+      ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
+    ZKUtil.createWithParents(zkw, hrsDrainingZnode);
+    // The master picks up the new znode asynchronously; wait for it so that
+    // callers do not race the DrainingServerTracker.
+    waitForDrainingState(hrs.getServerName(), true);
+    return hrs;
+  }
+
+  private static HRegionServer unsetDrainingServer(final HRegionServer hrs)
+  throws KeeperException {
+    ZooKeeperWatcher zkw = hrs.getZooKeeper();
+    String hrsDrainingZnode =
+      ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
+    ZKUtil.deleteNode(zkw, hrsDrainingZnode);
+    waitForDrainingState(hrs.getServerName(), false);
+    return hrs;
+  }
+
+  /**
+   * Blocks until the master's draining list does (or does not) contain
+   * {@code sn}. The enclosing test's timeout bounds the wait.
+   */
+  private static void waitForDrainingState(final ServerName sn,
+      final boolean draining) {
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    while (master.getServerManager().getDrainingServersList().contains(sn)
+        != draining) {
+      Threads.sleep(10);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test adding server to draining servers and then move regions off it.
+   * Make sure that no regions are moved back to the draining server.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  @Test (timeout=60000)
+  public void testDrainingServerOffloading()
+  throws IOException, KeeperException {
+    // I need master in the below.
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    HRegionInfo hriToMoveBack = null;
+    // Set first server as draining server.
+    HRegionServer drainingServer =
+      setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
+    try {
+      final int regionsOnDrainingServer =
+        drainingServer.getNumberOfOnlineRegions();
+      Assert.assertTrue(regionsOnDrainingServer > 0);
+      List<HRegionInfo> hris = drainingServer.getOnlineRegions();
+      for (HRegionInfo hri : hris) {
+        // Pass null and AssignmentManager will choose a random server BUT it
+        // should exclude draining servers.
+        master.move(hri.getEncodedNameAsBytes(), null);
+        // Save off region to move back.
+        hriToMoveBack = hri;
+      }
+      // Wait for regions to come back on line again.
+      waitForAllRegionsOnline();
+      Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions());
+    } finally {
+      unsetDrainingServer(drainingServer);
+    }
+    // Now we've unset the draining server, we should be able to move a region
+    // to what was the draining server.
+    master.move(hriToMoveBack.getEncodedNameAsBytes(),
+      Bytes.toBytes(drainingServer.getServerName().toString()));
+    // Wait for regions to come back on line again.
+    waitForAllRegionsOnline();
+    Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions());
+  }
+
+  /**
+   * Test that draining servers are ignored even after killing regionserver(s).
+   * Verify that the draining server is not given any of the dead servers regions.
+   * @throws KeeperException
+   * @throws IOException
+   */
+  @Test (timeout=30000)
+  public void testDrainingServerWithAbort() throws KeeperException, IOException {
+    // Add first server to draining servers up in zk.
+    HRegionServer drainingServer =
+      setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
+    try {
+      final int regionsOnDrainingServer =
+        drainingServer.getNumberOfOnlineRegions();
+      Assert.assertTrue(regionsOnDrainingServer > 0);
+      // Kill a few regionservers. Iterate over the region servers (not the
+      // served regions) and never abort the draining server itself.
+      int aborted = 0;
+      final int numberToAbort = 2;
+      final int numberOfServers =
+        TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size();
+      for (int i = 1; i < numberOfServers; i++) {
+        HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
+        if (hrs.getServerName().equals(drainingServer.getServerName())) continue;
+        hrs.abort("Aborting");
+        aborted++;
+        if (aborted >= numberToAbort) break;
+      }
+      // Wait for regions to come back on line again.
+      waitForAllRegionsOnline();
+      // Assert the draining server still has the same number of regions.
+      Assert.assertEquals(regionsOnDrainingServer,
+        drainingServer.getNumberOfOnlineRegions());
+    } finally {
+      unsetDrainingServer(drainingServer);
+    }
+  }
+
+  /**
+   * Waits until nothing is in transition and every user and catalog region
+   * is being served. The enclosing test's timeout bounds the wait.
+   */
+  private void waitForAllRegionsOnline() {
+    while (TEST_UTIL.getMiniHBaseCluster().getMaster().
+        getAssignmentManager().isRegionsInTransition()) {
+      Threads.sleep(10);
+    }
+    // Wait for regions to come back on line again.
+    while (!isAllRegionsOnline()) {
+      Threads.sleep(10);
+    }
+  }
+
+  private boolean isAllRegionsOnline() {
+    return TEST_UTIL.getMiniHBaseCluster().countServedRegions() ==
+      (COUNT_OF_REGIONS + 2 /*catalog regions*/);
+  }
+}