Index: src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.fs.HFileSystem; @@ -91,11 +92,6 @@ } @Override - public ConcurrentSkipListMap getRegionsInTransitionInRS() { - return rit; - } - - @Override public FlushRequester getFlushRequester() { return null; } @@ -162,4 +158,16 @@ public Leases getLeases() { return null; } + + @Override + public boolean removeFromRegionsInTransition(HRegionInfo hri) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean containsKeyInRegionsInTransition(HRegionInfo hri) { + // TODO Auto-generated method stub + return false; + } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (working copy) @@ -19,16 +19,26 @@ */ package org.apache.hadoop.hbase.regionserver.handler; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MockRegionServerServices; @@ -57,13 +67,20 @@ private int testIndex = 0; @BeforeClass public static void before() throws Exception { - HTU.startMiniZKCluster(); + Configuration c = HTU.getConfiguration(); + c.setClass(HConstants.REGION_SERVER_IMPL, TestOpenRegionHandlerRegionServer.class, + HRegionServer.class); + HTU.startMiniCluster(); TEST_HTD = new HTableDescriptor("TestOpenRegionHandler.java"); } @AfterClass public static void after() throws IOException { TEST_HTD = null; - HTU.shutdownMiniZKCluster(); + try { + HTU.shutdownMiniCluster(); + } catch (Exception e) { + throw new IOException(e); + } } /** @@ -135,6 +152,7 @@ // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); + ZKAssign.transitionNodeOpening(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler OpenRegionHandler handler = @@ -160,7 +178,7 @@ // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); - + ZKAssign.transitionNodeOpening(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { @@ -178,14 +196,27 @@ assertEquals(EventType.RS_ZK_REGION_FAILED_OPEN, data.getEventType()); } + public static class TestOpenRegionHandlerRegionServer extends HRegionServer { + public TestOpenRegionHandlerRegionServer(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + @Override + public void addRegionsInTransition(HRegionInfo region, + String currentAction) throws RegionAlreadyInTransitionException { + super.addRegionsInTransition(region, currentAction); + } + } + @Test public void testTransitionToFailedOpenEvenIfCleanupFails() throws Exception { - Server server = new MockServer(HTU); - RegionServerServices rsServices = new MockRegionServerServices(); + MiniHBaseCluster cluster = HTU.getHBaseCluster(); + HRegionServer server = + cluster.getLiveRegionServerThreads().get(0).getRegionServer(); // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler - OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { + OpenRegionHandler handler = new OpenRegionHandler(server, server, TEST_HRI, TEST_HTD) { @Override boolean updateMeta(HRegion r) { return false; @@ -196,7 +227,7 @@ throw new IOException("FileSystem got closed."); } }; - rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE); + ((TestOpenRegionHandlerRegionServer)server).addRegionsInTransition(TEST_HRI, "OPEN"); try { handler.process(); } catch (Exception e) { Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (working copy) @@ -204,19 +204,19 @@ HTableDescriptor htd, HRegionInfo hri) throws IOException, NodeExistsException, KeeperException { // Create it OFFLINE node, which is what Master set before sending OPEN RPC - ZKAssign.createNodeOffline(server.getZooKeeper(), hri, - server.getServerName()); - OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, - htd); - openHandler.process(); - RegionTransitionData data = - ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()); - - // delete the node, which is what Master do after the region is opened - ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - EventType.RS_ZK_REGION_OPENED); - } + + ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName()); + int version = ZKAssign.transitionNodeOpening(server.getZooKeeper(), hri, server.getServerName()); + OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd, version); + openHandler.process(); + RegionTransitionData data = ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName()); + + // delete the node, which is what Master do after the region is opened + ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), + EventType.RS_ZK_REGION_OPENED); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; @@ -69,6 +70,8 @@ @BeforeClass public static void beforeAllTests() throws Exception { Configuration c = TEST_UTIL.getConfiguration(); + c.setClass(HConstants.REGION_SERVER_IMPL, TestZKBasedOpenCloseRegionRegionServer.class, + HRegionServer.class); c.setBoolean("dfs.support.append", true); c.setInt("hbase.regionserver.info.port", 0); TEST_UTIL.startMiniCluster(2); @@ -95,6 +98,22 @@ } /** + * Special HRegionServer used in these tests that allows access to + * {@link #addRegionsInTransition(HRegionInfo, String)}. + */ + public static class TestZKBasedOpenCloseRegionRegionServer extends HRegionServer { + public TestZKBasedOpenCloseRegionRegionServer(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + @Override + public void addRegionsInTransition(HRegionInfo region, + String currentAction) throws RegionAlreadyInTransitionException { + super.addRegionsInTransition(region, currentAction); + } + } + + /** * Test we reopen a region once closed. * @throws Exception */ @@ -244,8 +263,10 @@ cluster.getLiveRegionServerThreads().get(1).getRegionServer(); HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions()); - // fake that hr1 is processing the region - hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true); + // Fake that hr1 is processing the region. At top of this test we made a + // regionserver that gave access addRegionsInTransition. Need to cast as + // TestZKBasedOpenCloseRegionRegionServer. + ((TestZKBasedOpenCloseRegionRegionServer) hr1).addRegionsInTransition(hri, "OPEN"); AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); EventHandlerListener openListener = @@ -262,7 +283,7 @@ assertEquals(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()), null); // remove the block and reset the boolean - hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes()); + hr1.removeFromRegionsInTransition(hri); reopenEventProcessed.set(false); // now try moving a region when there is no region in transition. @@ -333,7 +354,7 @@ } Whitebox.setInternalState(regionServer, "tableDescriptors", orizinalState); assertFalse("Region should not be in RIT", - regionServer.getRegionsInTransitionInRS().containsKey(REGIONINFO.getEncodedNameAsBytes())); + regionServer.containsKeyInRegionsInTransition(REGIONINFO)); } private static void waitUntilAllRegionsAssigned() Index: src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (working copy) @@ -184,11 +184,17 @@ int versionid = ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); - Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); + Mocking.waitForRegionOfflineInRIT(am, REGIONINFO.getEncodedName()); - // Get current versionid else will fail on transition from OFFLINE to + // Get the OFFLINE version id. May have to wait some for it to happen. // OPENING below - versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + while (true) { + int vid = ZKAssign.getVersion(this.watcher, REGIONINFO); + if (vid != versionid) { + versionid = vid; + break; + } + } assertNotSame(-1, versionid); // This uglyness below is what the openregionhandler on RS side does. versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, @@ -226,11 +232,17 @@ ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); am.gate.set(false); - Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); + Mocking.waitForRegionOfflineInRIT(am, REGIONINFO.getEncodedName()); // Get current versionid else will fail on transition from OFFLINE to // OPENING below - versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + while (true) { + int vid = ZKAssign.getVersion(this.watcher, REGIONINFO); + if (vid != versionid) { + versionid = vid; + break; + } + } assertNotSame(-1, versionid); // This uglyness below is what the openregionhandler on RS side does. versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, @@ -267,12 +279,18 @@ int versionid = ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1); assertNotSame(versionid, -1); - Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); + Mocking.waitForRegionOfflineInRIT(am, REGIONINFO.getEncodedName()); am.gate.set(false); // Get current versionid else will fail on transition from OFFLINE to // OPENING below - versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + while (true) { + int vid = ZKAssign.getVersion(this.watcher, REGIONINFO); + if (vid != versionid) { + versionid = vid; + break; + } + } assertNotSame(-1, versionid); // This uglyness below is what the openregionhandler on RS side does. versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, @@ -347,9 +365,15 @@ // balancer. The zk node will be OFFLINE waiting for regionserver to // transition it through OPENING, OPENED. Wait till we see the RIT // before we proceed. - Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); + Mocking.waitForRegionOfflineInRIT(am, REGIONINFO.getEncodedName()); // Get current versionid else will fail on transition from OFFLINE to OPENING below - versionid = ZKAssign.getVersion(this.watcher, REGIONINFO); + while (true) { + int vid = ZKAssign.getVersion(this.watcher, REGIONINFO); + if (vid != versionid) { + versionid = vid; + break; + } + } assertNotSame(-1, versionid); // This uglyness below is what the openregionhandler on RS side does. versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO, @@ -389,7 +413,7 @@ AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer, executor); try { - processServerShutdownHandler(ct, am, false); + processServerShutdownHandler(ct, am, false, null); } finally { executor.shutdown(); am.shutdown(); @@ -452,8 +476,7 @@ ZKUtil.createAndWatch(this.watcher, node, data.getBytes()); try { - - processServerShutdownHandler(ct, am, regionSplitDone); + processServerShutdownHandler(ct, am, regionSplitDone, null); // check znode deleted or not. // In both cases the znode should be deleted. @@ -492,7 +515,7 @@ am.regionOnline(REGIONINFO, SERVERNAME_A); // adding region in pending close. am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO, - State.PENDING_CLOSE)); + State.PENDING_CLOSE, System.currentTimeMillis(), SERVERNAME_A)); if (state == TableState.DISABLING) { am.getZKTable().setDisablingTable(REGIONINFO.getTableNameAsString()); @@ -507,7 +530,7 @@ ZKUtil.createAndWatch(this.watcher, node, data.getBytes()); try { - processServerShutdownHandler(ct, am, false); + processServerShutdownHandler(ct, am, false, null); // check znode deleted or not. // In both cases the znode should be deleted. assertTrue("The znode should be deleted.",ZKUtil.checkExists(this.watcher, node) == -1); @@ -525,7 +548,8 @@ } } - private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am, boolean splitRegion) + private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am, + boolean splitRegion, ServerName sn) throws IOException { // Make sure our new AM gets callbacks; once registered, can't unregister. // Thats ok because we make a new zk watcher for each test. @@ -534,12 +558,22 @@ // Make an RS Interface implementation. Make it so a scanner can go against it. HRegionInterface implementation = Mockito.mock(HRegionInterface.class); // Get a meta row result that has region up on SERVERNAME_A + Result r = null; - if (splitRegion) { - r = getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A); + if (sn == null) { + if (splitRegion) { + r = getMetaTableRowResultAsSplitRegion(REGIONINFO, SERVERNAME_A); + } else { + r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + } } else { - r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + if (sn.equals(SERVERNAME_A)) { + r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A); + } else if (sn.equals(SERVERNAME_B)) { + r = new Result(new KeyValue[0]); + } } + Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())). thenReturn(System.currentTimeMillis()); // Return a good result first and then return null to indicate end of scan @@ -563,8 +597,13 @@ // I need a services instance that will return the AM MasterServices services = Mockito.mock(MasterServices.class); Mockito.when(services.getAssignmentManager()).thenReturn(am); - ServerShutdownHandler handler = new ServerShutdownHandler(this.server, - services, deadServers, SERVERNAME_A, false); + Mockito.when(services.getZooKeeper()).thenReturn(this.watcher); + ServerShutdownHandler handler = null; + if (sn != null) { + handler = new ServerShutdownHandler(this.server, services, deadServers, sn, false); + } else { + handler = new ServerShutdownHandler(this.server, services, deadServers, SERVERNAME_A, false); + } handler.process(); // The region in r will have been assigned. It'll be up in zk as unassigned. } @@ -850,7 +889,7 @@ assertNotSame("Same region plan should not come", regionPlan, newRegionPlan); assertTrue("Destnation servers should be different.", !(regionPlan.getDestination().equals( newRegionPlan.getDestination()))); - Mocking.waitForRegionPendingOpenInRIT(am, REGIONINFO.getEncodedName()); + Mocking.waitForRegionOfflineInRIT(am, REGIONINFO.getEncodedName()); } finally { this.server.getConfiguration().setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, DefaultLoadBalancer.class, LoadBalancer.class); @@ -950,7 +989,59 @@ } } + + /** + * When region in transition if region server opening the region gone down then region assignment + * taking long time(Waiting for timeout monitor to trigger assign). HBASE-5396(HBASE-6060) fixes this + * scenario. This test case verifies whether SSH calling assign for the region in transition or not. + * + * @throws KeeperException + * @throws IOException + * @throws ServiceException + */ + @Test + public void testSSHWhenSourceRSandDestRSInRegionPlanGoneDown() throws KeeperException, IOException, + ServiceException { + testSSHWhenSourceRSandDestRSInRegionPlanGoneDown(true); + testSSHWhenSourceRSandDestRSInRegionPlanGoneDown(false); + } + + private void testSSHWhenSourceRSandDestRSInRegionPlanGoneDown(boolean regionInOffline) + throws IOException, KeeperException, ServiceException { + // We need a mocked catalog tracker. + CatalogTracker ct = Mockito.mock(CatalogTracker.class); + // Create an AM. + AssignmentManagerWithExtrasForTesting am = + setUpMockedAssignmentManager(this.server, this.serverManager); + // adding region in pending open. + if (regionInOffline) { + ServerName MASTER_SERVERNAME = new ServerName("example.org", 1111, 1111); + am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO, + State.OFFLINE, System.currentTimeMillis(), MASTER_SERVERNAME)); + } else { + am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO, + State.OPENING, System.currentTimeMillis(), SERVERNAME_B)); + } + // adding region plan + am.regionPlans.put(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B)); + am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString()); + + try { + processServerShutdownHandler(ct, am, false, SERVERNAME_A); + processServerShutdownHandler(ct, am, false, SERVERNAME_B); + if(regionInOffline){ + assertFalse("Assign should not be invoked.", am.assignInvoked); + } else { + assertTrue("Assign should be invoked.", am.assignInvoked); + } + } finally { + am.regionsInTransition.remove(REGIONINFO.getEncodedName()); + am.regionPlans.remove(REGIONINFO.getEncodedName()); + } + } + + /** * Mocked load balancer class used in the testcase to make sure that the testcase waits until * random assignment is called and the gate variable is set to true. */ Index: src/test/java/org/apache/hadoop/hbase/master/Mocking.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/Mocking.java (revision 1444158) +++ src/test/java/org/apache/hadoop/hbase/master/Mocking.java (working copy) @@ -43,4 +43,17 @@ } } } + + static void waitForRegionOfflineInRIT(AssignmentManager am, String encodedName) + throws InterruptedException { + boolean wait = true; + while (wait) { + RegionState state = am.getRegionsInTransition().get(encodedName); + if (state != null && state.isOffline()) { + wait = false; + } else { + Thread.sleep(1); + } + } + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -52,9 +52,8 @@ // the total open. We'll fail the open if someone hijacks our znode; we can // tell this has happened if version is not as expected. private volatile int version = -1; - //version of the offline node that was set by the master - private volatile int versionOfOfflineNode = -1; + public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, HTableDescriptor htd) { @@ -62,20 +61,20 @@ } public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd, int versionOfOfflineNode) { + HTableDescriptor htd, int version) { this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, - versionOfOfflineNode); + version); } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, final HTableDescriptor htd, EventType eventType, - final int versionOfOfflineNode) { + final int version) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; - this.versionOfOfflineNode = versionOfOfflineNode; + this.version = version; } public HRegionInfo getRegionInfo() { @@ -96,15 +95,6 @@ // Check that this region is not already online HRegion region = this.rsServices.getFromOnlineRegions(encodedName); - // If fails, just return. Someone stole the region from under us. - // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName, - versionOfOfflineNode)) { - LOG.warn("Region was hijacked? It no longer exists, encodedName=" + - encodedName); - return; - } - // Open region. After a successful open, failures in subsequent // processing needs to do a close as part of cleanup. region = openRegion(); @@ -144,8 +134,7 @@ LOG.debug("Opened " + name + " on server:" + this.server.getServerName()); } finally { - this.rsServices.getRegionsInTransitionInRS(). - remove(this.regionInfo.getEncodedNameAsBytes()); + this.rsServices.removeFromRegionsInTransition(this.regionInfo); if (!openSuccessful && !transitionToFailedOpen) { tryTransitionToFailedOpen(regionInfo); } @@ -370,33 +359,6 @@ if (region != null) region.close(); } - /** - * Transition ZK node from OFFLINE to OPENING. - * @param encodedName Name of the znode file (Region encodedName is the znode - * name). - * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared - * before changing the node's state from OFFLINE - * @return True if successful transition. - */ - boolean transitionZookeeperOfflineToOpening(final String encodedName, - int versionOfOfflineNode) { - // TODO: should also handle transition from CLOSED? - try { - // Initialize the znode version. - this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, - server.getServerName(), EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); - } catch (KeeperException e) { - LOG.error("Error transition from OFFLINE to OPENING for region=" + - encodedName, e); - } - boolean b = isGoodVersion(); - if (!b) { - LOG.warn("Failed transition from OFFLINE to OPENING for region=" + - encodedName); - } - return b; - } /** * Update our OPENING state in zookeeper. Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (working copy) @@ -148,8 +148,7 @@ // Done! Region is closed on this RS LOG.debug("Closed region " + region.getRegionNameAsString()); } finally { - this.rsServices.getRegionsInTransitionInRS(). - remove(this.regionInfo.getEncodedNameAsBytes()); + this.rsServices.removeFromRegionsInTransition(this.regionInfo); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -75,10 +76,18 @@ public RpcServer getRpcServer(); /** - * Get the regions that are currently being opened or closed in the RS - * @return map of regions in transition in this RS + * Remove passed hri from the internal list of regions in transition on this + * regionserver. + * @param hri Region to remove. + * @return True if removed */ - public Map getRegionsInTransitionInRS(); + public boolean removeFromRegionsInTransition(HRegionInfo hri); + /** + * @param hri + * @return True if the internal list of regions in transition includes the + * passed hri. + */ + public boolean containsKeyInRegionsInTransition(HRegionInfo hri); /** * @return Return the FileSystem object used by the regionserver Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -157,6 +158,7 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -2810,7 +2812,6 @@ private RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode, Map htds) throws IOException { checkOpen(); - checkIfRegionInTransition(region, OPEN); HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName()); if (null != onlineRegion) { // See HBASE-5094. Cross check with META if still this RS is owning the @@ -2827,47 +2828,104 @@ this.removeFromOnlineRegions(region.getEncodedName()); } } - LOG.info("Received request to open region: " + - region.getRegionNameAsString()); - HTableDescriptor htd = null; - if (htds == null) { - htd = this.tableDescriptors.get(region.getTableName()); - } else { - htd = htds.get(region.getTableNameAsString()); - if (htd == null) { + // Added to in-memory RS RIT that we are trying to open this region. + // Clear it if we fail queuing an open executor. + addRegionsInTransition(region, OPEN); + try { + LOG.info("Received request to open region: " + + region.getRegionNameAsString()); + HTableDescriptor htd = null; + if (htds == null) { htd = this.tableDescriptors.get(region.getTableName()); - htds.put(region.getTableNameAsString(), htd); + } else { + htd = htds.get(region.getTableNameAsString()); + if (htd == null) { + htd = this.tableDescriptors.get(region.getTableName()); + htds.put(region.getTableNameAsString(), htd); + } } + + // Mark the region as OPENING up in zk. This is how we tell the master control of the + // region has passed to this regionserver. + int version = transitionZookeeperOfflineToOpening(region, versionOfOfflineNode); + // Need to pass the expected version in the constructor. + if (region.isRootRegion()) { + this.service.submit(new OpenRootHandler(this, this, region, htd, version)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, htd, version)); + } else { + this.service.submit(new OpenRegionHandler(this, this, region, htd, version)); + } + } catch (IOException ie) { + // Clear from this server's RIT list else will stick around for ever. + removeFromRegionsInTransition(region); + throw ie; } - this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), - true); - // Need to pass the expected version in the constructor. - if (region.isRootRegion()) { - this.service.submit(new OpenRootHandler(this, this, region, htd, - versionOfOfflineNode)); - } else if (region.isMetaRegion()) { - this.service.submit(new OpenMetaHandler(this, this, region, htd, - versionOfOfflineNode)); - } else { - this.service.submit(new OpenRegionHandler(this, this, region, htd, - versionOfOfflineNode)); - } return RegionOpeningState.OPENED; } - private void checkIfRegionInTransition(HRegionInfo region, - String currentAction) throws RegionAlreadyInTransitionException { - byte[] encodedName = region.getEncodedNameAsBytes(); - if (this.regionsInTransitionInRS.containsKey(encodedName)) { - boolean openAction = this.regionsInTransitionInRS.get(encodedName); - // The below exception message will be used in master. - throw new RegionAlreadyInTransitionException("Received:" + currentAction + - " for the region:" + region.getRegionNameAsString() + - " ,which we are already trying to " + - (openAction ? OPEN : CLOSE)+ "."); + /** + * Transition ZK node from OFFLINE to OPENING. The master will get a callback + * and will know that the region is now ours. + * + * @param hri + * HRegionInfo whose znode we are updating + * @param versionOfOfflineNode + * Version Of OfflineNode that needs to be compared before changing + * the node's state from OFFLINE + * @throws IOException + */ + int transitionZookeeperOfflineToOpening(final HRegionInfo hri, int versionOfOfflineNode) + throws IOException { + // TODO: should also handle transition from CLOSED? + int version = -1; + try { + // Initialize the znode version. + version = ZKAssign.transitionNode(this.zooKeeper, hri, this.getServerName(), + EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); + } catch (KeeperException e) { + LOG.error("Error transition from OFFLINE to OPENING for region=" + hri.getEncodedName(), e); } + if (version == -1) { + // TODO: Fix this sloppyness. The exception should be coming off zk + // directly, not an + // intepretation at this high-level (-1 when we call transitionNode can + // mean many things). + throw new IOException("Failed transition from OFFLINE to OPENING for region=" + + hri.getEncodedName()); + } + return version; } + /** + * String currentAction) throws RegionAlreadyInTransitionException { Add + * region to this regionservers list of in transitions regions ONLY if its not + * already byte[] encodedName = region.getEncodedNameAsBytes(); in transition. + * If a region already in RIT, we throw + * {@link RegionAlreadyInTransitionException}. if + * (this.regionsInTransitionInRS.containsKey(encodedName)) { Callers need to + * call {@link #removeFromRegionsInTransition(HRegionInfo)} when done or if + * boolean openAction = this.regionsInTransitionInRS.get(encodedName); error + * processing. + * + * @param region + * Region to add + * @param currentAction + * Whether OPEN or CLOSE. + * @throws RegionAlreadyInTransitionException + */ + protected void addRegionsInTransition(final HRegionInfo region, final String currentAction) + throws RegionAlreadyInTransitionException { + Boolean action = this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), + currentAction.equals(OPEN)); + if (action != null) { + // The below exception message will be used in master. + throw new RegionAlreadyInTransitionException("Received:" + currentAction + " for the region:" + + region.getRegionNameAsString() + " for the region:" + region.getRegionNameAsString() + + ", which we are already trying to " + (action ? OPEN : CLOSE) + "."); + } + } + @Override @QosPriority(priority=HConstants.HIGH_QOS) public void openRegions(List regions) @@ -2919,7 +2977,6 @@ throw new NotServingRegionException("Received close for " + region.getRegionNameAsString() + " but we are not serving it"); } - checkIfRegionInTransition(region, CLOSE); return closeRegion(region, false, zk, versionOfClosingNode); } @@ -2944,7 +3001,7 @@ } - /** + /** * @param region Region to close * @param abort True if we are aborting * @param zk True if we are to update zk about the region close; if the close @@ -2967,25 +3024,29 @@ return false; } } - - if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { - LOG.warn("Received close for region we are already opening or closing; " + - region.getEncodedName()); + try { + addRegionsInTransition(region, CLOSE); + } catch (RegionAlreadyInTransitionException rate) { + LOG.warn("Received close for region we are already opening or closing; " + + region.getEncodedName()); return false; } - this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false); - CloseRegionHandler crh = null; - if (region.isRootRegion()) { - crh = new CloseRootHandler(this, this, region, abort, zk, - versionOfClosingNode); - } else if (region.isMetaRegion()) { - crh = new CloseMetaHandler(this, this, region, abort, zk, - versionOfClosingNode); - } else { - crh = new CloseRegionHandler(this, this, region, abort, zk, - versionOfClosingNode); + boolean success = false; + try { + CloseRegionHandler crh = null; + if (region.isRootRegion()) { + crh = new CloseRootHandler(this, this, region, abort, zk, versionOfClosingNode); + } else if (region.isMetaRegion()) { + crh = new CloseMetaHandler(this, this, region, abort, zk, versionOfClosingNode); + } else { + crh = new CloseRegionHandler(this, this, region, abort, zk, versionOfClosingNode); + } + this.service.submit(crh); + success = true; + } finally { + // Remove from this server's RIT. + if (!success) removeFromRegionsInTransition(region); } - this.service.submit(crh); return true; } @@ -3672,9 +3733,14 @@ return this.rsHost; } + @Override + public boolean removeFromRegionsInTransition(final HRegionInfo hri) { + return this.regionsInTransitionInRS.remove(hri.getEncodedNameAsBytes()); + } - public ConcurrentSkipListMap getRegionsInTransitionInRS() { - return this.regionsInTransitionInRS; + @Override + public boolean containsKeyInRegionsInTransition(final HRegionInfo hri) { + return this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()); } public ExecutorService getExecutorService() { Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -275,6 +275,17 @@ } /** + * Checks whether the region is assigned. + * @param hri HRegion for which this function returns the result + * @return True iff assigned. + */ + public boolean isRegionAssigned(HRegionInfo hri) { + synchronized (this.regions ) { + return regions.containsKey(hri); + } + } + + /** * Gives enabling table regions. * * @param tableName @@ -849,10 +860,10 @@ break; } if (regionState == null || - (!regionState.isPendingOpen() && !regionState.isOpening())) { + (!regionState.isOffline() && !regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName + " from server " + data.getOrigin() + " but region was in " + - " the state " + regionState + " and not in PENDING_OPEN or OPENING"); + " the state " + regionState + " and not in OFFLINE, PENDING_OPEN or OPENING"); return; } // Handle this the same as if it were opened and then closed. @@ -874,16 +885,13 @@ failoverProcessedRegions.put(encodedName, hri); break; } - // Should see OPENING after we have asked it to OPEN or additional - // times after already being in state of OPENING if (regionState == null || - (!regionState.isPendingOpen() && !regionState.isOpening())) { - LOG.warn("Received OPENING for region " + - prettyPrintedRegionName + - " from server " + data.getOrigin() + " but region was in " + - " the state " + regionState + " and not " + - "in expected PENDING_OPEN or OPENING states"); - return; + (!regionState.isOffline() && !regionState.isPendingOpen() && + !regionState.isOpening())) { + LOG.warn("Received OPENING for region " + prettyPrintedRegionName + " from server " + + sn + " but region was in " + " the state " + regionState + " and not " + + "in expected OFFLINE, PENDING_OPEN or OPENING states"); + return; } // Transition to OPENING (or update stamp if already OPENING) regionState.update(RegionState.State.OPENING, @@ -903,12 +911,12 @@ } // Should see OPENED after OPENING but possible after PENDING_OPEN if (regionState == null || - (!regionState.isPendingOpen() && !regionState.isOpening())) { + (!regionState.isOffline() && !regionState.isPendingOpen() && !regionState.isOpening())) { LOG.warn("Received OPENED for region " + prettyPrintedRegionName + " from server " + data.getOrigin() + " but region was in " + " the state " + regionState + " and not " + - "in expected PENDING_OPEN or OPENING states"); + "in expected OFFLINE, PENDING_OPEN or OPENING states"); return; } // Handle OPENED by removing from transition and deleted zk node @@ -1525,7 +1533,7 @@ @Override public void processResult(int rc, String path, Object ctx, String name) { if (rc != 0) { - // Thisis resultcode. If non-zero, need to resubmit. + // This is resultcode. If non-zero, need to resubmit. LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " + "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2"); this.zkw.abort("Connectionloss writing unassigned at " + path + @@ -1684,14 +1692,17 @@ try { LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() + " to " + plan.getDestination().toString()); - // Transition RegionState to PENDING_OPEN - state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), - plan.getDestination()); - // Send OPEN RPC. This can fail if the server on other end is is not up. - // Pass the version that was obtained while setting the node to OFFLINE. - RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan - .getDestination(), state.getRegion(), versionOfOfflineNode); - if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { + RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan.getDestination(), + state.getRegion(), versionOfOfflineNode); + if (regionOpenState == RegionOpeningState.OPENED) { + // Transition RegionState to PENDING_OPEN + if (state.isOffline() && !state.isOpening()) { + state.update(RegionState.State.PENDING_OPEN, + System.currentTimeMillis(), plan.getDestination()); + } + if (state.isOpening()) return; + if (state.isOpened()) return; + } else if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { // Remove region from in-memory transition and unassigned node from ZK // While trying to enable the table the regions of the table were // already enabled. @@ -3191,12 +3202,14 @@ return matchAM; } + /** - * Process shutdown server removing any assignments. + * Start processing of shutdown server. * @param sn Server that went down. - * @return list of regions in transition on this server + * @return Pair that has a set of regions in transition TO the dead server and + * a list of regions that were in transition, and also ON this server. */ - public List processServerShutdown(final ServerName sn) { + public Pair, List> processServerShutdown(final ServerName sn) { // Clean out any existing assignment plans for this server synchronized (this.regionPlans) { for (Iterator > i = @@ -3213,30 +3226,36 @@ // TODO: Do we want to sync on RIT here? // Remove this server from map of servers to regions, and remove all regions // of this server from online map of regions. - Set deadRegions = null; - List rits = new ArrayList(); + Set deadRegions = new TreeSet(); synchronized (this.regions) { Set assignedRegions = this.servers.remove(sn); - if (assignedRegions == null || assignedRegions.isEmpty()) { - // No regions on this server, we are done, return empty list of RITs - return rits; + if (assignedRegions != null && !assignedRegions.isEmpty()) { + deadRegions.addAll(assignedRegions); + for (HRegionInfo region : deadRegions) { + this.regions.remove(region); + } } - deadRegions = new TreeSet(assignedRegions); - for (HRegionInfo region : deadRegions) { - this.regions.remove(region); - } } // See if any of the regions that were online on this server were in RIT // If they are, normal timeouts will deal with them appropriately so // let's skip a manual re-assignment. + Set ritsGoingToServer = new ConcurrentSkipListSet(); + List ritsOnServer = new ArrayList(); synchronized (regionsInTransition) { - for (RegionState region : this.regionsInTransition.values()) { - if (deadRegions.remove(region.getRegion())) { - rits.add(region); + for (RegionState state : this.regionsInTransition.values()) { + // If destination server in RegionState is same as dead server then add to regions to assign + // Skip the region in OFFLINE state because destionation server in RegionState is master + // server name. Skip the region if the destionation server in RegionState is other than dead + // server. + if ((state.getServerName() != null) && state.getServerName().equals(sn)) { + ritsGoingToServer.add(state.getRegion()); } + if (deadRegions.contains(state.getRegion())) { + ritsOnServer.add(state); + } } } - return rits; + return new Pair, List>(ritsGoingToServer, ritsOnServer); } /** @@ -3556,4 +3575,5 @@ this.master.abort(errorMsg, e); } } + } Index: src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (revision 1444158) +++ src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (working copy) @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.master.handler; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.zookeeper.KeeperException; /** @@ -243,13 +247,6 @@ return; } - // Clean out anything in regions in transition. Being conservative and - // doing after log splitting. Could do some states before -- OPENING? - // OFFLINE? -- and then others after like CLOSING that depend on log - // splitting. - List regionsInTransition = - this.services.getAssignmentManager(). - processServerShutdown(this.serverName); // Wait on meta to come online; we need it to progress. // TODO: Best way to hold strictly here? We should build this retry logic @@ -282,71 +279,26 @@ } } - // Skip regions that were in transition unless CLOSING or PENDING_CLOSE - for (RegionState rit : regionsInTransition) { - if (!rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting()) { - LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() + - " from list of regions to assign because in RIT; region state: " + - rit.getState()); - if (hris != null) hris.remove(rit.getRegion()); - } - } + // Returns set of regions that had regionplans against the downed server and a list of + // the intersection of regions-in-transition and regions that were on the server that died. + Pair, List> p = this.services.getAssignmentManager() + .processServerShutdown(this.serverName); + Set ritsGoingToServer = p.getFirst(); + List ritsOnServer = p.getSecond(); - assert regionsInTransition != null; - LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) + - " region(s) that " + (serverName == null? "null": serverName) + - " was carrying (skipping " + - regionsInTransition.size() + - " regions(s) that are already in transition)"); - - // Iterate regions that were on this server and assign them - if (hris != null) { - for (Map.Entry e: hris.entrySet()) { - RegionState rit = this.services.getAssignmentManager().isRegionInTransition(e.getKey()); - if (processDeadRegion(e.getKey(), e.getValue(), - this.services.getAssignmentManager(), - this.server.getCatalogTracker())) { - ServerName addressFromAM = this.services.getAssignmentManager() - .getRegionServerOfRegion(e.getKey()); - if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting()) { - // Skip regions that were in transition unless CLOSING or - // PENDING_CLOSE - LOG.info("Skip assigning region " + rit.toString()); - } else if (addressFromAM != null - && !addressFromAM.equals(this.serverName)) { - LOG.debug("Skip assigning region " - + e.getKey().getRegionNameAsString() - + " because it has been opened in " - + addressFromAM.getServerName()); - } else { - this.services.getAssignmentManager().assign(e.getKey(), true); - } - } else if (rit != null && (rit.isSplitting() || rit.isSplit())) { - // This will happen when the RS went down and the call back for the SPLIITING or SPLIT - // has not yet happened for node Deleted event. In that case if the region was actually split - // but the RS had gone down before completing the split process then will not try to - // assign the parent region again. In that case we should make the region offline and - // also delete the region from RIT. - HRegionInfo region = rit.getRegion(); - AssignmentManager am = this.services.getAssignmentManager(); - am.regionOffline(region); + List regionsToAssign = getRegionsToAssign(hris, ritsOnServer, ritsGoingToServer); + for (HRegionInfo hri : ritsGoingToServer) { + if (!this.services.getAssignmentManager().isRegionAssigned(hri)) { + if (!regionsToAssign.contains(hri)) { + regionsToAssign.add(hri); } - // If the table was partially disabled and the RS went down, we should clear the RIT - // and remove the node for the region. - // The rit that we use may be stale in case the table was in DISABLING state - // but though we did assign we will not be clearing the znode in CLOSING state. - // Doing this will have no harm. See HBASE-5927 - if (rit != null - && (rit.isClosing() || rit.isPendingClose()) - && this.services.getAssignmentManager().getZKTable() - .isDisablingOrDisabledTable(rit.getRegion().getTableNameAsString())) { - HRegionInfo hri = rit.getRegion(); - AssignmentManager am = this.services.getAssignmentManager(); - am.deleteClosingOrClosedNode(hri); - am.regionOffline(hri); - } } } + for (HRegionInfo hri : regionsToAssign) { + this.services.getAssignmentManager().assign(hri, true); + } + LOG.info(regionsToAssign.size() + " regions which were planned to open on " + this.serverName + + " have been re-assigned."); } finally { this.deadServers.finish(serverName); } @@ -354,6 +306,108 @@ } /** + * Figure what to assign from the dead server considering state of RIT and whats up in .META. + * @param metaHRIs Regions that .META. says were assigned to the dead server + * @param ritsOnServer Regions that were in transition, and on the dead server. + * @param ritsGoingToServer Regions that were in transition to the dead server. + * @return List of regions to assign or null if aborting. + * @throws IOException + */ + private List getRegionsToAssign(final NavigableMap metaHRIs, + final List ritsOnServer, Set ritsGoingToServer) throws IOException { + List toAssign = new ArrayList(); + // If no regions on the server, then nothing to assign (Regions that were currently being + // assigned will be retried over in the AM#assign method). + if (metaHRIs == null || metaHRIs.isEmpty()) return toAssign; + // Remove regions that we do not want to reassign such as regions that are + // OFFLINE. If region is OFFLINE against this server, its probably being assigned over + // in the single region assign method in AM; do not assign it here too. TODO: VERIFY!!! + // TODO: Currently OFFLINE is too messy. Its done on single assign but bulk done when bulk + // assigning and then there is special handling when master joins a cluster. + // + // If split, the zk callback will have offlined. Daughters will be in the + // list of hris we got from scanning the .META. These should be reassigned. Not the parent. + for (RegionState rs : ritsOnServer) { + if (!rs.isClosing() && !rs.isPendingClose() && !rs.isSplitting()) { + LOG.debug("Removed " + rs.getRegion().getRegionNameAsString() + + " from list of regions to assign because region state: " + rs.getState()); + metaHRIs.remove(rs.getRegion()); + } + } + + for (Map.Entry e : metaHRIs.entrySet()) { + RegionState rit = services.getAssignmentManager().getRegionsInTransition().get( + e.getKey().getEncodedName()); + AssignmentManager assignmentManager = this.services.getAssignmentManager(); + if (processDeadRegion(e.getKey(), e.getValue(), assignmentManager, + this.server.getCatalogTracker())) { + ServerName addressFromAM = assignmentManager.getRegionServerOfRegion(e.getKey()); + if (rit != null && !rit.isClosing() && !rit.isPendingClose() && !rit.isSplitting() + && !ritsGoingToServer.contains(e.getKey())) { + // Skip regions that were in transition unless CLOSING or + // PENDING_CLOSE + LOG.info("Skip assigning region " + rit.toString()); + } else if (addressFromAM != null && !addressFromAM.equals(this.serverName)) { + LOG.debug("Skip assigning region " + e.getKey().getRegionNameAsString() + + " because it has been opened in " + addressFromAM.getServerName()); + ritsGoingToServer.remove(e.getKey()); + } else { + if (rit != null) { + // clean zk node + try { + LOG.info("Reassigning region with rs =" + rit + " and deleting zk node if exists"); + ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), e.getKey()); + } catch (KeeperException ke) { + this.server.abort("Unexpected ZK exception deleting unassigned node " + e.getKey(), + ke); + return null; + } + } + toAssign.add(e.getKey()); + } + } else if (rit != null && (rit.isSplitting() || rit.isSplit())) { + // This will happen when the RS went down and the call back for the SPLIITING or SPLIT + // has not yet happened for node Deleted event. In that case if the region was actually + // split but the RS had gone down before completing the split process then will not try + // to assign the parent region again. In that case we should make the region offline + // and also delete the region from RIT. + HRegionInfo region = rit.getRegion(); + AssignmentManager am = assignmentManager; + am.regionOffline(region); + ritsGoingToServer.remove(region); + } + // If the table was partially disabled and the RS went down, we should clear the RIT + // and remove the node for the region. The rit that we use may be stale in case the table + // was in DISABLING state but though we did assign we will not be clearing the znode in + // CLOSING state. Doing this will have no harm. See HBASE-5927 + toAssign = checkForDisablingOrDisabledTables(ritsGoingToServer, toAssign, rit, assignmentManager); + } + return toAssign; + } + + private List checkForDisablingOrDisabledTables(Set regionsFromRIT, + List toAssign, RegionState rit, AssignmentManager assignmentManager) { + if (rit == null) { + return toAssign; + } + if (!rit.isClosing() && !rit.isPendingClose()) { + return toAssign; + } + if (!assignmentManager.getZKTable().isDisablingOrDisabledTable( + rit.getRegion().getTableNameAsString())) { + return toAssign; + } + HRegionInfo hri = rit.getRegion(); + AssignmentManager am = assignmentManager; + am.deleteClosingOrClosedNode(hri); + am.regionOffline(hri); + // To avoid region assignment if table is in disabling or disabled state. + toAssign.remove(hri); + regionsFromRIT.remove(hri); + return toAssign; + } + + /** * Process a dead region from a dead RS. Checks if the region is disabled or * disabling or if the region has a partially completed split. * @param hri