Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 1097257) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (working copy) @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -122,6 +124,7 @@ static class MockRegionServerServices implements RegionServerServices { final Map regions = new HashMap(); boolean stopping = false; + Set rit = new HashSet(); @Override public boolean removeFromOnlineRegions(String encodedRegionName) { @@ -162,8 +165,13 @@ public HBaseRpcMetrics getRpcMetrics() { return null; } - + @Override + public Set getRegionsInTransitionInRS() { + return rit; + } + + @Override public FlushRequester getFlushRequester() { return null; } @@ -211,6 +219,7 @@ public boolean isStopped() { return false; } + }; /** Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1097257) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -171,7 +171,7 @@ @Override public boolean closeRegion(HRegionInfo region) - throws NotServingRegionException { + throws IOException { if (TEST_SKIP_CLOSE) return true; return super.closeRegion(region); } Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (revision 1097257) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (working copy) @@ -49,6 +49,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertTrue; /** * Test open and close of regions using zk. @@ -59,6 +60,7 @@ private static final String TABLENAME = "TestZKBasedOpenCloseRegion"; private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")}; + private static int countOfRegions; @BeforeClass public static void beforeAllTests() throws Exception { Configuration c = TEST_UTIL.getConfiguration(); @@ -67,8 +69,8 @@ TEST_UTIL.startMiniCluster(2); TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); - waitUntilAllRegionsAssigned(countOfRegions); + countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); + waitUntilAllRegionsAssigned(); addToEachStartKey(countOfRegions); } @@ -83,6 +85,7 @@ TEST_UTIL.getHBaseCluster().startRegionServer()); } + waitUntilAllRegionsAssigned(); } /** @@ -244,7 +247,61 @@ } } - private static void waitUntilAllRegionsAssigned(final int countOfRegions) + /** + * This test shows how a region won't be able to be assigned to a RS + * if it's already "processing" it. + * @throws Exception + */ + @Test + public void testRSAlreadyProcessingRegion() throws Exception { + LOG.info("starting testRSAlreadyProcessingRegion"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + + HRegionServer hr0 = + cluster.getLiveRegionServerThreads().get(0).getRegionServer(); + HRegionServer hr1 = + cluster.getLiveRegionServerThreads().get(1).getRegionServer(); + HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions()); + + // fake that hr1 is processing the region + hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes()); + + AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); + EventHandlerListener openListener = + new ReopenEventListener(hri.getRegionNameAsString(), + reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); + cluster.getMaster().executorService. + registerListener(EventType.RS_ZK_REGION_OPENED, openListener); + + // now ask the master to move the region to hr1, will fail + TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), + Bytes.toBytes(hr1.getServerName())); + + while (!reopenEventProcessed.get()) { + Threads.sleep(100); + } + + // make sure the region came back + assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null); + + // remove the block and reset the boolean + hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes()); + reopenEventProcessed.set(false); + + // move the region again, but this time it will work + TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), + Bytes.toBytes(hr1.getServerName())); + + while (!reopenEventProcessed.get()) { + Threads.sleep(100); + } + + // make sure the region has moved from the original RS + assertTrue(hr0.getOnlineRegion(hri.getEncodedNameAsBytes()) == null); + + } + + private static void waitUntilAllRegionsAssigned() throws IOException { HTable meta = new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME); @@ -263,7 +320,7 @@ } s.close(); // If I get to here and all rows have a Server, then all have been assigned. - if (rows == countOfRegions) { + if (rows >= countOfRegions) { break; } LOG.info("Found=" + rows); Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +/** + * This exception is thrown when a region server is asked to open or close + * a region but it's already processing it + */ +public class RegionAlreadyInTransitionException extends IOException { + + public RegionAlreadyInTransitionException(String action, String region) { + super("Received " + action + " for region we are" + + " already opening or closing; " + region); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1097257) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -43,6 +43,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -177,6 +178,9 @@ private Path rootDir; private final Random rand = new Random(); + private final Set regionsInTransitionInRS = + new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + /** * Map of regions currently being served by this region server. Key is the * encoded region name. All access should be synchronized. @@ -2256,7 +2260,10 @@ @Override @QosPriority(priority=HIGH_QOS) public void openRegion(HRegionInfo region) - throws RegionServerStoppedException { + throws IOException { + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); + } LOG.info("Received request to open region: " + region.getRegionNameAsString()); if (this.stopped) throw new RegionServerStoppedException(); @@ -2272,7 +2279,7 @@ @Override @QosPriority(priority=HIGH_QOS) public void openRegions(List regions) - throws RegionServerStoppedException { + throws IOException { LOG.info("Received request to open " + regions.size() + " region(s)"); for (HRegionInfo region: regions) openRegion(region); } @@ -2280,14 +2287,14 @@ @Override @QosPriority(priority=HIGH_QOS) public boolean closeRegion(HRegionInfo region) - throws NotServingRegionException { + throws IOException { return closeRegion(region, true); } @Override @QosPriority(priority=HIGH_QOS) public boolean closeRegion(HRegionInfo region, final boolean zk) - throws NotServingRegionException { + throws IOException { LOG.info("Received close region: " + region.getRegionNameAsString()); boolean hasit = this.onlineRegions.containsKey(region.getEncodedName()); if (!hasit) { @@ -2296,6 +2303,9 @@ throw new NotServingRegionException("Received close for " + region.getRegionNameAsString() + " but we are not serving it"); } + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("close", region.getEncodedName()); + } return closeRegion(region, false, zk); } @@ -2309,6 +2319,11 @@ */ protected boolean closeRegion(HRegionInfo region, final boolean abort, final boolean zk) { + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + LOG.warn("Received close for region we are already opening or closing; " + + region.getEncodedName()); + return false; + } CloseRegionHandler crh = null; if (region.isRootRegion()) { crh = new CloseRootHandler(this, this, region, abort, zk); @@ -2832,6 +2847,10 @@ } + public Set getRegionsInTransitionInRS() { + return this.regionsInTransitionInRS; + } + // // Main program and support routines // Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1097257) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -69,50 +69,56 @@ @Override public void process() throws IOException { - final String name = regionInfo.getRegionNameAsString(); - LOG.debug("Processing open of " + name); - if (this.server.isStopped() || this.rsServices.isStopping()) { - LOG.info("Server stopping or stopped, skipping open of " + name); - return; - } - final String encodedName = regionInfo.getEncodedName(); + try { + final String name = regionInfo.getRegionNameAsString(); + LOG.debug("Processing open of " + name); + if (this.server.isStopped() || this.rsServices.isStopping()) { + LOG.info("Server stopping or stopped, skipping open of " + name); + return; + } + final String encodedName = regionInfo.getEncodedName(); - // Check that this region is not already online - HRegion region = this.rsServices.getFromOnlineRegions(encodedName); - if (region != null) { - LOG.warn("Attempted open of " + name + - " but already online on this server"); - return; - } + // Check that this region is not already online + HRegion region = this.rsServices.getFromOnlineRegions(encodedName); + if (region != null) { + LOG.warn("Attempted open of " + name + + " but already online on this server"); + return; + } - // If fails, just return. Someone stole the region from under us. - // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { - LOG.warn("Region was hijacked? It no longer exists, encodedName=" + - encodedName); - return; - } + // If fails, just return. Someone stole the region from under us. + // Calling transitionZookeeperOfflineToOpening initalizes this.version. + if (!transitionZookeeperOfflineToOpening(encodedName)) { + LOG.warn("Region was hijacked? It no longer exists, encodedName=" + + encodedName); + return; + } - // Open region. After a successful open, failures in subsequent processing - // needs to do a close as part of cleanup. - region = openRegion(); - if (region == null) return; - boolean failed = true; - if (tickleOpening("post_region_open")) { - if (updateMeta(region)) failed = false; - } - if (failed || this.server.isStopped() || this.rsServices.isStopping()) { - cleanupFailedOpen(region); - return; - } + // Open region. After a successful open, failures in subsequent + // processing needs to do a close as part of cleanup. + region = openRegion(); + if (region == null) return; + boolean failed = true; + if (tickleOpening("post_region_open")) { + if (updateMeta(region)) failed = false; + } + if (failed || this.server.isStopped() || + this.rsServices.isStopping()) { + cleanupFailedOpen(region); + return; + } - if (!transitionToOpened(region)) { - cleanupFailedOpen(region); - return; + if (!transitionToOpened(region)) { + cleanupFailedOpen(region); + return; + } + + // Done! Successful region open + LOG.debug("Opened " + name); + } finally { + this.rsServices.getRegionsInTransitionInRS(). + remove(this.regionInfo.getEncodedNameAsBytes()); } - - // Done! Successful region open - LOG.debug("Opened " + name); } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 1097257) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (working copy) @@ -95,45 +95,51 @@ @Override public void process() { - String name = regionInfo.getRegionNameAsString(); - LOG.debug("Processing close of " + name); - String encodedRegionName = regionInfo.getEncodedName(); - // Check that this region is being served here - HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); - if (region == null) { - LOG.warn("Received CLOSE for region " + name + " but currently not serving"); - return; - } - - int expectedVersion = FAILED; - if (this.zk) { - expectedVersion = setClosingState(); - if (expectedVersion == FAILED) return; - } - - // Close the region try { - // TODO: If we need to keep updating CLOSING stamp to prevent against - // a timeout if this is long-running, need to spin up a thread? - if (region.close(abort) == null) { - // This region got closed. Most likely due to a split. So instead - // of doing the setClosedState() below, let's just ignore and continue. - // The split message will clean up the master state. - LOG.warn("Can't close region: was already closed during close(): " + - regionInfo.getRegionNameAsString()); + String name = regionInfo.getRegionNameAsString(); + LOG.debug("Processing close of " + name); + String encodedRegionName = regionInfo.getEncodedName(); + // Check that this region is being served here + HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); + if (region == null) { + LOG.warn("Received CLOSE for region " + name + + " but currently not serving"); return; } - } catch (IOException e) { - LOG.error("Unrecoverable exception while closing region " + - regionInfo.getRegionNameAsString() + ", still finishing close", e); - } - this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); + int expectedVersion = FAILED; + if (this.zk) { + expectedVersion = setClosingState(); + if (expectedVersion == FAILED) return; + } - if (this.zk) setClosedState(expectedVersion, region); + // Close the region + try { + // TODO: If we need to keep updating CLOSING stamp to prevent against + // a timeout if this is long-running, need to spin up a thread? + if (region.close(abort) == null) { + // This region got closed. Most likely due to a split. So instead + // of doing the setClosedState() below, let's just ignore cont + // The split message will clean up the master state. + LOG.warn("Can't close region: was already closed during close(): " + + regionInfo.getRegionNameAsString()); + return; + } + } catch (IOException e) { + LOG.error("Unrecoverable exception while closing region " + + regionInfo.getRegionNameAsString() + ", still finishing close", e); + } - // Done! Region is closed on this RS - LOG.debug("Closed region " + region.getRegionNameAsString()); + this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); + + if (this.zk) setClosedState(expectedVersion, region); + + // Done! Region is closed on this RS + LOG.debug("Closed region " + region.getRegionNameAsString()); + } finally { + this.rsServices.getRegionsInTransitionInRS(). + remove(this.regionInfo.getEncodedNameAsBytes()); + } } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1097257) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import java.util.Set; /** * Services provided by {@link HRegionServer} @@ -78,4 +79,10 @@ * Returns a reference to the RPC server metrics. */ public HBaseRpcMetrics getRpcMetrics(); + + /** + * Get the regions that are currently being opened or closed in the RS + * @return set of regions in transition in this RS + */ + public Set getRegionsInTransitionInRS(); } \ No newline at end of file