diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 369fcfbec6..8354ffd0a3 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -616,6 +616,8 @@ message ListDecommissionedRegionServersResponse { message DecommissionRegionServersRequest { repeated ServerName server_name = 1; required bool offload = 2; + optional bool kill_after_offload = 3 [default = false]; + optional bool replace_decommission_list = 4 [default = false]; } message DecommissionRegionServersResponse { diff --git a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto index 383388b294..3647ee3535 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto +++ b/hbase-protocol-shaded/src/main/protobuf/ZooKeeper.proto @@ -107,3 +107,11 @@ message DeprecatedTableState { message SwitchState { optional bool enabled = 1; } + +/** + * State of the decom node. + */ +message DecommissionState { + optional bool offload = 1; + optional bool killAfterOffload = 2; +} \ No newline at end of file diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 20eb32700b..d0a3752408 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -635,7 +635,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { private void checkForDeadOrOnlineServers(Set
<Address> servers) throws ConstraintException {
     // This uglyness is because we only have Address, not ServerName.
     Set<ServerName>
onlineServers = new HashSet<>(); - List drainingServers = master.getServerManager().getDrainingServersList(); + List drainingServers = master.getDecommissionManager().getDrainingServersList(); for (ServerName server : master.getServerManager().getOnlineServers().keySet()) { // Only online but not decommissioned servers are really online if (!drainingServers.contains(server)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionManager.java new file mode 100644 index 0000000000..cf0be0d2c5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionManager.java @@ -0,0 +1,547 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + + +/** + * Manages decommission requests of region servers. + */ +@InterfaceAudience.Private +public class DecommissionManager { + private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager.class); + private final MasterServices master; + private final String parentZNode; + + private final NavigableMap drainingServersFromZk = new TreeMap<>(); + /** List of region servers that should not get any more new regions. */ + private final Set drainingServers = new HashSet<>(); + /** A subset of drainingServers that is offloading; true means kill after offloading. 
*/ + private final NavigableMap offloadingServers = new TreeMap<>(); + + /** Relies on servermanager from master services in ctor. */ + public DecommissionManager(MasterServices master) { + this.master = master; + if (master.getZooKeeper() != null) { + this.parentZNode = this.master.getZooKeeper().getZNodePaths().drainingZNode; + // We don't need to watch znodes; we don't expect anyone but us to change them. + // If they do, these changes will not be reflected until next startup. + try { + initInternal(ZKUtil.getChildData(master.getZooKeeper(), parentZNode)); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Failed to initialize DecommissionManager"); + } + } else { + parentZNode = null; // Test path. + } + master.getServerManager().registerListener(new ServerListener() { + @Override + public void serverAdded(ServerName sn) { + boolean offload = false; + synchronized (drainingServersFromZk) { + // This is necessary when the servers report after master init. The below add.. code + // will not find an online server when adding to serverManager, so the latter will + // not remember the decom state. When the server dials it, we will remind it again. + Integer ds = drainingServersFromZk.remove(sn); + if (ds != null) { + offload = ZkStateBits.isOffload(ds); + try { + addServerToDrainList(sn, offload, ZkStateBits.isKillOnOffload(ds), false); + } catch (HBaseIOException e) { + throw new AssertionError("Unexpected", e); // Only when creating znode. + } + } + } + + if (offload) { + try { + tryOffloadRegions(Lists.newArrayList(sn)); + } catch (HBaseIOException e) { + // Ignore, already logged in tryOffload. + } + } + } + + @Override + public void serverRemoved(ServerName serverName) { + // Dead server is no longer decommissioning. + removeServerFromDrainList(serverName); + } + }); + } + + /** Checks server report and performs any decommissioning actions necessary. */ + public void checkServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException { + if (sl == null || !sl.getRegionMetrics().isEmpty()) { + return; + } + Boolean isKillOnEmpty; + synchronized (drainingServers) { + isKillOnEmpty = offloadingServers.get(sn); + } + if (isKillOnEmpty != null && isKillOnEmpty) { + String message = "Server " + sn + " has been decommissioned; killing it"; + LOG.info(message); + throw new YouAreDeadException(message); + } + } + + /** @returns the copy of the draining servers list. */ + public List getDrainingServersList() { + synchronized (drainingServers) { + return new ArrayList<>(this.drainingServers); + } + } + + /** + * Reports to the decom manager that a region has been opened. Since there's no central + * coordination, RIT may assign to a decommissioned server; see comments elsewhere. + */ + public void reportAfterRegionOpened( + RegionInfo regionInfo, ServerName serverName) throws HBaseIOException { + Boolean killAfterOffload = null; + synchronized (drainingServers) { + killAfterOffload = this.offloadingServers.get(serverName); + if (killAfterOffload == null) { + return; // Not offloading - vast majority of the cases. + } + } + + // The decommissioning RS has opened a region. + List destServers = master.getServerManager().createDestinationServersList(); + try { + tryMoveRegion(regionInfo, serverName, destServers); + } catch (HBaseIOException ex) { + // We cannot remove this region for some reason. If we wanted to kill this RS + // at the end of the offloading, kill it now by failing the call. 
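+      // Failing the call here surfaces as a failed report from the RS; as noted in the
+      // AssignmentManager change below, a failed report MAY kill the RS, which is the
+      // desired outcome when killAfterOffload is set.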
+ if (killAfterOffload) { + throw ex; + } + LOG.info("Ignoring the offload error", ex); + } + } + + /** + * Decommissions region servers based on master RPC request. + * @param servers Server list. May contain hostnames without start codes and/or ports. + * @param offload Whether to offload the regions. + * @param isReplace Whether to replace the current decom list; may result in some recommissions. + * @param killAfterOffload Whether to force RS to die when they are empty (only if offloading). + */ + public void decommissionRegionServers(final List servers, final boolean offload, + final boolean isReplace, final boolean killAfterOffload) throws IOException { + List decomList = servers, recomList = null; + if (isReplace) { + // Note: replace is the subject to the same races between multiple requests that + // a regular decom/recom requests are subject to; we could, but we don't do + // any additional locking here. + Ref> decomListRef = new Ref<>(), recomListRef = new Ref<>(); + List existing = getDrainingServersList(); + combineServerListsForReplace(servers, existing, decomListRef, recomListRef); + decomList = decomListRef.t; + recomList = recomListRef.t; + } + + HBaseIOException decomEx = null; + if (decomList != null && !decomList.isEmpty()) { + MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preDecommissionRegionServers(decomList, offload); + } + try { + decommissionSpecificServers(decomList, offload, killAfterOffload); + if (cpHost != null) { + cpHost.postDecommissionRegionServers(decomList, offload); + } + } catch (HBaseIOException ex) { + // Decom can fail for completely normal reasons (an OPENING region), so don't fail yet. + decomEx = ex; + } + + } + if (recomList != null && !recomList.isEmpty()) { + for (ServerName server : recomList) { + // Also calls coprocessors, just like this method does for decom. + recommissionRegionServer(server, null); + } + } + if (decomEx != null) { + throw decomEx; + } + } + + /** + * Recommissions region servers based on master RPC request. + * @param server Server. + * @param encodedRegionNames Optional list of regions to assign to the new server. + */ + public void recommissionRegionServer(ServerName server, List encodedRegionNames) + throws IOException { + MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preRecommissionRegionServer(server, encodedRegionNames); + } + recommissionSpecificServer(server, encodedRegionNames); + if (cpHost != null) { + cpHost.postRecommissionRegionServer(server, encodedRegionNames); + } + } + + private static void combineServerListsForReplace( + List requested, List existing, + Ref> decomListRef, Ref> recomListRef) { + if (existing == null || existing.isEmpty()) { + decomListRef.t = requested; // Just decom all that are requested. + recomListRef.t = null; + return; + } + if (requested == null || requested.isEmpty()) { + decomListRef.t = null; + recomListRef.t = existing; // Reverse of the above. + return; + } + + // We have both new and existing servers; compare the lists. 
existing list always contains + // fully defined SNs; the new list can have partially defined ones (no SC, or no SC+no port) + List decom = new ArrayList<>(), recom = new ArrayList<>(); + decomListRef.t = decom; + recomListRef.t = recom; + + // Sort by host -> port -> sc + requested.sort(null); + existing.sort(null); + Iterator iterE = existing.iterator(), iterR = requested.iterator(); + ServerName nextE = iterE.next(), nextR = iterR.next(); + while (nextE != null && nextR != null) { + int c = nextE.compareTo(nextR); + if (c < 0) { + // No such server in the new list (nextE is always fully defined), recommission. + recom.add(nextE); + nextE = iterE.hasNext() ? iterE.next() : null; + } else if (c == 0) { // Exact match; already decommissioning. + nextR = iterR.hasNext() ? iterR.next() : null; + nextE = iterE.hasNext() ? iterE.next() : null; + } else if (c > 0) { + // Either nextR is a new server, or it could be missing port/SC and match an existing + // server. Regardless, we will add it to the list; in case the latter case it may match + // additional online servers compared to those currently decommissioning. + decom.add(nextR); + // If nextR is not fully defined, multiple existing servers can match it; skip all of them. + // That should be extremely rare in practice; there'd usually be one. + while (nextR.getStartcode() == ServerName.NON_STARTCODE + && (nextR.getPort() == 0 || nextR.getPort() == nextE.getPort()) + && nextR.getHostname().equalsIgnoreCase(nextE.getHostname())) { + nextE = iterE.hasNext() ? iterE.next() : null; + } + nextR = iterR.hasNext() ? iterR.next() : null; + } + } + // We ran out of one of the lists; after that decom/recom is straightforward. + if (nextE != null) { + recom.add(nextE); + while (iterE.hasNext()) { + recom.add(iterE.next()); + } + } else if (nextR != null) { + decom.add(nextR); + while (iterR.hasNext()) { + decom.add(iterR.next()); + } + } + } + + private void removeServerFromDrainList(final ServerName sn) { + synchronized (drainingServers) { + this.offloadingServers.remove(sn); + this.drainingServers.remove(sn); + } + } + + /** + * @return True if the server is added or the server is already on the drain list. + */ + private boolean addServerToDrainList(final ServerName server, + boolean offload, boolean killAfterOffload, boolean doCreateZNode) throws HBaseIOException { + // Warn if the server (sn) is not online. ServerName is of the form: + // , ( , ) + List onlineToDrain = master.getServerManager().findMatchingServers(server); + if (onlineToDrain == null || onlineToDrain.isEmpty()) { + LOG.warn("No servers matching " + server + " are currently online. " + + "Ignoring request to add it to draining list."); + return false; + } + + for (ServerName sn : onlineToDrain) { + if (doCreateZNode) { + createZNode(sn, offload, killAfterOffload); + } + synchronized (drainingServers) { + // Add the server to the draining servers lists, if it's not already in it. 
+ if (!this.drainingServers.add(sn)) { + LOG.warn("Server " + sn + " is already in the draining server list"); + } else { + LOG.info("Server " + sn + " added to draining server list"); + } + if (!offload) { + if (this.offloadingServers.remove(sn) != null) { + LOG.info("Server " + sn + " was removed from the offloading list while still draining"); + } + } else { + this.offloadingServers.put(sn, killAfterOffload); + } + } + } + return true; + } + + private void createZNode(ServerName server, boolean offload, boolean killOnOffload) + throws HBaseIOException { + ZKWatcher watcher = master.getZooKeeper(); + if (watcher == null) { + return; // Test. + } + try { + String node = ZNodePaths.joinZNode(parentZNode, server.getServerName()); + ZKUtil.createAndFailSilent(watcher, node, createZNodeValue(offload, killOnOffload)); + } catch (KeeperException ke) { + throw new HBaseIOException( + watcher.prefix("Unable to decommission '" + server.getServerName() + "'."), ke); + } + } + + @VisibleForTesting + static byte[] createZNodeValue(boolean offload, boolean killOnOffload) { + ZooKeeperProtos.DecommissionState state = ZooKeeperProtos.DecommissionState.newBuilder() + .setOffload(offload).setKillAfterOffload(killOnOffload).build(); + return ProtobufUtil.prependPBMagic(state.toByteArray()); + } + + private void deleteZNode(ServerName server) throws HBaseIOException { + ZKWatcher watcher = master.getZooKeeper(); + if (watcher == null) { + return; // Test. + } + String node = ZNodePaths.joinZNode(parentZNode, server.getServerName()); + try { + ZKUtil.deleteNodeFailSilent(watcher, node); + } catch (KeeperException ke) { + throw new HBaseIOException(watcher.prefix( + "Unable to recommission '" + server.getServerName() + "'."), ke); + } + } + + /** + * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional + * regions from getting assigned to them. Also unload the regions on the servers asynchronously. + */ + private void decommissionSpecificServers(final List servers, final boolean offload, + final boolean killAfterOffload) throws HBaseIOException { + List serversAdded = new ArrayList<>(servers.size()); + for (ServerName server : servers) { + if (addServerToDrainList(server, offload, killAfterOffload, true)) { + serversAdded.add(server); + } + } + if (!offload) { + return; + } + tryOffloadRegions(serversAdded); + } + + private void tryOffloadRegions(List servers) throws HBaseIOException { + // Note: existing code is best effort; it will only work for OPEN regions without RITs. + // We don't really care about most RITs; the only bad ones to miss are OPENING. There's + // also no sync with potential incoming RITs that we cannot see that will try to open a + // region here. Therefore we'll skip failures; and handle OPENING in regionOpened in + // accordance with the imperative nature of RS-master protocol. + // TODO: we could move this to a procedure; however, we'd need to make sure to coordinate with + // any RITs for other regions (that didn't yet move the region but may have chosen this + // server as target; decom list addition would need to be atomic for all RITs' target + // choice + assignment, or we'd need to have RITs detect they assigned to a decommed + // server and retry); as well as SCP that may be blocked by SD(ecom)P holding a lock. 
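+    // Best effort: attempt to move every region, remember the first error, and report
+    // the total failure count at the end instead of aborting on the first problem.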
+ int errorCount = 0; + HBaseIOException firstEx = null; + + List destServers = master.getServerManager().createDestinationServersList(); + for (ServerName server : servers) { + for (RegionInfo hri : getRegionsOnServer(server)) { + try { + tryMoveRegion(hri, server, destServers); + } catch (HBaseIOException ex) { + // Probably just a state issue, e.g. it's OPENING. + LOG.info("Offloading " + hri + " failed", ex); + ++errorCount; + firstEx = (firstEx == null) ? ex : firstEx; + } + } + } + if (errorCount > 0) { + throw new HBaseIOException("Failed to move " + errorCount + " regions; first", firstEx); + } + } + + @VisibleForTesting + protected Collection getRegionsOnServer(ServerName server) { + return master.getAssignmentManager().getRegionsOnServer(server); + } + + @VisibleForTesting + protected void tryMoveRegion(RegionInfo hri, ServerName from, List destServers) + throws HBaseIOException { + ServerName dest = master.getLoadBalancer().randomAssignment(hri, destServers); + if (dest == null) { + throw new HBaseIOException("Unable to determine a plan to move " + hri); + } + master.getAssignmentManager().moveAsync(new RegionPlan(hri, from, dest)); + } + + private void recommissionSpecificServer(final ServerName server, + final List encodedRegionNames) throws IOException { + // Remove the server from decommissioned (draining) server list. + if (!master.getServerManager().isServerOnline(server)) { + LOG.warn("Server " + server + " is not currently online. " + + "Removing from draining list anyway, as requested."); + } + removeServerFromDrainList(server); + deleteZNode(server); + + // Load the regions onto the server if we are given a list of regions. + if (encodedRegionNames == null || encodedRegionNames.isEmpty()) { + return; + } + if (!master.getServerManager().isServerOnline(server)) { + return; + } + AssignmentManager am = master.getAssignmentManager(); + for (byte[] encodedRegionName : encodedRegionNames) { + RegionState rs = am.getRegionStates().getRegionState(Bytes.toString(encodedRegionName)); + if (rs == null) { + LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName)); + continue; + } + RegionInfo hri = rs.getRegion(); + if (server.equals(rs.getServerName())) { + LOG.info("Skipping move of region " + hri.getRegionNameAsString() + + " because region already assigned to the same server " + server + "."); + continue; + } + RegionPlan rp = new RegionPlan(hri, rs.getServerName(), server); + am.moveAsync(rp); + } + } + + @VisibleForTesting + void initInternal(final List servers) { + List serversAdded = new ArrayList<>(); + // Do this under an epic lock, because RS might start reporting and we want to make sure we + // either offload them here if they report before init, or in the report if they report after. + synchronized (drainingServersFromZk) { + this.drainingServersFromZk.clear(); + if (servers == null) { + return; + } + for (ZKUtil.NodeAndData n : servers) { + ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(n.getNode())); + boolean offload = false, killOnOffload = false; + if (ProtobufUtil.isPBMagicPrefix(n.getData())) { + int o = ProtobufUtil.lengthOfPBMagic(); + try { + ZooKeeperProtos.DecommissionState ds = ZooKeeperProtos.DecommissionState.parser() + .parseFrom(n.getData(), o, n.getData().length - o); + offload = ds.hasOffload() && ds.getOffload(); + killOnOffload = ds.hasKillAfterOffload() && ds.getKillAfterOffload(); + } catch (InvalidProtocolBufferException e) { + LOG.error("Corrupted PB data for decommissioning server " + sn, e); // Assume falses. 
+ } + } else { + LOG.warn("No PB data for decommissioning server " + sn); // Legacy znode? Assume falses. + } + boolean isKnownServer; + try { + isKnownServer = addServerToDrainList(sn, offload, killOnOffload, false); + } catch (HBaseIOException e) { + throw new AssertionError("Unexpected", e); // Only possible w/doCreateZNode. + } + if (!isKnownServer) { + // The server may yet register with master. + this.drainingServersFromZk.put(sn, ZkStateBits.create(offload, killOnOffload)); + } else if (offload) { + serversAdded.add(sn); + } + LOG.info("Draining RS node created, adding to list [" + sn + "]"); + } + } + if (!serversAdded.isEmpty()) { + try { + tryOffloadRegions(serversAdded); + } catch (HBaseIOException e) { + // Ignore, already logged in tryOffload. + } + } + } + + /** A helper class to store 2 booleans in a map value. */ + private static class ZkStateBits { + public static boolean isOffload(int s) { + return (s & 1) == 1; + } + public static boolean isKillOnOffload(int s) { + return (s & 2) == 2; + } + public static int create(boolean isOffload, boolean isKillOnOffload) { + return (isOffload ? 1 : 0) + (isKillOnOffload ? 2 : 0); + } + } + + protected static class Ref { + public T t = null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DrainingServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DrainingServerTracker.java deleted file mode 100644 index 14c4a3ec85..0000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DrainingServerTracker.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; - -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tracks the list of draining region servers via ZK. - * - *

<p>This class is responsible for watching for changes to the draining
- * servers list. It handles adds/deletes in the draining RS list and
- * watches each node.
- *
- * <p>If an RS gets deleted from draining list, we call
- * {@link ServerManager#removeServerFromDrainList(ServerName)}
- *
- * <p>If an RS gets added to the draining list, we add a watcher to it and call
- * {@link ServerManager#addServerToDrainList(ServerName)}
- *
- * <p>This class is deprecated in 2.0 because decommission/draining API goes through
- * master in 2.0. Can remove this class in 3.0.
- *
- */
-@InterfaceAudience.Private
-public class DrainingServerTracker extends ZKListener {
-  private static final Logger LOG = LoggerFactory.getLogger(DrainingServerTracker.class);
-
-  private ServerManager serverManager;
-  private final NavigableSet<ServerName> drainingServers = new TreeSet<>();
-  private Abortable abortable;
-
-  public DrainingServerTracker(ZKWatcher watcher,
-      Abortable abortable, ServerManager serverManager) {
-    super(watcher);
-    this.abortable = abortable;
-    this.serverManager = serverManager;
-  }
-
-  /**
-   * Starts the tracking of draining RegionServers.
-   *
-   * <p>
All Draining RSs will be tracked after this method is called. - * - * @throws KeeperException - */ - public void start() throws KeeperException, IOException { - watcher.registerListener(this); - // Add a ServerListener to check if a server is draining when it's added. - serverManager.registerListener(new ServerListener() { - @Override - public void serverAdded(ServerName sn) { - if (drainingServers.contains(sn)){ - serverManager.addServerToDrainList(sn); - } - } - }); - List servers = - ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().drainingZNode); - add(servers); - } - - private void add(final List servers) throws IOException { - synchronized(this.drainingServers) { - this.drainingServers.clear(); - for (String n: servers) { - final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(n)); - this.drainingServers.add(sn); - this.serverManager.addServerToDrainList(sn); - LOG.info("Draining RS node created, adding to list [" + - sn + "]"); - - } - } - } - - private void remove(final ServerName sn) { - synchronized(this.drainingServers) { - this.drainingServers.remove(sn); - this.serverManager.removeServerFromDrainList(sn); - } - } - - @Override - public void nodeDeleted(final String path) { - if(path.startsWith(watcher.getZNodePaths().drainingZNode)) { - final ServerName sn = ServerName.valueOf(ZKUtil.getNodeName(path)); - LOG.info("Draining RS node deleted, removing from list [" + - sn + "]"); - remove(sn); - } - } - - @Override - public void nodeChildrenChanged(final String path) { - if(path.equals(watcher.getZNodePaths().drainingZNode)) { - try { - final List newNodes = - ZKUtil.listChildrenAndWatchThem(watcher, watcher.getZNodePaths().drainingZNode); - add(newNodes); - } catch (KeeperException e) { - abortable.abort("Unexpected zk exception getting RS nodes", e); - } catch (IOException e) { - abortable.abort("Unexpected zk exception getting RS nodes", e); - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4f5edf7a3e..48aa72bd1a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -316,8 +316,6 @@ public class HMaster extends HRegionServer implements MasterServices { private final ActiveMasterManager activeMasterManager; // Region server tracker private RegionServerTracker regionServerTracker; - // Draining region server tracker - private DrainingServerTracker drainingServerTracker; // Tracker for load balancer state LoadBalancerTracker loadBalancerTracker; // Tracker for meta location, if any client ZK quorum specified @@ -352,6 +350,7 @@ public class HMaster extends HRegionServer implements MasterServices { // server manager to deal with region server info private volatile ServerManager serverManager; + private DecommissionManager decommissionManager; // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; @@ -789,9 +788,6 @@ public class HMaster extends HRegionServer implements MasterServices { this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); - this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); - this.drainingServerTracker.start(); - String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM); boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, 
HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE); @@ -956,6 +952,7 @@ public class HMaster extends HRegionServer implements MasterServices { // The below two managers must be created before loading procedures, as they will be used during // loading. this.serverManager = createServerManager(this); + this.decommissionManager = new DecommissionManager(this); this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { @@ -1406,6 +1403,11 @@ public class HMaster extends HRegionServer implements MasterServices { return this.serverManager; } + @Override + public DecommissionManager getDecommissionManager() { + return this.decommissionManager; + } + @Override public MasterFileSystem getMasterFileSystem() { return this.fileSystemManager; @@ -1742,7 +1744,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.balancer.setClusterLoad(assignmentsByTable); for (Map> serverMap : assignmentsByTable.values()) { - serverMap.keySet().removeAll(this.serverManager.getDrainingServersList()); + serverMap.keySet().removeAll(this.decommissionManager.getDrainingServersList()); } for (Entry>> e : assignmentsByTable.entrySet()) { List partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue()); @@ -3782,52 +3784,21 @@ public class HMaster extends HRegionServer implements MasterServices { return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state)); } - /** - * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional - * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0 - * @param servers Region servers to decommission. - */ - public void decommissionRegionServers(final List servers, final boolean offload) - throws HBaseIOException { - List serversAdded = new ArrayList<>(servers.size()); - // Place the decommission marker first. - String parentZnode = getZooKeeper().getZNodePaths().drainingZNode; - for (ServerName server : servers) { - try { - String node = ZNodePaths.joinZNode(parentZnode, server.getServerName()); - ZKUtil.createAndFailSilent(getZooKeeper(), node); - } catch (KeeperException ke) { - throw new HBaseIOException( - this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke); - } - if (this.serverManager.addServerToDrainList(server)) { - serversAdded.add(server); - } - } - // Move the regions off the decommissioned servers. - if (offload) { - final List destServers = this.serverManager.createDestinationServersList(); - for (ServerName server : serversAdded) { - final List regionsOnServer = this.assignmentManager.getRegionsOnServer(server); - for (RegionInfo hri : regionsOnServer) { - ServerName dest = balancer.randomAssignment(hri, destServers); - if (dest == null) { - throw new HBaseIOException("Unable to determine a plan to move " + hri); - } - RegionPlan rp = new RegionPlan(hri, server, dest); - this.assignmentManager.moveAsync(rp); - } - } - } + + public void decommissionRegionServers(final List servers, final boolean offload, + final boolean isReplace, final boolean killAfterOffload) throws IOException { + List decomList = servers, recomList = null; + decommissionManager.decommissionRegionServers(servers, offload, isReplace, killAfterOffload); } + /** * List region servers marked as decommissioned (previously called 'draining') to not get regions * assigned to them. * @return List of decommissioned servers. 
*/ public List listDecommissionedRegionServers() { - return this.serverManager.getDrainingServersList(); + return this.decommissionManager.getDrainingServersList(); } /** @@ -3837,40 +3808,7 @@ public class HMaster extends HRegionServer implements MasterServices { */ public void recommissionRegionServer(final ServerName server, final List encodedRegionNames) throws IOException { - // Remove the server from decommissioned (draining) server list. - String parentZnode = getZooKeeper().getZNodePaths().drainingZNode; - String node = ZNodePaths.joinZNode(parentZnode, server.getServerName()); - try { - ZKUtil.deleteNodeFailSilent(getZooKeeper(), node); - } catch (KeeperException ke) { - throw new HBaseIOException( - this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke); - } - this.serverManager.removeServerFromDrainList(server); - - // Load the regions onto the server if we are given a list of regions. - if (encodedRegionNames == null || encodedRegionNames.isEmpty()) { - return; - } - if (!this.serverManager.isServerOnline(server)) { - return; - } - for (byte[] encodedRegionName : encodedRegionNames) { - RegionState regionState = - assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName)); - if (regionState == null) { - LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName)); - continue; - } - RegionInfo hri = regionState.getRegion(); - if (server.equals(regionState.getServerName())) { - LOG.info("Skipping move of region " + hri.getRegionNameAsString() + - " because region already assigned to the same server " + server + "."); - continue; - } - RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server); - this.assignmentManager.moveAsync(rp); - } + decommissionManager.recommissionRegionServer(server, encodedRegionNames); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 222a6868a6..cbd3a8146d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -524,6 +524,7 @@ public class MasterRpcServices extends RSRpcServices master.getServerManager().regionServerReport(serverName, newLoad); master.getAssignmentManager().reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet()); + master.getDecommissionManager().checkServerReport(serverName, newLoad); if (sl != null && master.metricsMaster != null) { // Up our metrics. 
master.metricsMaster.incrementRequests( @@ -2077,13 +2078,10 @@ public class MasterRpcServices extends RSRpcServices List servers = request.getServerNameList().stream() .map(pbServer -> ProtobufUtil.toServerName(pbServer)).collect(Collectors.toList()); boolean offload = request.getOffload(); - if (master.cpHost != null) { - master.cpHost.preDecommissionRegionServers(servers, offload); - } - master.decommissionRegionServers(servers, offload); - if (master.cpHost != null) { - master.cpHost.postDecommissionRegionServers(servers, offload); - } + boolean killOnOffload = request.hasKillAfterOffload() && request.getKillAfterOffload(); + boolean isReplace = request.hasReplaceDecommissionList() + && request.getReplaceDecommissionList(); + master.decommissionRegionServers(servers, offload, isReplace, killOnOffload); } catch (IOException io) { throw new ServiceException(io); } @@ -2100,13 +2098,8 @@ public class MasterRpcServices extends RSRpcServices List encodedRegionNames = request.getRegionList().stream() .map(regionSpecifier -> regionSpecifier.getValue().toByteArray()) .collect(Collectors.toList()); - if (master.cpHost != null) { - master.cpHost.preRecommissionRegionServer(server, encodedRegionNames); - } + master.recommissionRegionServer(server, encodedRegionNames); - if (master.cpHost != null) { - master.cpHost.postRecommissionRegionServer(server, encodedRegionNames); - } } catch (IOException io) { throw new ServiceException(io); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 12c78ac8f6..45916b15a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -83,6 +83,11 @@ public interface MasterServices extends Server { */ AssignmentManager getAssignmentManager(); + /** + * @return Master's instance of the {@link DecommissionManager} + */ + DecommissionManager getDecommissionManager(); + /** * @return Master's filesystem {@link MasterFileSystem} utility class. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 0fb1551792..ffd8a41774 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -165,9 +166,6 @@ public class ServerManager { */ private final Map rsAdmins = new HashMap<>(); - /** List of region servers that should not get any more new regions. 
*/ - private final ArrayList drainingServers = new ArrayList<>(); - private final MasterServices master; private final ClusterConnection connection; @@ -602,7 +600,9 @@ public class ServerManager { return false; } LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName()); - long pid = master.getAssignmentManager().submitServerCrash(serverName, true); + AssignmentManager am = master.getAssignmentManager(); + // am may be null in tests. + long pid = am == null ? 1 : am.submitServerCrash(serverName, true); if(pid <= 0) { return false; } else { @@ -636,45 +636,6 @@ public class ServerManager { this.rsAdmins.remove(sn); } - /* - * Remove the server from the drain list. - */ - public synchronized boolean removeServerFromDrainList(final ServerName sn) { - // Warn if the server (sn) is not online. ServerName is of the form: - // , , - - if (!this.isServerOnline(sn)) { - LOG.warn("Server " + sn + " is not currently online. " + - "Removing from draining list anyway, as requested."); - } - // Remove the server from the draining servers lists. - return this.drainingServers.remove(sn); - } - - /** - * Add the server to the drain list. - * @param sn - * @return True if the server is added or the server is already on the drain list. - */ - public synchronized boolean addServerToDrainList(final ServerName sn) { - // Warn if the server (sn) is not online. ServerName is of the form: - // , , - - if (!this.isServerOnline(sn)) { - LOG.warn("Server " + sn + " is not currently online. " + - "Ignoring request to add it to draining list."); - return false; - } - // Add the server to the draining servers lists, if it's not already in - // it. - if (this.drainingServers.contains(sn)) { - LOG.warn("Server " + sn + " is already in the draining server list." + - "Ignoring request to add it again."); - return true; - } - LOG.info("Server " + sn + " added to draining server list."); - return this.drainingServers.add(sn); - } // RPC methods to region servers @@ -896,13 +857,6 @@ public class ServerManager { return names; } - /** - * @return A copy of the internal list of draining servers. - */ - public List getDrainingServersList() { - return new ArrayList<>(this.drainingServers); - } - public boolean isServerOnline(ServerName serverName) { return serverName != null && onlineServers.containsKey(serverName); } @@ -977,8 +931,7 @@ public class ServerManager { } // Loop through the draining server list and remove them from the server list - final List drainingServersCopy = getDrainingServersList(); - destServers.removeAll(drainingServersCopy); + destServers.removeAll(master.getDecommissionManager().getDrainingServersList()); return destServers; } @@ -1167,6 +1120,39 @@ public class ServerManager { } } + + + /** + * Depending on serverName supplied, returns matching online servers. + * If ServerName is fully specified, performs an exact match. If start code or port are + * missing (negative), returns any servers matching the hostname+port, or just hostname. + */ + public synchronized List findMatchingServers(final ServerName serverName) { + if (serverName.getStartcode() != ServerName.NON_STARTCODE) { + // Test for the exact match first; a common, cheap path. + if (!onlineServers.containsKey(serverName)) { + return null; + } + List list = new ArrayList<>(1); + list.add(serverName); + return list; + } + // Both port and start code are zero/negative when not specified. 
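+    // ServerName sorts by hostname, then port, then start code, so all online servers
+    // matching the partial name form a contiguous run starting at the tail map's first entry.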
+ Map tailMap = onlineServers.tailMap(serverName, true); + List result = null; + for (ServerName actualSn : tailMap.keySet()) { + if ((actualSn.getHostname().compareToIgnoreCase(serverName.getHostname()) != 0) + || (serverName.getPort() > 0 && actualSn.getPort() != serverName.getPort())) { + break; + } + if (result == null) { + result = new ArrayList<>(1); // We would almost always have just one match. + } + result.add(actualSn); + } + return result; + } + private class FlushedSequenceIdFlusher extends ScheduledChore { public FlushedSequenceIdFlusher(String name, int p) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 5bdbb92769..99d503c7e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -951,6 +951,14 @@ public class AssignmentManager { } finally { regionNode.unlock(); } + + // See the TO-DO in decommissionSpecificServers. + // There's no coordination between decom and rits. reportTransitionOpen should have set + // the region to opened; if there is decom going on, it will now move the region away. + // If the move fails, the call MAY fail and kill the RS. + if (state.equals(TransitionCode.OPENED)) { + this.master.getDecommissionManager().reportAfterRegionOpened(regionInfo, serverName); + } } private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 9c55f57212..874ac73c2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -94,6 +94,11 @@ public class MockNoopMasterServices implements MasterServices { return null; } + @Override + public DecommissionManager getDecommissionManager() { + return null; + } + @Override public ExecutorService getExecutorService() { return null; @@ -473,4 +478,4 @@ public class MockNoopMasterServices implements MasterServices { public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { return null; } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDecommissionManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDecommissionManager.java new file mode 100644 index 0000000000..84a0094308 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDecommissionManager.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.RegionMetricsBuilder; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerMetricsBuilder; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({MasterTests.class, SmallTests.class}) +public class TestDecommissionManager { + private static final Logger LOG = LoggerFactory.getLogger(TestDecommissionManager.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDecommissionManager.class); + + @Rule + public final TestName name = new TestName(); + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private MasterServices master; + + @Before + public void setup() throws IOException, KeeperException { + master = new MasterServices(); + master.sm = new ServerManager(master); + master.dm = new DecommissionManagerForTest(master); + } + + @After + public void teardown() { + this.master.stop(""); + } + + @Test + public void testLoadFromZk() throws Exception { + List sns = initServersAndRegions(8, false, 1, 1, 1, 1, 1, 1, 0, 0); + // Init dm from ZK; 0-7 are all decommissioned. 0 znode is empty and 1 corrupt; + // 2-3 are to be offloaded and 4-6 are to be offloaded and killed. 6 is empty + // 1, 3, 5, 6 have not reported to master. 
+ master.sm.regionServerReport(sns.get(0), master.dm.getSm(sns.get(0))); + master.sm.regionServerReport(sns.get(2), master.dm.getSm(sns.get(2))); + master.sm.regionServerReport(sns.get(4), master.dm.getSm(sns.get(4))); + master.sm.regionServerReport(sns.get(7), master.dm.getSm(sns.get(7))); + + List initData = new ArrayList<>(6); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 0), new byte[0])); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 1), + ProtobufUtil.prependPBMagic(new byte[] { 0 }))); + byte[] val23 = DecommissionManager.createZNodeValue(true, false); + byte[] val456 = DecommissionManager.createZNodeValue(true, true); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 2), val23)); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 3), val23)); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 4), val456)); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 5), val456)); + initData.add(new ZKUtil.NodeAndData(nodeName(sns, 6), val456)); + + master.dm.initInternal(initData); + verifyDrainingServers(sns, 0, 2, 4); // 0, 2, 4 are known and draining + master.dm.performMoves(sns.get(0)); + assertEquals(1, master.dm.getRegionsOnServer(sns.get(0)).size()); // Not offloading. + master.dm.performAllMoves(); + assertEquals(0, master.dm.getRegionsOnServer(sns.get(2)).size()); // Offloading. + assertEquals(0, master.dm.getRegionsOnServer(sns.get(4)).size()); // Offloading. + master.dm.checkServerReport(sns.get(2), master.dm.getSm(sns.get(2))); // Not killed. + try { + master.dm.checkServerReport(sns.get(4), master.dm.getSm(sns.get(4))); // Killed. + fail("Check should have killed RS"); + } catch (YouAreDeadException e) { + } + + // Now all the "late" servers dial in. + master.sm.regionServerReport(sns.get(1), master.dm.getSm(sns.get(1))); + master.sm.regionServerReport(sns.get(3), master.dm.getSm(sns.get(3))); + master.sm.regionServerReport(sns.get(5), master.dm.getSm(sns.get(5))); + master.sm.regionServerReport(sns.get(6), master.dm.getSm(sns.get(6))); + verifyDrainingServers(sns, 0, 1, 2, 3, 4, 5, 6); + master.dm.performAllMoves(); + assertEquals(1, master.dm.getRegionsOnServer(sns.get(1)).size()); // Not offloading. + assertEquals(0, master.dm.getRegionsOnServer(sns.get(3)).size()); // Offloading. + assertEquals(0, master.dm.getRegionsOnServer(sns.get(5)).size()); // Offloading. + master.dm.checkServerReport(sns.get(3), master.dm.getSm(sns.get(3))); // Not killed. + try { + master.dm.checkServerReport(sns.get(5), master.dm.getSm(sns.get(5))); // Killed. + fail("Check should have killed RS"); + } catch (YouAreDeadException e) { + } + try { + master.dm.checkServerReport(sns.get(6), master.dm.getSm(sns.get(6))); // Killed. + fail("Check should have killed RS"); + } catch (YouAreDeadException e) { + } + } + + private static String nodeName(List sns, int i) { + return "/r/" + sns.get(i).getServerName(); + } + + @Test + public void testKillOnOffload() throws Exception { + List sns = initServersAndRegions(4, 2, 2, 0, 0); + master.dm.decommissionRegionServers(sns.subList(0, 3), true, false, true); + // Check all servers - 2 will be killed at once as it's empty; 0 & 1 have regions. + checkServerReports(sns, 2); + // Complete the moves for one server. + // Also verify if servers keep pinging we keep killing them. + master.dm.performMoves(sns.get(1)); + checkServerReports(sns, 1, 2); + // Recommissioned server should not be killed even after moves. 
+ master.dm.performAllMoves(); + assertEquals(0, master.dm.getRegionsOnServer(sns.get(0)).size()); + master.dm.recommissionRegionServer(sns.get(0), null); + checkServerReports(sns, 1, 2); + + // Verify a dead server is removed from the list by "reporting" from it. + // It's not a realistic condition but just a way to check that the list is cleaned up. + master.sm.expireServer(sns.get(1)); + checkServerReports(sns, 2); + + // Finally, try to open a region on server that's offloading and set to be killed. + // If we cannot move the newly opened region, we'll just kill the server as is. + RegionInfo badRi = prepareBadServer(sns.get(3), true); + try { + master.dm.reportAfterRegionOpened(badRi, sns.get(3)); + fail("We should have failed the server after the report"); + } catch (HBaseIOException ex) { + } + + // Same without kill - the failed-to-move on open shouldn't throw and kill the RS. + master.dm.recommissionRegionServer(sns.get(3), null); + badRi = prepareBadServer(sns.get(0), false); + master.dm.reportAfterRegionOpened(badRi, sns.get(0)); + } + + private RegionInfo prepareBadServer(ServerName sn, boolean doKill) throws IOException { + RegionInfo badRi = master.dm.getRegionsOnServer(sn).iterator().next(); + master.dm.failToMove.add(badRi.getEncodedName()); + try { + master.dm.decommissionRegionServers(Lists.newArrayList(sn), true, false, doKill); + fail("Decom should have failed"); + } catch (HBaseIOException ex) { + } + master.dm.performAllMoves(); + assertEquals(1, master.dm.getRegionsOnServer(sn).size()); + return badRi; + } + + private void checkServerReports(List sns, Integer... kills) { + List ls = Lists.newArrayList(kills); + for (int i = 0; i < sns.size(); ++i) { + boolean shouldFail = ls.contains(i); + try { + master.dm.checkServerReport(sns.get(i), master.dm.getSm(sns.get(i))); + assertFalse(shouldFail); + } catch (YouAreDeadException e) { + assertTrue(shouldFail); + } + } + } + + @Test + public void testOffloadErrorsAndOpened() throws Exception { + List sns = initServersAndRegions(3, 4, 1, 0); + RegionInfo badRi = master.dm.getRegionsOnServer(sns.get(0)).iterator().next(); + String badRiName = badRi.getEncodedName(); + + // Test regular failure and retry. + master.dm.failToMove.add(badRiName); + try { + // Decommission 2 RS - one region on 0 would fail, but decom and other moves should proceed. + master.dm.decommissionRegionServers(sns.subList(0, 2), true, false, false); + fail("Decom should have failed"); + } catch (HBaseIOException ex) { + } + master.dm.performAllMoves(); + verifyRegionCounts(sns, 1, 0, 4); + verifyDrainingServers(sns, 0, 1); + // Try again. + master.dm.failToMove.clear(); + master.dm.decommissionRegionServers(sns.subList(0, 1), true, false, false); + master.dm.performAllMoves(); + verifyRegionCounts(sns, 0, 0, 5); + verifyDrainingServers(sns, 0, 1); + + // Introduce the failure again. + master.dm.recommissionRegionServer(sns.get(0), null); + master.dm.failToMove.add(badRiName); + try { + master.dm.decommissionRegionServers(sns.subList(2, 3), true, false, false); + fail("Decom should have failed"); + } catch (HBaseIOException ex) { + } + master.dm.performAllMoves(); + verifyRegionCounts(sns, 4, 0, 1); + verifyDrainingServers(sns, 1, 2); + + // Suppose it was an opening region, and now it has been opened. + master.dm.failToMove.clear(); + master.dm.reportAfterRegionOpened(badRi, sns.get(2)); + master.dm.performAllMoves(); + verifyRegionCounts(sns, 5, 0, 0); + verifyDrainingServers(sns, 1, 2); + + // Same as above, but without offload. 
+ master.dm.failToMove.add(badRiName); + master.dm.recommissionRegionServer(sns.get(2), null); + // Won't fail because offload is false. + master.dm.decommissionRegionServers(sns.subList(0, 1), false, false, false); + verifyDrainingServers(sns, 0, 1); + // Report the region has opened on a decommissioned server w/o offload - nothing happens. + master.dm.reportAfterRegionOpened(badRi, sns.get(0)); + master.dm.performAllMoves(); + verifyRegionCounts(sns, 5, 0, 0); + } + + @Test + public void testOffload() throws Exception { + List sns = initServersAndRegions(3, 1, 2, 2); + // Decom 2, allow the moves to finish, verify there are no regions on it. + master.dm.decommissionRegionServers(sns.subList(2, 3), true, false, false); + master.dm.performAllMoves(); + int[] regionCounts = master.dm.getRegionCountsOnServers(sns); + assertEquals(0, regionCounts[2]); + verifyDrainingServers(sns, 2); + // Decom 1, but don't offload. + master.dm.decommissionRegionServers(sns.subList(1, 2), false, false, false); + master.dm.performAllMoves(); + verifyDrainingServers(sns, 1, 2); + assertEquals(regionCounts[1], master.dm.getRegionsOnServer(sns.get(1)).size()); + // Recommission 1, decommission 0. + master.dm.recommissionRegionServer(sns.get(1), null); + master.dm.decommissionRegionServers(sns.subList(0, 1), true, false, false); + verifyDrainingServers(sns, 0, 2); + master.dm.performAllMoves(); + verifyRegionCounts(sns, 0, 5, 0); + } + + private void verifyRegionCounts(List sns, int... counts) { + int[] regionCounts = master.dm.getRegionCountsOnServers(sns); + for (int i = 0; i < counts.length; ++i) { + assertEquals(counts[i], regionCounts[i]); + } + } + + @Test + public void testAddRemoveReplace() throws Exception { + List sns = initServersAndRegions(5); + + master.dm.decommissionRegionServers(sns.subList(0, 1), false, false, false); + verifyDrainingServers(sns, 0); + master.dm.decommissionRegionServers(sns.subList(2, 3), false, false, false); + verifyDrainingServers(sns, 0, 2); + master.dm.decommissionRegionServers(sns.subList(4, 5), false, false, false); + verifyDrainingServers(sns, 0, 2, 4); + master.dm.decommissionRegionServers(sns.subList(4, 5), false, false, false); + verifyDrainingServers(sns, 0, 2, 4); + master.dm.recommissionRegionServer(sns.get(2), null); + verifyDrainingServers(sns, 0, 4); + master.dm.recommissionRegionServer(sns.get(2), null); + verifyDrainingServers(sns, 0, 4); + + // Replace... + verifyReplace(sns, 1, 2, 3); // Disjoint list with above. + verifyReplace(sns, 2, 4); // Partial match. + verifyReplace(sns, 2, 3, 4); // More. + verifyReplace(sns, 0, 1, 2); // Head. + verifyReplace(sns, 2, 3); // Tail. + verifyReplace(sns, 1, 2, 3, 4); // Both. + verifyReplace(sns, 2, 3); // Subset. + verifyReplace(sns); // Nothing. + verifyReplace(sns, 1, 3); // From empty. + verifyReplace(sns, 1, 3); // Interleaved. + verifyReplace(sns, 0, 2, 4); // Interleaved. + + // Verify partial matching. + ServerName sn0part = ServerName.valueOf(sns.get(0).getHostname(), 0, -1), + sn3part = ServerName.valueOf(sns.get(3).getHostname(), sns.get(3).getPort(), -1); + master.dm.decommissionRegionServers(Lists.newArrayList(sn0part), false, false, false); + verifyDrainingServers(sns, 0, 2, 4); + master.dm.decommissionRegionServers(Lists.newArrayList(sn0part, sn3part), false, true, false); + verifyDrainingServers(sns, 0, 3); + verifyReplace(sns, 3); // Replace with the full name. + + // Verify multiple servers on the same node with partial match (different ports). 
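+    // Register a second RS on the same host but a different port; a host-only partial
+    // name should then match both instances.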
+    // Verify multiple servers on the same node with partial match (different ports).
+    ServerName sn0a = ServerName.valueOf(sns.get(0).getHostname(), 101, 1);
+    master.sm.regionServerReport(sn0a, master.dm.getSm(sn0a));
+    master.dm.decommissionRegionServers(Lists.newArrayList(sn0part), false, true, false);
+    assertEquals(2, master.dm.getDrainingServersList().size());
+    assertTrue(master.dm.getDrainingServersList().contains(sns.get(0)));
+    assertTrue(master.dm.getDrainingServersList().contains(sn0a));
+    verifyReplace(sns, 0); // Replace with the full name - we should just get one server, not both.
+  }
+
+  private List<ServerName> initServersAndRegions(int n, int... rn) throws YouAreDeadException {
+    return initServersAndRegions(n, true, rn);
+  }
+
+  private List<ServerName> initServersAndRegions(
+      int n, boolean isReport, int... rn) throws YouAreDeadException {
+    List<ServerName> sns = new ArrayList<>(n);
+    int totalRi = -1;
+    for (int i = 0; i < n; ++i) {
+      ServerName sn = ServerName.valueOf("server" + i, 100, 1);
+      sns.add(sn);
+      int regionCount = rn.length > i ? rn[i] : 1;
+      for (int ri = 0; ri < regionCount; ++ri) {
+        RegionInfo rInfo = createRegionInfo(++totalRi);
+        master.dm.addRegion(sn, rInfo);
+      }
+      if (isReport) {
+        master.sm.regionServerReport(sn, master.dm.getSm(sn));
+      }
+    }
+    return sns;
+  }
+
+  private void verifyReplace(List<ServerName> sns, int... ix) throws IOException {
+    List<ServerName> replace = new ArrayList<>(ix.length);
+    for (int i : ix) {
+      replace.add(sns.get(i));
+    }
+    master.dm.decommissionRegionServers(replace, false, true, false);
+    verifyDrainingServers(sns, ix);
+  }
+
+  private void verifyDrainingServers(List<ServerName> sns, int... ix) {
+    assertEquals(ix.length, master.dm.getDrainingServersList().size());
+    for (int i : ix) {
+      assertTrue(master.dm.getDrainingServersList().contains(sns.get(i)));
+    }
+  }
+
+  private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+  private static RegionInfo createRegionInfo(final long regionId) {
+    return RegionInfoBuilder.newBuilder(TABLE_NAME)
+        .setStartKey(Bytes.toBytes(regionId))
+        .setEndKey(Bytes.toBytes(regionId + 1))
+        .setSplit(false)
+        .setRegionId(0)
+        .build();
+  }
+
+  private static class DecommissionManagerForTest extends DecommissionManager {
+    public Map<ServerName, IdentityHashMap<RegionInfo, Ref<ServerName>>> regions =
+        new HashMap<>();
+    public HashSet<String> failToMove = new HashSet<>();
+
+    public DecommissionManagerForTest(MasterServices m) {
+      super(m);
+    }
+
+    public int cancelMoves(ServerName sn) {
+      int result = 0;
+      for (Ref<ServerName> r : regions.get(sn).values()) {
+        result += (r.t != null) ? 1 : 0;
+        r.t = null;
+      }
+      return result;
+    }
+
+    public void performMoves(ServerName sn) {
+      Iterator<Map.Entry<RegionInfo, Ref<ServerName>>> i =
+          regions.get(sn).entrySet().iterator();
+      while (i.hasNext()) {
+        Map.Entry<RegionInfo, Ref<ServerName>> e = i.next();
+        ServerName to = e.getValue().t;
+        if (to != null) {
+          addRegion(to, e.getKey());
+          i.remove();
+        }
+      }
+    }
+
+    public void performAllMoves() {
+      for (ServerName sn : new ArrayList<>(regions.keySet())) {
+        performMoves(sn);
+      }
+    }
+
+    public ServerMetrics getSm(ServerName sn) {
+      ServerMetricsBuilder b = ServerMetricsBuilder.newBuilder(sn);
+      IdentityHashMap<RegionInfo, Ref<ServerName>> map = regions.get(sn);
+      if (map != null) {
+        List<RegionMetrics> rms = new ArrayList<>();
+        for (RegionInfo ri : map.keySet()) {
+          rms.add(RegionMetricsBuilder.newBuilder(ri.getEncodedNameAsBytes()).build());
+        }
+        b.setRegionMetrics(rms);
+      }
+      return b.build();
+    }
+
+    public void addRegion(ServerName sn, RegionInfo ri) {
+      IdentityHashMap<RegionInfo, Ref<ServerName>> map = regions.get(sn);
+      if (map == null) {
+        map = new IdentityHashMap<>();
+        regions.put(sn, map);
+      }
+      map.put(ri, new Ref<>());
+    }
+
+    @Override
+    protected Collection<RegionInfo> getRegionsOnServer(ServerName server) {
+      IdentityHashMap<RegionInfo, Ref<ServerName>> r = regions.get(server);
+      return r == null ? Lists.newArrayList() : r.keySet();
+    }
+
+    private int[] getRegionCountsOnServers(List<ServerName> servers) {
+      int[] result = new int[servers.size()];
+      for (int i = 0; i < servers.size(); ++i) {
+        result[i] = regions.get(servers.get(i)).size();
+      }
+      return result;
+    }
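+
+    // (Ref<T> is not shown in this part of the diff; from its use here it is presumably a
+    // trivial mutable holder for a region's in-flight move destination, e.g. something like
+    //   private static final class Ref<T> { T t; }
+    // where t == null means "not moving"; performMoves() applies the pending moves.)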
+
+    @Override
+    protected void tryMoveRegion(RegionInfo hri, ServerName from, List<ServerName> destServers)
+        throws HBaseIOException {
+      if (failToMove.contains(hri.getEncodedName())) {
+        throw new HBaseIOException("Test error for " + hri.getEncodedName());
+      }
+      IdentityHashMap<RegionInfo, Ref<ServerName>> fromMap = regions.get(from);
+      Ref<ServerName> to = fromMap.get(hri);
+      if (to == null) {
+        // Errors are only injected explicitly above, so this state is unexpected.
+        throw new AssertionError("Region is not on the server; unexpected in test");
+      }
+      if (to.t != null) {
+        // Errors are only injected explicitly above, so this state is unexpected.
+        throw new AssertionError("Region is moving; unexpected in test");
+      }
+      to.t = destServers.get(0);
+    }
+  }
+
+  private class MasterServices extends MockNoopMasterServices {
+    public ServerManager sm;
+    public DecommissionManagerForTest dm;
+
+    public MasterServices() {
+      super(HTU.getConfiguration());
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public ServerManager getServerManager() {
+      return sm;
+    }
+
+    @Override
+    public DecommissionManager getDecommissionManager() {
+      return dm;
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 56467cc6d6..c8d1243c59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.DecommissionManager;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
@@ -90,6 +91,7 @@ public class MockMasterServices extends MockNoopMasterServices {
   private final ClusterConnection connection;
   private final LoadBalancer balancer;
   private final ServerManager serverManager;
+  private final DecommissionManager decommissionManager;
   private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
 
   public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf";
@@ -116,6 +118,7 @@ public class MockMasterServices extends MockNoopMasterServices {
     };
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
     this.serverManager = new ServerManager(this);
+    this.decommissionManager = new DecommissionManager(this);
     this.tableStateManager = Mockito.mock(TableStateManager.class);
     Mockito.when(this.tableStateManager.getTableState(Mockito.any())).
       thenReturn(new TableState(TableName.valueOf("AnyTableNameSetInMockMasterServcies"),
@@ -273,6 +276,11 @@ public class MockMasterServices extends MockNoopMasterServices {
     return serverManager;
   }
 
+  @Override
+  public DecommissionManager getDecommissionManager() {
+    return decommissionManager;
+  }
+
   @Override
   public AssignmentManager getAssignmentManager() {
     return assignmentManager;
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 51401b0206..28e1e673c5 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -441,8 +441,14 @@ public final class ZKUtil {
   public static List<String> listChildrenAndWatchForNewChildren(
       ZKWatcher zkw, String znode)
   throws KeeperException {
+    return listChildren(zkw, znode, true);
+  }
+
+  public static List<String> listChildren(ZKWatcher zkw, String znode, boolean doWatch)
+      throws KeeperException {
+    ZKWatcher watcher = doWatch ? zkw : null;
     try {
-      List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
+      List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, watcher);
       return children;
     } catch(KeeperException.NoNodeException ke) {
       LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
@@ -510,9 +516,7 @@ public final class ZKUtil {
 
   /**
    * Simple class to hold a node path and node data.
-   * @deprecated Unused
    */
-  @Deprecated
   public static class NodeAndData {
     private String node;
     private byte [] data;
@@ -751,6 +755,21 @@ public final class ZKUtil {
     return null;
   }
 
+  public static List<NodeAndData> getChildData(
+      ZKWatcher zkw, String baseNode) throws KeeperException, InterruptedException {
+    List<String> nodes = ZKUtil.listChildren(zkw, baseNode, false);
+    if (nodes != null) {
+      List<NodeAndData> newNodes = new ArrayList<>();
+      for (String node : nodes) {
+        String nodePath = ZNodePaths.joinZNode(baseNode, node);
+        byte[] data = ZKUtil.getData(zkw, nodePath);
+        newNodes.add(new NodeAndData(nodePath, data));
+      }
+      return newNodes;
+    }
+    return null;
+  }
+
   /**
    * Update the data of an existing node with the expected version to have the
    * specified data.
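
Note: a minimal sketch of how a caller might use the new ZKUtil.getChildData helper; the znode path, the processServerState callback, and the DecommissionState parsing mentioned in the comments are illustrative assumptions for the example, not part of this patch:

  // Read every child of a base znode together with its data, without leaving watches behind.
  List<ZKUtil.NodeAndData> nodes = ZKUtil.getChildData(zkw, "/hbase/draining");
  if (nodes != null) {
    for (ZKUtil.NodeAndData nd : nodes) {
      // nd.getNode() is the full child path; nd.getData() is the raw payload, e.g. a
      // serialized DecommissionState message describing a decommissioned region server.
      processServerState(nd.getNode(), nd.getData()); // processServerState is hypothetical.
    }
  }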