Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 1172481) +++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy) @@ -1311,6 +1311,8 @@ } /** + * Tries to assign a region. Region could be reassigned to the same server. + * * @param regionName * Region name to assign. * @param force Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1172481) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionPlan; @@ -79,7 +80,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; @@ -94,6 +94,7 @@ * Handles existing regions in transition during master failover. */ public class AssignmentManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(AssignmentManager.class); protected Server master; @@ -1449,6 +1450,14 @@ } break; } catch (Throwable t) { + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + if (t instanceof RegionAlreadyInTransitionException) { + String errorMsg = "Failed assignment in: " + plan.getDestination() + + " due to " + t.getMessage(); + LOG.error(errorMsg, t); + return; + } LOG.warn("Failed assignment of " + state.getRegion().getRegionNameAsString() + " to " + plan.getDestination() + ", trying to assign elsewhere instead; " + @@ -1465,6 +1474,7 @@ } } } + } } private void debugLog(HRegionInfo region, String string) { @@ -2656,7 +2666,7 @@ /** * State of a Region while undergoing transitions. */ - public static class RegionState implements Writable { + public static class RegionState implements org.apache.hadoop.io.Writable { private HRegionInfo region; public enum State { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1172481) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -43,7 +43,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -100,7 +100,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; import org.apache.hadoop.hbase.io.hfile.CacheStats; -import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; @@ -155,6 +154,7 @@ */ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Runnable, RegionServerServices { + public static final Log LOG = LogFactory.getLog(HRegionServer.class); // Set when a report to the master comes back with a message asking us to @@ -182,8 +182,11 @@ private Path rootDir; private final Random rand = new Random(); - private final Set regionsInTransitionInRS = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); + //RegionName vs current action in progress + //true - if open region action in progress + //false - if close region action in progress + private final ConcurrentSkipListMap regionsInTransitionInRS = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); /** * Map of regions currently being served by this region server. Key is the @@ -306,6 +309,13 @@ */ private TableDescriptors tableDescriptors; + /* + * Strings to be used in forming the exception message for + * RegionsAlreadyInTransitionException. + */ + private static final String OPEN = "OPEN"; + private static final String CLOSE = "CLOSE"; + /** * Starts a HRegionServer at the default location * @@ -803,7 +813,7 @@ // iterator of onlineRegions to close all user regions. for (Map.Entry e : this.onlineRegions.entrySet()) { HRegionInfo hri = e.getValue().getRegionInfo(); - if (!this.regionsInTransitionInRS.contains(hri.getEncodedNameAsBytes())) { + if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())) { // Don't update zk with this close transition; pass false. closeRegion(hri, abort, false); } @@ -2352,9 +2362,7 @@ public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) throws IOException { checkOpen(); - if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { - throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); - } + checkIfRegionAlreadyInTransition(region, OPEN); HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName()); if (null != onlineRegion) { LOG.warn("Attempted open of " + region.getEncodedName() @@ -2363,7 +2371,8 @@ } LOG.info("Received request to open region: " + region.getRegionNameAsString()); - this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); + this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), + true); HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); // Need to pass the expected version in the constructor. if (region.isRootRegion()) { @@ -2378,6 +2387,19 @@ } return RegionOpeningState.OPENED; } + + private void checkIfRegionAlreadyInTransition(HRegionInfo region, + String currentAction) throws RegionAlreadyInTransitionException { + byte[] encodedName = region.getEncodedNameAsBytes(); + if (this.regionsInTransitionInRS.containsKey(encodedName)) { + boolean openAction = this.regionsInTransitionInRS.get(encodedName); + // The below exception message will be used in master. + throw new RegionAlreadyInTransitionException("Received:" + currentAction + + " for the region:" + region.getRegionNameAsString() + + " ,which we are already trying to " + + (openAction ? OPEN : CLOSE)+ "."); + } + } @Override @QosPriority(priority=HIGH_QOS) @@ -2408,9 +2430,7 @@ throw new NotServingRegionException("Received close for " + region.getRegionNameAsString() + " but we are not serving it"); } - if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { - throw new RegionAlreadyInTransitionException("close", region.getEncodedName()); - } + checkIfRegionAlreadyInTransition(region, CLOSE); return closeRegion(region, false, zk); } @@ -2430,12 +2450,12 @@ */ protected boolean closeRegion(HRegionInfo region, final boolean abort, final boolean zk) { - if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { LOG.warn("Received close for region we are already opening or closing; " + region.getEncodedName()); return false; } - this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); + this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false); CloseRegionHandler crh = null; if (region.isRootRegion()) { crh = new CloseRootHandler(this, this, region, abort, zk); @@ -3031,7 +3051,7 @@ } - public Set getRegionsInTransitionInRS() { + public ConcurrentSkipListMap getRegionsInTransitionInRS() { return this.regionsInTransitionInRS; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (revision 1172481) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionAlreadyInTransitionException.java (working copy) @@ -27,8 +27,8 @@ */ public class RegionAlreadyInTransitionException extends IOException { - public RegionAlreadyInTransitionException(String action, String region) { - super("Received " + action + " for region we are" + - " already opening or closing; " + region); + public RegionAlreadyInTransitionException(String s) { + super(s); } + } Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (revision 1172481) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (working copy) @@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; -import java.util.Set; /** * Services provided by {@link HRegionServer} @@ -74,7 +74,8 @@ /** * Get the regions that are currently being opened or closed in the RS - * @return set of regions in transition in this RS + * @return map of regions in transition in this RS */ - public Set getRegionsInTransitionInRS(); -} \ No newline at end of file + public Map getRegionsInTransitionInRS(); + +} Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (revision 1172481) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java (working copy) @@ -49,6 +49,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -239,7 +241,7 @@ HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions()); // fake that hr1 is processing the region - hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes()); + hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true); AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); EventHandlerListener openListener = @@ -252,13 +254,10 @@ TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(), Bytes.toBytes(hr1.getServerName().toString())); - while (!reopenEventProcessed.get()) { - Threads.sleep(100); - } - // make sure the region came back - assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null); + assertEquals(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()), null); + // remove the block and reset the boolean hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes()); reopenEventProcessed.set(false); Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java (revision 1172481) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java (working copy) @@ -19,9 +19,8 @@ import java.io.IOException; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; @@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -42,7 +42,8 @@ class MockRegionServerServices implements RegionServerServices { private final Map regions = new HashMap(); private boolean stopping = false; - private final Set rit = new HashSet(); + private final ConcurrentSkipListMap rit = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); @Override public boolean removeFromOnlineRegions(String encodedRegionName) { @@ -80,7 +81,7 @@ } @Override - public Set getRegionsInTransitionInRS() { + public ConcurrentSkipListMap getRegionsInTransitionInRS() { return rit; } @@ -138,4 +139,5 @@ // TODO Auto-generated method stub return false; } -} \ No newline at end of file + +}