Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(revision 1345623)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java	(working copy)
@@ -37,6 +37,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -181,6 +182,10 @@
    */
   private final SortedMap<HRegionInfo, ServerName> regions =
     new TreeMap<HRegionInfo, ServerName>();
+
+  // To maintain regions of dead servers taken from the region plans.
+  private final NavigableMap<ServerName, RegionsOnDeadServer>
+    deadServerRegionsFromRegionPlan = new ConcurrentSkipListMap<ServerName, RegionsOnDeadServer>();
 
   private final ExecutorService executorService;
@@ -307,6 +312,13 @@
       regionPlans.putAll(plans);
     }
   }
+
+  /**
+   * @return dead server regions from region plan.
+   */
+  public NavigableMap<ServerName, RegionsOnDeadServer> getDeadServerRegionsFromRegionPlan() {
+    return this.deadServerRegionsFromRegionPlan;
+  }
 
   /**
    * Set the list of regions that will be reopened
@@ -887,7 +899,10 @@
         regionState.update(RegionState.State.CLOSED, createTime, sn);
         // When there are more than one region server a new RS is selected as the
         // destination and the same is updated in the regionplan. (HBASE-5546)
-        getRegionPlan(regionState, sn, true);
+        RegionPlan regionPlan = getRegionPlan(regionState, sn, true);
+        if (regionPlan == RegionPlan.UNUSABLE_PLAN) {
+          break;
+        }
         this.executorService.submit(new ClosedRegionHandler(master, this,
           regionState.getRegion()));
         break;
@@ -1706,11 +1721,14 @@
       return;
     }
     RegionPlan plan = getRegionPlan(state, forceNewPlan);
-    if (plan == null) {
+    if (plan == RegionPlan.NO_SERVERS_TO_ASSIGN) {
       LOG.debug("Unable to determine a plan to assign " + state);
       this.timeoutMonitor.setAllRegionServersOffline(true);
       return; // Should get reassigned later when RIT times out.
     }
+    if (plan == RegionPlan.UNUSABLE_PLAN) {
+      return;
+    }
     try {
       LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
         " to " + plan.getDestination().toString());
@@ -1768,11 +1786,14 @@
           // Transition back to OFFLINE
           state.update(RegionState.State.OFFLINE);
           // Force a new plan and reassign. Will return null if no servers.
-          if (getRegionPlan(state, plan.getDestination(), true) == null) {
+          RegionPlan newPlan = getRegionPlan(state, plan.getDestination(), true);
+          if (newPlan == RegionPlan.NO_SERVERS_TO_ASSIGN) {
             this.timeoutMonitor.setAllRegionServersOffline(true);
             LOG.warn("Unable to find a viable location to assign region " +
               state.getRegion().getRegionNameAsString());
             return;
+          } else if (newPlan == RegionPlan.UNUSABLE_PLAN) {
+            return;
           }
         }
       }
@@ -1888,7 +1909,7 @@
    * @param forceNewPlan If true, then if an existing plan exists, a new plan
    * will be generated.
    * @return Plan for passed state (If none currently, it creates one or
-   * if no servers to assign, it returns null).
+   * if no servers to assign, it returns RegionPlan.NO_SERVERS_TO_ASSIGN).
    */
   RegionPlan getRegionPlan(final RegionState state,
       final ServerName serverToExclude, final boolean forceNewPlan) {
@@ -1900,7 +1921,7 @@
     if (destServers.isEmpty()){
       LOG.warn("Can't move the region " + encodedName +
         ", there is no destination server available.");
-      return null;
+      return RegionPlan.NO_SERVERS_TO_ASSIGN;
     }
 
     RegionPlan randomPlan = null;
@@ -1925,6 +1946,28 @@
           balancer.randomAssignment(state.getRegion(), destServers));
         this.regionPlans.put(encodedName, randomPlan);
       }
+
+      if (serverToExclude != null) {
+        RegionsOnDeadServer regionsOnDeadServer = this.deadServerRegionsFromRegionPlan
+            .get(serverToExclude);
+        if (regionsOnDeadServer != null
+            && regionsOnDeadServer.getRegionsFromRegionPlansForServer()
+                .contains(state.getRegion())) {
+          if (newPlan) {
+            this.regionPlans.remove(randomPlan.getRegionName());
+            LOG.info("Server shutdown handler already in progress for the region "
+                + randomPlan.getRegionName());
+            randomPlan = RegionPlan.UNUSABLE_PLAN;
+          } else {
+            this.regionPlans.remove(existingPlan.getRegionName());
+            LOG.info("Server shutdown handler already in progress for the region "
+                + existingPlan.getRegionName());
+            existingPlan = RegionPlan.UNUSABLE_PLAN;
+          }
+        }
+      }
     }
 
     if (newPlan) {
@@ -3198,11 +3241,14 @@
   /**
    * Process shutdown server removing any assignments.
    * @param sn Server that went down.
-   * @return list of regions in transition on this server
+   * @return the regions in transition and the regions from region plans on this server
    */
-  public List<RegionState> processServerShutdown(final ServerName sn) {
+  public RegionsOnDeadServer processServerShutdown(final ServerName sn) {
+    RegionsOnDeadServer regionsOnDeadServer = new RegionsOnDeadServer();
+    Set<HRegionInfo> regionsFromRegionPlansForServer = new ConcurrentSkipListSet<HRegionInfo>();
     // Clean out any existing assignment plans for this server
     synchronized (this.regionPlans) {
+      this.deadServerRegionsFromRegionPlan.put(sn, regionsOnDeadServer);
       for (Iterator<Map.Entry<String, RegionPlan>> i =
           this.regionPlans.entrySet().iterator(); i.hasNext();) {
         Map.Entry<String, RegionPlan> e = i.next();
@@ -3210,9 +3256,11 @@
         // The name will be null if the region is planned for a random assign.
         if (otherSn != null && otherSn.equals(sn)) {
           // Use iterator's remove else we'll get CME
+          regionsFromRegionPlansForServer.add(e.getValue().getRegionInfo());
           i.remove();
         }
       }
+      regionsOnDeadServer.setRegionsFromRegionPlansForServer(regionsFromRegionPlansForServer);
     }
     // TODO: Do we want to sync on RIT here?
     // Remove this server from map of servers to regions, and remove all regions
@@ -3223,7 +3271,8 @@
       Set<HRegionInfo> assignedRegions = this.servers.remove(sn);
       if (assignedRegions == null || assignedRegions.isEmpty()) {
         // No regions on this server, we are done, return empty list of RITs
-        return rits;
+        regionsOnDeadServer.setRegionsInTransition(rits);
+        return regionsOnDeadServer;
       }
       deadRegions = new TreeSet<HRegionInfo>(assignedRegions);
       for (HRegionInfo region : deadRegions) {
@@ -3240,7 +3289,8 @@
         rits.add(region);
       }
     }
-    return rits;
+    regionsOnDeadServer.setRegionsInTransition(rits);
+    return regionsOnDeadServer;
   }
 
   /**
@@ -3560,4 +3610,55 @@
       this.master.abort(errorMsg, e);
     }
   }
+
+  /**
+   * @param hri region to check.
+   * @return whether region is online or not.
+   */
+  public boolean isRegionOnline(HRegionInfo hri) {
+    ServerName sn = null;
+    synchronized (this.regions) {
+      sn = this.regions.get(hri);
+      if (sn == null) {
+        return false;
+      }
+      if (this.isServerOnline(sn)) {
+        return true;
+      }
+      // Remove the assignment mapping for sn.
+      Set<HRegionInfo> hriSet = this.servers.get(sn);
+      if (hriSet != null) {
+        hriSet.remove(hri);
+      }
+      this.regions.remove(hri);
+      return false;
+    }
+  }
+
+  /**
+   * Store the related regions on a dead server, used by processServerShutdown.
+   */
+  public static class RegionsOnDeadServer {
+    // The regions being processed on this dead server.
+    private Set<HRegionInfo> regionsFromRegionPlansForServer = null;
+    private List<RegionState> regionsInTransition = null;
+
+    public Set<HRegionInfo> getRegionsFromRegionPlansForServer() {
+      return regionsFromRegionPlansForServer;
+    }
+
+    public void setRegionsFromRegionPlansForServer(
+        Set<HRegionInfo> regionsFromRegionPlansForServer) {
+      this.regionsFromRegionPlansForServer = regionsFromRegionPlansForServer;
+    }
+
+    public List<RegionState> getRegionsInTransition() {
+      return regionsInTransition;
+    }
+
+    public void setRegionsInTransition(List<RegionState> regionsInTransition) {
+      this.regionsInTransition = regionsInTransition;
+    }
+  }
 }
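Taken together, the AssignmentManager changes above replace a null return from getRegionPlan() with two identity-compared sentinel plans, so callers can distinguish "no live destination server" (back off and let the timeout monitor retry) from "the server shutdown handler already owns this region" (skip the assignment entirely). A minimal, self-contained sketch of this sentinel pattern follows; Plan, getPlan(), and the string server names are simplified stand-ins, not the actual HBase classes:

    import java.util.Arrays;
    import java.util.List;

    public class SentinelPlanSketch {
      static final class Plan {
        // Identity-compared markers: callers test with ==, never equals().
        static final Plan NO_SERVERS_TO_ASSIGN = new Plan(null);
        static final Plan UNUSABLE_PLAN = new Plan(null);
        final String destination;
        Plan(String destination) { this.destination = destination; }
      }

      // Returns a sentinel instead of null so the two failure modes stay distinct.
      static Plan getPlan(List<String> liveServers, boolean shutdownHandlerOwnsRegion) {
        if (liveServers.isEmpty()) {
          return Plan.NO_SERVERS_TO_ASSIGN; // retry later when servers come back
        }
        if (shutdownHandlerOwnsRegion) {
          return Plan.UNUSABLE_PLAN;        // SSH will reassign; this caller must not
        }
        return new Plan(liveServers.get(0));
      }

      public static void main(String[] args) {
        Plan plan = getPlan(Arrays.asList("rs1,60020,1"), true);
        if (plan == Plan.NO_SERVERS_TO_ASSIGN) {
          System.out.println("No destination servers; wait for the timeout monitor.");
        } else if (plan == Plan.UNUSABLE_PLAN) {
          System.out.println("Shutdown handler owns this region; skip assignment.");
        } else {
          System.out.println("Assigning to " + plan.destination);
        }
      }
    }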
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java	(revision 1345623)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
+import org.apache.hadoop.hbase.master.AssignmentManager.RegionsOnDeadServer;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerManager;
@@ -171,6 +173,8 @@
   @Override
   public void process() throws IOException {
     final ServerName serverName = this.serverName;
+    Set<HRegionInfo> regionsFromRegionPlansForServer = null;
+    List<RegionState> regionsInTransition = null;
     try {
       try {
         if (this.shouldSplitHlog) {
@@ -232,9 +236,10 @@
       // doing after log splitting. Could do some states before -- OPENING?
       // OFFLINE? -- and then others after like CLOSING that depend on log
       // splitting.
-      List<RegionState> regionsInTransition =
-        this.services.getAssignmentManager().
-          processServerShutdown(this.serverName);
+      RegionsOnDeadServer regionsOnDeadServer = this.services
+          .getAssignmentManager().processServerShutdown(this.serverName);
+      regionsFromRegionPlansForServer = regionsOnDeadServer.getRegionsFromRegionPlansForServer();
+      regionsInTransition = regionsOnDeadServer.getRegionsInTransition();
 
       // Wait on meta to come online; we need it to progress.
       // TODO: Best way to hold strictly here?  We should build this retry logic
@@ -285,8 +290,8 @@
           " regions(s) that are already in transition)");
 
       // Iterate regions that were on this server and assign them
+      List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
       if (hris != null) {
-        List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
         for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
           RegionState rit = this.services.getAssignmentManager().isRegionInTransition(e.getKey());
           if (processDeadRegion(e.getKey(), e.getValue(),
@@ -294,19 +299,19 @@
               this.server.getCatalogTracker())) {
             ServerName addressFromAM = this.services.getAssignmentManager()
               .getRegionServerOfRegion(e.getKey());
-            if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting()) {
+            if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting()
+                && !regionsFromRegionPlansForServer.contains(rit.getRegion())) {
               // Skip regions that were in transition unless CLOSING or
-              // PENDING_CLOSE
+              // PENDING_CLOSE or splitting
               LOG.info("Skip assigning region " + rit.toString());
-            } else if (addressFromAM != null
-                && !addressFromAM.equals(this.serverName)) {
-              LOG.debug("Skip assigning region "
-                  + e.getKey().getRegionNameAsString()
-                  + " because it has been opened in "
-                  + addressFromAM.getServerName());
-            } else {
-              toAssignRegions.add(e.getKey());
-            }
+            } else if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
+              LOG.debug("Skip assigning region " + e.getKey().getRegionNameAsString()
+                  + " because it has been opened in " + addressFromAM.getServerName());
+              regionsFromRegionPlansForServer.remove(e.getKey());
+            } else {
+              toAssignRegions.add(e.getKey());
+              regionsFromRegionPlansForServer.remove(e.getKey());
+            }
           } else if (rit != null && (rit.isSplitting() || rit.isSplit())) {
             // This will happen when the RS went down and the call back for the SPLIITING or SPLIT
             // has not yet happened for node Deleted event. In that case if the region was actually
@@ -317,6 +322,7 @@
             HRegionInfo region = rit.getRegion();
             AssignmentManager am = this.services.getAssignmentManager();
             am.regionOffline(region);
+            regionsFromRegionPlansForServer.remove(e.getKey());
           }
           // If the table was partially disabled and the RS went down, we should clear the RIT
           // and remove the node for the region.
@@ -333,15 +339,35 @@
             am.regionOffline(hri);
             // To avoid region assignment if table is in disabling or disabled state.
             toAssignRegions.remove(hri);
+            regionsFromRegionPlansForServer.remove(hri);
           }
         }
-        // Get all available servers
-        List<ServerName> availableServers = services.getServerManager()
-          .createDestinationServersList();
-        this.services.getAssignmentManager().assign(toAssignRegions,
-          availableServers);
       }
+      int reassignedRegions = 0;
+      for (HRegionInfo hri : regionsFromRegionPlansForServer) {
+        if (!this.services.getAssignmentManager().isRegionOnline(hri)) {
+          if (!toAssignRegions.contains(hri)) {
+            toAssignRegions.add(hri);
+            reassignedRegions++;
+          }
+        }
+      }
+      // Get all available servers
+      List<ServerName> availableServers = services.getServerManager()
+          .createDestinationServersList();
+      this.services.getAssignmentManager().assign(toAssignRegions,
+          availableServers);
+      regionsFromRegionPlansForServer.clear();
+      LOG.info(reassignedRegions + " regions which were planned to open on "
+          + this.serverName + " have been re-assigned.");
    } finally {
+      if (regionsFromRegionPlansForServer != null) {
+        regionsFromRegionPlansForServer.clear();
+      }
+      if (regionsInTransition != null) {
+        regionsInTransition.clear();
+      }
+      this.services.getAssignmentManager().getDeadServerRegionsFromRegionPlan()
+          .remove(this.serverName);
      this.deadServers.finish(serverName);
    }
    LOG.info("Finished processing of shutdown of " + serverName);
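The ServerShutdownHandler changes above form the other half of the handshake: process() claims the dead server's planned regions via processServerShutdown(), re-assigns any of them that never came online elsewhere, and always releases the claim in the finally block so a failed handler cannot permanently block later assignments. A condensed sketch of that claim/check/release protocol, using String keys as hypothetical stand-ins for ServerName and HRegionInfo rather than the real HBase types:

    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class DeadServerPlanRegistry {
      // Dead server name -> regions whose plans targeted that server.
      private final NavigableMap<String, Set<String>> deadServerPlans =
          new ConcurrentSkipListMap<String, Set<String>>();

      // Shutdown handler: claim the dead server's planned regions up front.
      public void recordShutdown(String serverName, Set<String> plannedRegions) {
        deadServerPlans.put(serverName,
            new ConcurrentSkipListSet<String>(plannedRegions));
      }

      // Assignment path: back off if the shutdown handler owns this region.
      public boolean shutdownHandlerOwns(String serverName, String encodedRegionName) {
        Set<String> regions = deadServerPlans.get(serverName);
        return regions != null && regions.contains(encodedRegionName);
      }

      // Shutdown handler's finally block: always release the claim.
      public void finish(String serverName) {
        deadServerPlans.remove(serverName);
      }
    }

The release in finish() mirrors the patch's finally block: whether or not reassignment succeeds, the entry for the dead server is removed so future plans for those regions are usable again.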
"": this.dest.toString()); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (revision 1345623) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (working copy) @@ -25,8 +25,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.DeserializationException; @@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; @@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionsOnDeadServer; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; @@ -398,7 +402,7 @@ AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer, executor, null); try { - processServerShutdownHandler(ct, am, false); + processServerShutdownHandler(ct, am, false, null); } finally { executor.shutdown(); am.shutdown(); @@ -464,7 +468,7 @@ ZKUtil.createAndWatch(this.watcher, node, data.toByteArray()); try { - processServerShutdownHandler(ct, am, regionSplitDone); + processServerShutdownHandler(ct, am, regionSplitDone, null); // check znode deleted or not. // In both cases the znode should be deleted. @@ -517,7 +521,7 @@ // create znode in M_ZK_REGION_CLOSING state. ZKUtil.createAndWatch(this.watcher, node, data.toByteArray()); try { - processServerShutdownHandler(ct, am, false); + processServerShutdownHandler(ct, am, false, null); // check znode deleted or not. // In both cases the znode should be deleted. assertTrue("The znode should be deleted.", ZKUtil.checkExists(this.watcher, node) == -1); @@ -536,8 +540,116 @@ ZKAssign.deleteAllNodes(this.watcher); } } + + /** + * + * Test verifies if region assignment through balancer is skipped when SSH processing a region. + * See HBASE-5816. + */ + @Test + public void testAssignThroughBalancerOrFailedOpenAndSSHInParallel() throws IOException, + KeeperException, ServiceException, InterruptedException { + try { + this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, + MockedLoadBalancer.class, LoadBalancer.class); + CatalogTracker ct = Mockito.mock(CatalogTracker.class); + AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server, + this.serverManager); + // Boolean variable used for waiting until randomAssignment is called and + // new + // plan is generated. 
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java	(revision 1345623)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java	(working copy)
@@ -25,8 +25,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.DeserializationException;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
@@ -49,6 +52,7 @@
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
+import org.apache.hadoop.hbase.master.AssignmentManager.RegionsOnDeadServer;
 import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -398,7 +402,7 @@
     AssignmentManager am = new AssignmentManager(this.server,
       this.serverManager, ct, balancer, executor, null);
     try {
-      processServerShutdownHandler(ct, am, false);
+      processServerShutdownHandler(ct, am, false, null);
     } finally {
       executor.shutdown();
       am.shutdown();
@@ -464,7 +468,7 @@
     ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
 
     try {
-      processServerShutdownHandler(ct, am, regionSplitDone);
+      processServerShutdownHandler(ct, am, regionSplitDone, null);
       // check znode deleted or not.
       // In both cases the znode should be deleted.
@@ -517,7 +521,7 @@
     // create znode in M_ZK_REGION_CLOSING state.
     ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
     try {
-      processServerShutdownHandler(ct, am, false);
+      processServerShutdownHandler(ct, am, false, null);
       // check znode deleted or not.
       // In both cases the znode should be deleted.
       assertTrue("The znode should be deleted.", ZKUtil.checkExists(this.watcher, node) == -1);
@@ -536,8 +540,116 @@
       ZKAssign.deleteAllNodes(this.watcher);
     }
   }
+
+  /**
+   * Test verifies that region assignment through the balancer is skipped while
+   * SSH is processing the region. See HBASE-5816.
+   */
+  @Test
+  public void testAssignThroughBalancerOrFailedOpenAndSSHInParallel() throws IOException,
+      KeeperException, ServiceException, InterruptedException {
+    try {
+      this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+          MockedLoadBalancer.class, LoadBalancer.class);
+      CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+      AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(this.server,
+          this.serverManager);
+      // Gate used to wait until randomAssignment is called and a new plan is generated.
+      AtomicBoolean gate = new AtomicBoolean(false);
+      if (balancer instanceof MockedLoadBalancer) {
+        ((MockedLoadBalancer) balancer).setGateVariable(gate);
+      }
+      ZKAssign.deleteAllNodes(this.watcher);
+      ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
+      int v = ZKAssign.getVersion(this.watcher, REGIONINFO);
+      ZKAssign.transitionNode(this.watcher, REGIONINFO, SERVERNAME_A,
+          EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_FAILED_OPEN, v);
+      String path = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
+      RegionState state = new RegionState(REGIONINFO, State.OPENING, System.currentTimeMillis(),
+          SERVERNAME_B);
+      am.regionsInTransition.put(REGIONINFO.getEncodedName(), state);
+      // A dummy plan inserted into regionPlans; it is cleared and a new one is formed.
+      am.regionPlans.put(REGIONINFO.getEncodedName(),
+          new RegionPlan(REGIONINFO, null, SERVERNAME_B));
+      List<ServerName> serverList = new ArrayList<ServerName>(2);
+      serverList.add(SERVERNAME_B);
+      Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(serverList);
+      // In SSH we populate regions from the region plans of the dead server. See HBASE-5396.
+      addRegionToDeadServerRegions(am);
+      am.nodeDataChanged(path);
+      am.regionPlans.put(REGIONINFO.getEncodedName(),
+          new RegionPlan(REGIONINFO, null, SERVERNAME_B));
+      processServerShutdownHandler(ct, am, false, SERVERNAME_B);
+
+      // The new region plan may take some time to get updated after randomAssignment
+      // is called and the gate is set to true.
+      RegionPlan newRegionPlan = am.regionPlans.get(REGIONINFO.getEncodedName());
+      while (newRegionPlan == null) {
+        Thread.sleep(10);
+        newRegionPlan = am.regionPlans.get(REGIONINFO.getEncodedName());
+      }
+      // The new region plan should not be used because region assignment is in progress in SSH.
+      assertTrue("Assign should be invoked.", am.assignInvoked);
+    } finally {
+      this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+          DefaultLoadBalancer.class, LoadBalancer.class);
+      // Clean up all znodes.
+      ZKAssign.deleteAllNodes(this.watcher);
+    }
+  }
+
+  private void addRegionToDeadServerRegions(AssignmentManagerWithExtrasForTesting am) {
+    Set<HRegionInfo> regionsSet = new HashSet<HRegionInfo>(1);
+    regionsSet.add(REGIONINFO);
+    RegionsOnDeadServer rds = new RegionsOnDeadServer();
+    rds.setRegionsFromRegionPlansForServer(regionsSet);
+    am.getDeadServerRegionsFromRegionPlan().put(SERVERNAME_A, rds);
+  }
+
+  /**
+   * If the region server that is opening a region in transition goes down, the
+   * assignment of that region can take a long time (it waits for the timeout
+   * monitor to trigger assign). HBASE-5396 (HBASE-6060) fixes this scenario.
+   * This test verifies that SSH calls assign for the region in transition.
+   *
+   * @throws KeeperException
+   * @throws IOException
+   * @throws ServiceException
+   */
+  @Test
+  public void testSSHWhenSourceRSandDestRSInRegionPlanGoneDown() throws KeeperException,
+      IOException, ServiceException {
+    // We need a mocked catalog tracker.
+    CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+    // Create an AM.
+    AssignmentManagerWithExtrasForTesting am =
+        setUpMockedAssignmentManager(this.server, this.serverManager);
+    // Adding region in PENDING_OPEN.
+    am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO,
+        State.PENDING_OPEN));
+    // Adding a region plan.
+    am.regionPlans.put(REGIONINFO.getEncodedName(),
+        new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B));
+    am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString());
+
+    try {
+      processServerShutdownHandler(ct, am, false, SERVERNAME_A);
+      processServerShutdownHandler(ct, am, false, SERVERNAME_B);
+      assertTrue("Assign should be invoked.", am.assignInvoked);
+    } finally {
+      am.regionsInTransition.remove(REGIONINFO.getEncodedName());
+      am.regionPlans.remove(REGIONINFO.getEncodedName());
+    }
+  }
 
-  private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am, boolean splitRegion)
+  private void processServerShutdownHandler(CatalogTracker ct,
+      AssignmentManager am, boolean splitRegion, ServerName sn)
       throws IOException, ServiceException {
     // Make sure our new AM gets callbacks; once registered, can't unregister.
     // Thats ok because we make a new zk watcher for each test.
@@ -549,10 +661,19 @@
 
     // Get a meta row result that has region up on SERVERNAME_A
     Result r = null;
-    if (splitRegion) {
-      r = Mocking.getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A);
+    if (sn == null) {
+      if (splitRegion) {
+        r = Mocking
+            .getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A);
+      } else {
+        r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+      }
     } else {
-      r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+      if (sn.equals(SERVERNAME_A)) {
+        r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+      } else if (sn.equals(SERVERNAME_B)) {
+        r = new Result(new KeyValue[0]);
+      }
     }
     ScanResponse.Builder builder = ScanResponse.newBuilder();