diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index a067bed..a1bdac4 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
@@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -127,8 +131,118 @@ public class TestAssignmentManager {
   }
 
   @After
-  public void after() {
-    if (this.watcher != null) this.watcher.close();
+  public void after() throws KeeperException {
+    if (this.watcher != null) {
+      // Clean up all znodes
+      ZKAssign.deleteAllNodes(this.watcher);
+      this.watcher.close();
+    }
+  }
+
+  /**
+   * Test a balance going on at same time as a master failover
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test (timeout=5000)
+  public void testBalanceOnMasterFailover()
+  throws IOException, KeeperException, InterruptedException {
+    // We need a mocked catalog tracker. It's used by our AM instance.
+    CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+    // Make an RS Interface implementation. Make it so a scanner can go against
+    // it and a get to return the single region, REGIONINFO, this test is
+    // messing with. Needed when "new master" joins cluster. AM will try and
+    // rebuild its list of user regions and it will also get the HRI that goes
+    // with an encoded name by doing a Get on .META.
+    HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
+    // Get a meta row result that has region up on SERVERNAME_A for REGIONINFO
+    Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+    Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())).
+      thenReturn(System.currentTimeMillis());
+    // Return a good result 'r' first and then return null to indicate end of scan
+    Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
+      thenReturn(new Result [] {r}, (Result [])null);
+    // If a get, return the above result too for REGIONINFO
+    Mockito.when(implementation.get((byte [])Mockito.any(), (Get)Mockito.any())).
+      thenReturn(r);
+    // Get a connection w/ mocked up common methods.
+    HConnection connection =
+      HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
+        implementation, SERVERNAME_B, REGIONINFO);
+    // Make it so we can get the connection from our mocked catalogtracker
+    Mockito.when(ct.getConnection()).thenReturn(connection);
+
+    // Create and startup an executor. Used by AM handling zk callbacks.
+    ExecutorService executor =
+      startupMasterExecutor("testBalanceOnMasterFailoverExecutor");
+    // Create an AM. We want to ensure events proceed in a certain order so
+    // subclass so we can intercept processRegionsInTransition
+    final AtomicBoolean gate = new AtomicBoolean(true);
+    final AssignmentManager am =
+      new AssignmentManager(this.server, this.serverManager, ct, executor) {
+      @Override
+      void processRegionsInTransition(final RegionTransitionData data,
+          final HRegionInfo regionInfo,
+          final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
+          final int expectedVersion)
+      throws KeeperException {
+        while (gate.get()) Threads.sleep(1);
+        super.processRegionsInTransition(data, regionInfo, deadServers,
+          expectedVersion);
+      }
+    };
+    try {
+      // Make sure our new AM gets callbacks; once registered, can't unregister.
+      // That's OK because we make a new zk watcher for each test.
+      this.watcher.registerListenerFirst(am);
+      // Call the balance function but fake the region being online first at
+      // SERVERNAME_A. Create a balance plan.
+      am.regionOnline(REGIONINFO, SERVERNAME_A);
+      // Balance region from A to B. It calls unassign setting CLOSING state
+      // up in zk.
+      RegionPlan plan = new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B);
+      am.balance(plan);
+
+      // Thread that is faking an AM coming up in a new failed over master.
+      Thread t = new Thread("FakedNewMaster") {
+        public void run() {
+          // Call the joinCluster function as though we were doing a master
+          // failover at this point. It will stall just before we go to add
+          // the RIT region to our RIT Map in AM at processRegionsInTransition.
+          // First clear any in-memory state from AM so it acts like a new master
+          // coming on line.
+          am.regionsInTransition.clear();
+          am.regionPlans.clear();
+          try {
+            am.joinCluster();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          } catch (KeeperException e) {
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        };
+      };
+      t.start();
+      while(!t.isAlive()) Threads.sleep(1);
+
+      // Now fake the region closing successfully over on the regionserver; the
+      // regionserver will have set the region in CLOSED state. This will
+      // trigger callback into AM. The below zk close call is from the RS close
+      // region handler duplicated here because it's down deep in a private
+      // method hard to expose.
+      int versionid =
+        ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
+      assertNotSame(versionid, -1);
+      // We will block here until our znode is cleared or until this test
+      // times out.
+      ZKAssign.blockUntilNoRIT(watcher);
+    } finally {
+      executor.shutdown();
+      am.shutdown();
+    }
   }
 
   /**
@@ -193,8 +307,6 @@ public class TestAssignmentManager {
     } finally {
       executor.shutdown();
       am.shutdown();
-      // Clean up all znodes
-      ZKAssign.deleteAllNodes(this.watcher);
     }
   }
 