diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 44846c8..abb9f9b 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -1606,41 +1606,58 @@ public class AssignmentManager extends ZooKeeperListener { RegionPlan getRegionPlan(final RegionState state, final ServerName serverToExclude, final boolean forceNewPlan) { // Pickup existing plan or make a new one - String encodedName = state.getRegion().getEncodedName(); - List servers = this.serverManager.getOnlineServersList(); - // The remove below hinges on the fact that the call to - // serverManager.getOnlineServersList() returns a copy - if (serverToExclude != null) servers.remove(serverToExclude); + final String encodedName = state.getRegion().getEncodedName(); + final List servers = this.serverManager.getOnlineServersList(); + final List drainingServers = this.serverManager.getDrainingServersList(); + + if (serverToExclude != null) drainingServers.add(serverToExclude); + + // Loop through the draining server list and remove them from the server + // list. 
+ if (!drainingServers.isEmpty()) { + for (final ServerName server: drainingServers) { + LOG.debug("Removing draining server: " + server + + " from eligible server pool."); + servers.remove(server); + } + } + if (servers.isEmpty()) return null; + RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, balancer.randomAssignment(servers)); boolean newPlan = false; RegionPlan existingPlan = null; + synchronized (this.regionPlans) { existingPlan = this.regionPlans.get(encodedName); + if (existingPlan != null && existingPlan.getDestination() != null) { LOG.debug("Found an existing plan for " + state.getRegion().getRegionNameAsString() + " destination server is + " + existingPlan.getDestination().toString()); } - if (forceNewPlan || existingPlan == null - || existingPlan.getDestination() == null - || existingPlan.getDestination().equals(serverToExclude)) { + + if (forceNewPlan + || existingPlan == null + || existingPlan.getDestination() == null + || drainingServers.contains(existingPlan.getDestination())) { newPlan = true; this.regionPlans.put(encodedName, randomPlan); } } + if (newPlan) { debugLog(state.getRegion(), "No previous transition plan was found (or we are ignoring " + "an existing plan) for " + state.getRegion().getRegionNameAsString() + " so generated a random one; " + randomPlan + "; " + serverManager.countOfRegionServers() + - " (online=" + serverManager.getOnlineServers().size() + - ", exclude=" + serverToExclude + ") available servers"); + " (online=" + serverManager.getOnlineServers().size() + + ", exclude=" + drainingServers.size() + ") available servers"); return randomPlan; } debugLog(state.getRegion(), "Using pre-existing plan for region " + - state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan); + state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan); return existingPlan; } diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 
104e9c3..229e14c 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; +import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -131,6 +132,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private ActiveMasterManager activeMasterManager; // Region server tracker private RegionServerTracker regionServerTracker; + // Draining region server tracker + private DrainingServerTracker drainingServerTracker; // RPC server for the HMaster private final RpcServer rpcServer; @@ -370,6 +373,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.serverManager); this.regionServerTracker.start(); + this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, + this.serverManager); + this.drainingServerTracker.start(); + // Set the cluster as up. If new RSs, they'll be waiting on this before // going ahead with their startup. this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 41a942e..5397c65 100644 --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -80,6 +80,13 @@ public class ServerManager { private final Map serverConnections = new HashMap(); + /** + * List of region servers that should not get any more new + * regions. 
+ */ + private final ArrayList drainingServers = + new ArrayList(); + private final Server master; private final MasterServices services; private final HConnection connection; @@ -367,6 +374,43 @@ public class ServerManager { carryingRoot + ", meta=" + carryingMeta); } + /* + * Remove the server from the drain list; an offline server is still removed (with a warning). Returns true if the list contained it. + */ + public synchronized boolean removeServerFromDrainList(final ServerName sn) { + // Warn if the server (sn) is not online. ServerName is of the form: + // <hostname> , <port> , <startcode> + + if (!this.isServerOnline(sn)) { + LOG.warn("Server " + sn + " is not currently online. " + + "Removing from draining list anyway, as requested."); + } + // Remove the server from the draining servers lists. + return this.drainingServers.remove(sn); + } + + /* + * Add the server to the drain list; returns false (after logging a warning) if the server is offline or already listed. + */ + public synchronized boolean addServerToDrainList(final ServerName sn) { + // Warn if the server (sn) is not online. ServerName is of the form: + // <hostname> , <port> , <startcode> + + if (!this.isServerOnline(sn)) { + LOG.warn("Server " + sn + " is not currently online. " + + "Ignoring request to add it to draining list."); + return false; + } + // Add the server to the draining servers lists, if it's not already in + // it. + if (this.drainingServers.contains(sn)) { + LOG.warn("Server " + sn + " is already in the draining server list. " + + "Ignoring request to add it again."); + return false; + } + return this.drainingServers.add(sn); + } + // RPC methods to region servers /** @@ -489,6 +533,13 @@ public class ServerManager { return new ArrayList(this.onlineServers.keySet()); } + /** + * @return A copy of the internal list of draining servers. 
+ */ + public List getDrainingServersList() { + return new ArrayList(this.drainingServers); + } + public boolean isServerOnline(ServerName serverName) { return onlineServers.containsKey(serverName); } diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java new file mode 100644 index 0000000..e30452e --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/zookeeper/DrainingServerTracker.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the list of draining region servers via ZK. + * + *

<p>This class is responsible for watching for changes to the draining + * servers list. It handles adds/deletes in the draining RS list and + * watches each node. + * + *

<p>If an RS gets deleted from the draining list, we call + * {@link ServerManager#removeServerFromDrainList(ServerName)} + * + *

<p>If an RS gets added to the draining list, we add a watcher to it and call + * {@link ServerManager#addServerToDrainList(ServerName)} + * + */ +public class DrainingServerTracker extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(DrainingServerTracker.class); + + private ServerManager serverManager; + private NavigableSet drainingServers = new TreeSet(); + private Abortable abortable; + + public DrainingServerTracker(ZooKeeperWatcher watcher, + Abortable abortable, ServerManager serverManager) { + super(watcher); + this.abortable = abortable; + this.serverManager = serverManager; + } + + /** + * Starts the tracking of draining RegionServers. + * + *

<p>All Draining RSs will be tracked after this method is called. + * + * @throws KeeperException + */ + public void start() throws KeeperException, IOException { + watcher.registerListener(this); + List servers = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode); + add(servers); + } + + private void add(final List servers) throws IOException { + synchronized(this.drainingServers) { + this.drainingServers.clear(); + for (String n: servers) { + final ServerName sn = new ServerName(ZKUtil.getNodeName(n)); + this.drainingServers.add(sn); + this.serverManager.addServerToDrainList(sn); + } + } + } + + private void remove(final ServerName sn) { + synchronized(this.drainingServers) { + this.drainingServers.remove(sn); + this.serverManager.removeServerFromDrainList(sn); + } + } + + @Override + public void nodeDeleted(final String path) { + if(path.startsWith(watcher.drainingZNode)) { + final ServerName sn = new ServerName(ZKUtil.getNodeName(path)); + LOG.info("Draining RS ephemeral node deleted, removing from list [" + + sn + "]"); + remove(sn); + } + } + + @Override + public void nodeChildrenChanged(final String path) { + if(path.equals(watcher.drainingZNode)) { + try { + final List newNodes = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.drainingZNode); + add(newNodes); + } catch (KeeperException e) { + abortable.abort("Unexpected zk exception getting RS nodes", e); + } catch (IOException e) { + abortable.abort("Unexpected zk exception getting RS nodes", e); + } + } + } + +} diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index b20d371..a75cf87 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -73,6 +73,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { public String rootServerZNode; // znode containing ephemeral nodes of the 
regionservers public String rsZNode; + // znode containing ephemeral nodes of the draining regionservers + public String drainingZNode; // znode of currently active master public String masterAddressZNode; // znode containing the current cluster state @@ -90,13 +92,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable { private final Exception constructorCaller; - + /** * Instantiate a ZooKeeper connection and watcher. * @param descriptor Descriptive string that is added to zookeeper sessionid * and used as identifier for this instance. - * @throws IOException - * @throws ZooKeeperConnectionException + * @throws IOException + * @throws ZooKeeperConnectionException */ public ZooKeeperWatcher(Configuration conf, String descriptor, Abortable abortable) throws ZooKeeperConnectionException, IOException { @@ -138,6 +140,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { ZKUtil.createAndFailSilent(this, baseZNode); ZKUtil.createAndFailSilent(this, assignmentZNode); ZKUtil.createAndFailSilent(this, rsZNode); + ZKUtil.createAndFailSilent(this, drainingZNode); ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); } catch (KeeperException e) { @@ -175,6 +178,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { conf.get("zookeeper.znode.rootserver", "root-region-server")); rsZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.rs", "rs")); + drainingZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.draining.rs", "draining")); masterAddressZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.master", "master")); clusterStateZNode = ZKUtil.joinZNode(baseZNode,