Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 1096942) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (working copy) @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -121,6 +123,7 @@ static class MockRegionServerServices implements RegionServerServices { final Map regions = new HashMap(); boolean stopping = false; + Set rit = new HashSet(); @Override public boolean removeFromOnlineRegions(String encodedRegionName) { @@ -161,8 +164,13 @@ public HBaseRpcMetrics getRpcMetrics() { return null; } - + @Override + public Set getRegionsInTransitionInRS() { + return rit; + } + + @Override public FlushRequester getFlushRequester() { return null; } @@ -171,6 +179,7 @@ public CompactionRequestor getCompactionRequester() { return null; } + }; /** Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (revision 1096942) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (working copy) @@ -49,6 +49,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertTrue; /** * Test open and close of regions using zk. @@ -244,6 +245,59 @@ } } + /** + * This test shows how a region won't be able to be assigned to a RS + * if it's already "processing" it. + * @throws Exception + */ + @Test + public void testRSAlreadyProcessingRegion() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + + HRegionServer hr0 = + cluster.getLiveRegionServerThreads().get(0).getRegionServer(); + HRegionServer hr1 = + cluster.getLiveRegionServerThreads().get(1).getRegionServer(); + HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions()); + + // fake that hr1 is processing the region + hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes()); + + AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); + EventHandlerListener openListener = + new ReopenEventListener(hri.getRegionNameAsString(), + reopenEventProcessed, EventType.RS_ZK_REGION_OPENED); + cluster.getMaster().executorService. + registerListener(EventType.RS_ZK_REGION_OPENED, openListener); + + // now ask the master to move the region to hr1, will fail + TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), + Bytes.toBytes(hr1.getServerName())); + + while (!reopenEventProcessed.get()) { + Threads.sleep(100); + } + + // make sure the region came back + assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null); + + // remove the block and reset the boolean + hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes()); + reopenEventProcessed.set(false); + + // move the region again, but this time it will work + TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), + Bytes.toBytes(hr1.getServerName())); + + while (!reopenEventProcessed.get()) { + Threads.sleep(100); + } + + // make sure the region has moved from the original RS + assertTrue(hr0.getOnlineRegion(hri.getEncodedNameAsBytes()) == null); + + } + private static void waitUntilAllRegionsAssigned(final int countOfRegions) throws IOException { HTable meta = new HTable(TEST_UTIL.getConfiguration(), Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +/** + * This exception is thrown when a region server is asked to open or close + * a region but it's already processing it + */ +public class RegionAlreadyInTransitionException extends IOException { + + public RegionAlreadyInTransitionException(String action, String region) { + super("Received " + action + " for region we are" + + " already opening or closing; " + region); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1096942) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -44,7 +44,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.YouAreDeadException; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.RootLocationEditor; @@ -270,6 +269,9 @@ // Replication services. If no replication, this handler will be null. private Replication replicationHandler; + private final Set regionsInTransitionInRS = + new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + /** * Starts a HRegionServer at the default location * @@ -2060,7 +2062,10 @@ @Override @QosPriority(priority=HIGH_QOS) public void openRegion(HRegionInfo region) - throws RegionServerStoppedException { + throws IOException { + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); + } LOG.info("Received request to open region: " + region.getRegionNameAsString()); if (this.stopped) throw new RegionServerStoppedException(); @@ -2076,7 +2081,7 @@ @Override @QosPriority(priority=HIGH_QOS) public void openRegions(List regions) - throws RegionServerStoppedException { + throws IOException { LOG.info("Received request to open " + regions.size() + " region(s)"); for (HRegionInfo region: regions) openRegion(region); } @@ -2084,14 +2089,14 @@ @Override @QosPriority(priority=HIGH_QOS) public boolean closeRegion(HRegionInfo region) - throws NotServingRegionException { + throws IOException { return closeRegion(region, true); } @Override @QosPriority(priority=HIGH_QOS) public boolean closeRegion(HRegionInfo region, final boolean zk) - throws NotServingRegionException { + throws IOException { LOG.info("Received close region: " + region.getRegionNameAsString()); boolean hasit = this.onlineRegions.containsKey(region.getEncodedName()); if (!hasit) { @@ -2100,6 +2105,9 @@ throw new NotServingRegionException("Received close for " + region.getRegionNameAsString() + " but we are not serving it"); } + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("close", region.getEncodedName()); + } return closeRegion(region, false, zk); } @@ -2113,6 +2121,11 @@ */ protected boolean closeRegion(HRegionInfo region, final boolean abort, final boolean zk) { + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + LOG.warn("Received close for region we are already opening or closing; " + + region.getEncodedName()); + return false; + } CloseRegionHandler crh = null; if (region.isRootRegion()) { crh = new CloseRootHandler(this, this, region, abort, zk); @@ -2604,6 +2617,10 @@ return this.compactSplitThread; } + public Set getRegionsInTransitionInRS() { + return this.regionsInTransitionInRS; + } + // // Main program and support routines // Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1096942) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -69,51 +69,57 @@ @Override public void process() throws IOException { - final String name = regionInfo.getRegionNameAsString(); - LOG.debug("Processing open of " + name); - if (this.server.isStopped() || this.rsServices.isStopping()) { - LOG.info("Server stopping or stopped, skipping open of " + name); - return; - } - final String encodedName = regionInfo.getEncodedName(); + try { + final String name = regionInfo.getRegionNameAsString(); + LOG.debug("Processing open of " + name); + if (this.server.isStopped() || this.rsServices.isStopping()) { + LOG.info("Server stopping or stopped, skipping open of " + name); + return; + } + final String encodedName = regionInfo.getEncodedName(); - // Check that this region is not already online - HRegion region = this.rsServices.getFromOnlineRegions(encodedName); - if (region != null) { - LOG.warn("Attempted open of " + name + - " but already online on this server"); - return; - } + // Check that this region is not already online + HRegion region = this.rsServices.getFromOnlineRegions(encodedName); + if (region != null) { + LOG.warn("Attempted open of " + name + + " but already online on this server"); + return; + } - // If fails, just return. Someone stole the region from under us. - // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { - LOG.warn("Region was hijacked? It no longer exists, encodedName=" + - encodedName); - return; - } + // If fails, just return. Someone stole the region from under us. + // Calling transitionZookeeperOfflineToOpening initalizes this.version. + if (!transitionZookeeperOfflineToOpening(encodedName)) { + LOG.warn("Region was hijacked? It no longer exists, encodedName=" + + encodedName); + return; + } - // Open region. After a successful open, failures in subsequent processing - // needs to do a close as part of cleanup. - region = openRegion(); - if (region == null) return; - boolean failed = true; - if (tickleOpening("post_region_open")) { - if (updateMeta(region)) failed = false; - } + // Open region. After a successful open, failures in subsequent + // processing needs to do a close as part of cleanup. + region = openRegion(); + if (region == null) return; + boolean failed = true; + if (tickleOpening("post_region_open")) { + if (updateMeta(region)) failed = false; + } - if (failed || this.server.isStopped() || this.rsServices.isStopping()) { - cleanupFailedOpen(region); - return; - } + if (failed || this.server.isStopped() || + this.rsServices.isStopping()) { + cleanupFailedOpen(region); + return; + } - if (!transitionToOpened(region)) { - cleanupFailedOpen(region); - return; + if (!transitionToOpened(region)) { + cleanupFailedOpen(region); + return; + } + + // Done! Successful region open + LOG.debug("Opened " + name); + } finally { + this.rsServices.getRegionsInTransitionInRS(). + remove(this.regionInfo.getEncodedNameAsBytes()); } - - // Done! Successful region open - LOG.debug("Opened " + name); } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 1096942) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (working copy) @@ -95,45 +95,51 @@ @Override public void process() { - String name = regionInfo.getRegionNameAsString(); - LOG.debug("Processing close of " + name); - String encodedRegionName = regionInfo.getEncodedName(); - // Check that this region is being served here - HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); - if (region == null) { - LOG.warn("Received CLOSE for region " + name + " but currently not serving"); - return; - } - - int expectedVersion = FAILED; - if (this.zk) { - expectedVersion = setClosingState(); - if (expectedVersion == FAILED) return; - } - - // Close the region try { - // TODO: If we need to keep updating CLOSING stamp to prevent against - // a timeout if this is long-running, need to spin up a thread? - if (region.close(abort) == null) { - // This region got closed. Most likely due to a split. So instead - // of doing the setClosedState() below, let's just ignore and continue. - // The split message will clean up the master state. - LOG.warn("Can't close region: was already closed during close(): " + - regionInfo.getRegionNameAsString()); + String name = regionInfo.getRegionNameAsString(); + LOG.debug("Processing close of " + name); + String encodedRegionName = regionInfo.getEncodedName(); + // Check that this region is being served here + HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); + if (region == null) { + LOG.warn("Received CLOSE for region " + name + + " but currently not serving"); return; } - } catch (IOException e) { - LOG.error("Unrecoverable exception while closing region " + - regionInfo.getRegionNameAsString() + ", still finishing close", e); - } - this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); + int expectedVersion = FAILED; + if (this.zk) { + expectedVersion = setClosingState(); + if (expectedVersion == FAILED) return; + } - if (this.zk) setClosedState(expectedVersion, region); + // Close the region + try { + // TODO: If we need to keep updating CLOSING stamp to prevent against + // a timeout if this is long-running, need to spin up a thread? + if (region.close(abort) == null) { + // This region got closed. Most likely due to a split. So instead + // of doing the setClosedState() below, let's just ignore cont + // The split message will clean up the master state. + LOG.warn("Can't close region: was already closed during close(): " + + regionInfo.getRegionNameAsString()); + return; + } + } catch (IOException e) { + LOG.error("Unrecoverable exception while closing region " + + regionInfo.getRegionNameAsString() + ", still finishing close", e); + } - // Done! Region is closed on this RS - LOG.debug("Closed region " + region.getRegionNameAsString()); + this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); + + if (this.zk) setClosedState(expectedVersion, region); + + // Done! Region is closed on this RS + LOG.debug("Closed region " + region.getRegionNameAsString()); + } finally { + this.rsServices.getRegionsInTransitionInRS(). + remove(this.regionInfo.getEncodedNameAsBytes()); + } } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1096942) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; +import java.util.Set; /** * Services provided by {@link HRegionServer} @@ -72,4 +73,10 @@ * Returns a reference to the RPC server metrics. */ public HBaseRpcMetrics getRpcMetrics(); + + /** + * Get the regions that are currently being opened or closed in the RS + * @return set of regions in transition in this RS + */ + public Set getRegionsInTransitionInRS(); } \ No newline at end of file