diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index df8c609..dc2c0e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -159,6 +159,17 @@ public class AssignmentManager extends ZooKeeperListener { private final ExecutorService executorService; + // For unit tests, keep track of calls to ClosedRegionHandler + private Map closedRegionHandlerCalled = + new HashMap(); + + // For unit tests, keep track of calls to OpenedRegionHandler + private Map openedRegionHandlerCalled = + new HashMap(); + + // For unit tests, keep track of calls to SplitRegionHandler + private AtomicBoolean splitRegionHandlerCalled = new AtomicBoolean(false); + //Thread pool executor service for timeout monitor private java.util.concurrent.ExecutorService threadPoolExecutorService; @@ -836,8 +847,8 @@ public class AssignmentManager extends ZooKeeperListener { break; } // Run handler to do the rest of the SPLIT handling. - this.executorService.submit(new SplitRegionHandler(server, this, - regionState.getRegion(), sn, daughters)); + new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process(); + splitRegionHandlerCalled.set(true); break; case RS_ZK_REGION_MERGING: @@ -872,8 +883,7 @@ public class AssignmentManager extends ZooKeeperListener { + merge_a + ", rs_b=" + merge_b); } // Run handler to do the rest of the MERGED handling. 
- this.executorService.submit(new MergedRegionHandler( - server, this, sn, mergeRegions)); + new MergedRegionHandler(server, this, sn, mergeRegions).process(); break; case M_ZK_REGION_CLOSING: @@ -907,8 +917,8 @@ public class AssignmentManager extends ZooKeeperListener { regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED); if (regionState != null) { removeClosedRegion(regionState.getRegion()); - this.executorService.submit(new ClosedRegionHandler(server, - this, regionState.getRegion())); + new ClosedRegionHandler(server, this, regionState.getRegion()).process(); + closedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true)); } break; @@ -941,8 +951,7 @@ public class AssignmentManager extends ZooKeeperListener { // When there are more than one region server a new RS is selected as the // destination and the same is updated in the regionplan. (HBASE-5546) getRegionPlan(regionState.getRegion(), sn, true); - this.executorService.submit(new ClosedRegionHandler(server, - this, regionState.getRegion())); + new ClosedRegionHandler(server, this, regionState.getRegion()).process(); } } break; @@ -980,8 +989,9 @@ public class AssignmentManager extends ZooKeeperListener { regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN); if (regionState != null) { failedOpenTracker.remove(encodedName); // reset the count, if any - this.executorService.submit(new OpenedRegionHandler( - server, this, regionState.getRegion(), sn, expectedVersion)); + new OpenedRegionHandler( + server, this, regionState.getRegion(), sn, expectedVersion).process(); + openedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true)); } break; @@ -993,6 +1003,32 @@ public class AssignmentManager extends ZooKeeperListener { } } + //For unit tests only + boolean wasClosedHandlerCalled(HRegionInfo hri) { + AtomicBoolean b = closedRegionHandlerCalled.get(hri); + //compareAndSet to be sure that unit tests don't see stale values. 
This means + //we return true exactly once, unless the handler code sets this value back + //to true. + return b == null ? false : b.compareAndSet(true, false); + } + + //For unit tests only + boolean wasOpenedHandlerCalled(HRegionInfo hri) { + AtomicBoolean b = openedRegionHandlerCalled.get(hri); + //compareAndSet to be sure that unit tests don't see stale values. This means + //we return true exactly once, unless the handler code sets this value back + //to true. + return b == null ? false : b.compareAndSet(true, false); + } + + //For unit tests only + boolean wasSplitHandlerCalled() { + //compareAndSet to be sure that unit tests don't see stale values. This means + //we return true exactly once, unless the handler code sets this value back + //to true. + return splitRegionHandlerCalled.compareAndSet(true, false); + } + /** * @return Returns true if this RegionState is splittable; i.e. the * RegionState is currently in splitting state or pending_close or diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index eb4f2c9..04c5e55 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -24,16 +24,11 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import java.io.IOException; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -86,35 +81,27 @@ 
public class TestMaster { tableRegions.get(0).getFirst().getEndKey()); // Now trigger a split and stop when the split is in progress - CountDownLatch split = new CountDownLatch(1); - CountDownLatch proceed = new CountDownLatch(1); - RegionSplitListener list = new RegionSplitListener(split, proceed); - cluster.getMaster().executorService. - registerListener(EventType.RS_ZK_REGION_SPLIT, list); - LOG.info("Splitting table"); TEST_UTIL.getHBaseAdmin().split(TABLENAME); LOG.info("Waiting for split result to be about to open"); - split.await(60, TimeUnit.SECONDS); - try { - LOG.info("Making sure we can call getTableRegions while opening"); - tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(), + while (!m.assignmentManager.wasSplitHandlerCalled()) { + Thread.sleep(100); + } + LOG.info("Making sure we can call getTableRegions while opening"); + tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(), TABLENAME, false); - LOG.info("Regions: " + Joiner.on(',').join(tableRegions)); - // We have three regions because one is split-in-progress - assertEquals(3, tableRegions.size()); - LOG.info("Making sure we can call getTableRegionClosest while opening"); - Pair pair = + LOG.info("Regions: " + Joiner.on(',').join(tableRegions)); + // We have three regions because one is split-in-progress + assertEquals(3, tableRegions.size()); + LOG.info("Making sure we can call getTableRegionClosest while opening"); + Pair pair = m.getTableRegionForRow(TABLENAME, Bytes.toBytes("cde")); - LOG.info("Result is: " + pair); - Pair tableRegionFromName = + LOG.info("Result is: " + pair); + Pair tableRegionFromName = MetaReader.getRegion(m.getCatalogTracker(), pair.getFirst().getRegionName()); - assertEquals(tableRegionFromName.getFirst(), pair.getFirst()); - } finally { - proceed.countDown(); - } + assertEquals(tableRegionFromName.getFirst(), pair.getFirst()); } @Test @@ -175,33 +162,5 @@ public class TestMaster { TEST_UTIL.deleteTable(tableName); } } - - 
static class RegionSplitListener implements EventHandlerListener { - CountDownLatch split, proceed; - - public RegionSplitListener(CountDownLatch split, CountDownLatch proceed) { - this.split = split; - this.proceed = proceed; - } - - @Override - public void afterProcess(EventHandler event) { - if (event.getEventType() != EventType.RS_ZK_REGION_SPLIT) { - return; - } - try { - split.countDown(); - proceed.await(60, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - return; - } - - @Override - public void beforeProcess(EventHandler event) { - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java index 6902b25..d753216 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java @@ -26,7 +26,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,10 +42,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; @@ -116,29 +111,14 @@ public class TestZKBasedOpenCloseRegion { HRegionInfo hri = 
getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer)); LOG.debug("Asking RS to close region " + hri.getRegionNameAsString()); - AtomicBoolean closeEventProcessed = new AtomicBoolean(false); - AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); - - EventHandlerListener closeListener = - new ReopenEventListener(hri.getRegionNameAsString(), - closeEventProcessed, EventType.RS_ZK_REGION_CLOSED); - cluster.getMaster().executorService. - registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener); - - EventHandlerListener openListener = - new ReopenEventListener(hri.getRegionNameAsString(), - reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); - cluster.getMaster().executorService. - registerListener(EventType.RS_ZK_REGION_OPENED, openListener); - LOG.info("Unassign " + hri.getRegionNameAsString()); cluster.getMaster().assignmentManager.unassign(hri); - while (!closeEventProcessed.get()) { + while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) { Threads.sleep(100); } - while (!reopenEventProcessed.get()) { + while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) { Threads.sleep(100); } @@ -157,83 +137,6 @@ public class TestZKBasedOpenCloseRegion { return hri; } - public static class ReopenEventListener implements EventHandlerListener { - private static final Log LOG = LogFactory.getLog(ReopenEventListener.class); - String regionName; - AtomicBoolean eventProcessed; - EventType eventType; - - public ReopenEventListener(String regionName, - AtomicBoolean eventProcessed, EventType eventType) { - this.regionName = regionName; - this.eventProcessed = eventProcessed; - this.eventType = eventType; - } - - @Override - public void beforeProcess(EventHandler event) { - if(event.getEventType() == eventType) { - LOG.info("Received " + eventType + " and beginning to process it"); - } - } - - @Override - public void afterProcess(EventHandler event) { - LOG.info("afterProcess(" + event + ")"); - if(event.getEventType() 
== eventType) { - LOG.info("Finished processing " + eventType); - String regionName = ""; - if(eventType == EventType.RS_ZK_REGION_OPENED) { - TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event; - regionName = hriCarrier.getHRegionInfo().getRegionNameAsString(); - } else if(eventType == EventType.RS_ZK_REGION_CLOSED) { - TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event; - regionName = hriCarrier.getHRegionInfo().getRegionNameAsString(); - } - if(this.regionName.equals(regionName)) { - eventProcessed.set(true); - } - synchronized(eventProcessed) { - eventProcessed.notifyAll(); - } - } - } - } - - public static class CloseRegionEventListener implements EventHandlerListener { - private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class); - String regionToClose; - AtomicBoolean closeEventProcessed; - - public CloseRegionEventListener(String regionToClose, - AtomicBoolean closeEventProcessed) { - this.regionToClose = regionToClose; - this.closeEventProcessed = closeEventProcessed; - } - - @Override - public void afterProcess(EventHandler event) { - LOG.info("afterProcess(" + event + ")"); - if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) { - LOG.info("Finished processing CLOSE REGION"); - TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event; - if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) { - LOG.info("Setting closeEventProcessed flag"); - closeEventProcessed.set(true); - } else { - LOG.info("Region to close didn't match"); - } - } - } - - @Override - public void beforeProcess(EventHandler event) { - if(event.getEventType() == EventType.M_RS_CLOSE_REGION) { - LOG.info("Received CLOSE RPC and beginning to process it"); - } - } - } - /** * This test shows how a region won't be able to be assigned to a RS * if it's already "processing" it. 
@@ -253,13 +156,6 @@ public class TestZKBasedOpenCloseRegion { // fake that hr1 is processing the region hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true); - AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); - EventHandlerListener openListener = - new ReopenEventListener(hri.getRegionNameAsString(), - reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); - cluster.getMaster().executorService. - registerListener(EventType.RS_ZK_REGION_OPENED, openListener); - // now ask the master to move the region to hr1, will fail TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), Bytes.toBytes(hr1.getServerName().toString())); @@ -269,22 +165,14 @@ public class TestZKBasedOpenCloseRegion { // remove the block and reset the boolean hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes()); - reopenEventProcessed.set(false); // now try moving a region when there is no region in transition. hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr1)); - openListener = - new ReopenEventListener(hri.getRegionNameAsString(), - reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); - - cluster.getMaster().executorService. 
- registerListener(EventType.RS_ZK_REGION_OPENED, openListener); - TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), Bytes.toBytes(hr0.getServerName().toString())); - while (!reopenEventProcessed.get()) { + while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) { Threads.sleep(100); } @@ -304,15 +192,9 @@ public class TestZKBasedOpenCloseRegion { HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer)); LOG.debug("Asking RS to close region " + hri.getRegionNameAsString()); - AtomicBoolean closeEventProcessed = new AtomicBoolean(false); - EventHandlerListener listener = - new CloseRegionEventListener(hri.getRegionNameAsString(), - closeEventProcessed); - cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener); - cluster.getMaster().assignmentManager.unassign(hri); - while (!closeEventProcessed.get()) { + while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) { Threads.sleep(100); } LOG.info("Done with testCloseRegion");