Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1161037) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -326,7 +326,9 @@ synchronized (regionsInTransition) { switch (data.getEventType()) { case RS_ZK_REGION_CLOSING: - if (isOnDeadServer(regionInfo, deadServers)) { + //If zk node was updated by a living server, we should skip it and just add this region into RIT. + if (isOnDeadServer(regionInfo, deadServers) && + (null == data.getServerName() || !serverManager.isServerOnline(data.getServerName()))){ // If was on dead server, its closed now. Force to OFFLINE and this // will get it reassigned if appropriate forceOffline(regionInfo, data); @@ -372,7 +374,8 @@ "; letting RIT timeout so will be assigned elsewhere"); break; } - if (isOnDeadServer(regionInfo, deadServers)) { + if (isOnDeadServer(regionInfo, deadServers) && + (null == data.getServerName() || !serverManager.isServerOnline(data.getServerName()))) { // If was on a dead server, then its not open any more; needs handling. forceOffline(regionInfo, data); } else { @@ -1640,6 +1643,16 @@ boolean assign = ServerShutdownHandler.processDeadRegion(regionInfo, result, this, this.catalogTracker); + RegionTransitionData data = ZKAssign.getData(watcher, regionInfo.getEncodedName()); + + //When zk node has been updated by a living server, we consider that this region server is handling it. +  //So we should skip it and process it in processRegionsInTransition. 
+ if (data != null && data.getServerName() != null + serverManager.isServerOnline(data.getServerName())){ + LOG.info("The region " + regionInfo.getEncodedName() + + " is being processed by " + data.getServerName()); + continue; + } if (assign) { ZKAssign.createOrForceNodeOffline(watcher, regionInfo, master.getServerName()); Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1161037) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -83,6 +83,9 @@ if (region != null) { LOG.warn("Attempted open of " + name + " but already online on this server"); + + //This region should be assigned to another region server by RIT, so we need to close it. + cleanupFailedOpen(region); return; } Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (revision 1161037) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (working copy) @@ -18,11 +18,12 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.master; +import static org.junit.Assert.assertEquals; - import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,9 +42,11 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -64,6 +67,10 @@ @BeforeClass public static void beforeAllTests() throws Exception { Configuration c = TEST_UTIL.getConfiguration(); + // Need to drop the timeout much lower + c.setInt("hbase.master.assignment.timeoutmonitor.period", 2000); + c.setInt("hbase.master.assignment.timeoutmonitor.timeout", 4000); + c.setBoolean("dfs.support.append", true); c.setInt("hbase.regionserver.info.port", 0); TEST_UTIL.startMiniCluster(2); @@ -129,7 +136,30 @@ while (!reopenEventProcessed.get()) { Threads.sleep(100); } - + +Test a region is reopened on the same region server. + reopenEventProcessed.set(false); + + List<MasterThread> masterThreads = cluster.getMasterThreads(); + assertEquals(1, masterThreads.size()); + + HMaster master = masterThreads.get(0).getMaster(); + assertTrue(master.isActiveMaster()); + + hri = getNonMetaRegion(regionServer.getOnlineRegions()); + openListener = + new ReopenEventListener(hri.getRegionNameAsString(), + reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); + cluster.getMaster().executorService. 
+ registerListener(EventType.RS_ZK_REGION_OPENED, openListener); + + master.assignmentManager.regionPlans.put(hri.getEncodedName(), + new RegionPlan(hri, null, regionServer.getServerInfo())); + master.assignRegion(hri); + + while (!reopenEventProcessed.get()) { + Threads.sleep(100); + } LOG.info("Done with testReOpenRegion"); }