Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java (revision 0) @@ -0,0 +1,69 @@ +package org.apache.hadoop.hbase.zookeeper; + + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestZKTable { +  private static final Log LOG = LogFactory.getLog(TestZKTable.class); +  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + +  @BeforeClass +  public static void setUpBeforeClass() throws Exception { +    TEST_UTIL.startMiniZKCluster(); +  } + +  @AfterClass +  public static void tearDownAfterClass() throws Exception { +    TEST_UTIL.shutdownMiniZKCluster(); +  } + +  @Test +  public void testTableStates() +  throws ZooKeeperConnectionException, IOException, KeeperException { +    final String name = "testDisabled"; +    Abortable abortable = new Abortable() { +      @Override +      public void abort(String why, Throwable e) { +        LOG.info(why, e); +      } +    }; +    ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), +      name, abortable); +    assertTrue(ZKTable.isEnabledTable(zkw, name)); +    assertFalse(ZKTable.isDisabingTable(zkw, name)); +    assertFalse(ZKTable.isDisabledTable(zkw, name)); +    assertFalse(ZKTable.isEnablingTable(zkw, name)); +    assertFalse(ZKTable.isDisablingOrDisabledTable(zkw, name)); +    assertFalse(ZKTable.isDisabledOrEnablingTable(zkw, name)); + 
ZKTable.disablingTable(zkw, name); + assertTrue(ZKTable.isDisabingTable(zkw, name)); + assertTrue(ZKTable.isDisablingOrDisabledTable(zkw, name)); + assertFalse(ZKTable.getDisabledTables(zkw).contains(name)); + ZKTable.disableTable(zkw, name); + assertTrue(ZKTable.isDisabledTable(zkw, name)); + assertTrue(ZKTable.isDisablingOrDisabledTable(zkw, name)); + assertFalse(ZKTable.isDisabingTable(zkw, name)); + assertTrue(ZKTable.getDisabledTables(zkw).contains(name)); + ZKTable.enablingTable(zkw, name); + assertTrue(ZKTable.isEnablingTable(zkw, name)); + assertTrue(ZKTable.isDisabledOrEnablingTable(zkw, name)); + assertFalse(ZKTable.isDisabledTable(zkw, name)); + assertFalse(ZKTable.getDisabledTables(zkw).contains(name)); + ZKTable.enableTable(zkw, name); + assertTrue(ZKTable.isEnabledTable(zkw, name)); + assertFalse(ZKTable.isEnablingTable(zkw, name)); + } +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (revision 1032127) +++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (working copy) @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; @@ -324,7 +324,7 @@ log("Beginning to mock scenarios"); // Disable the disabledTable in ZK - ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable)); + ZKTable.disableTable(zkw, Bytes.toString(disabledTable)); /* * ZK = OFFLINE @@ -652,7 +652,7 @@ log("Beginning to mock scenarios"); // Disable the disabledTable in ZK - ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable)); + 
ZKTable.disableTable(zkw, Bytes.toString(disabledTable)); /* * ZK = CLOSING Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java (working copy) @@ -1,70 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - -import java.util.List; - -import org.apache.zookeeper.KeeperException; - -/** - * Helper class for table disable tracking in zookeeper. - *

- * The node /disabled will contain a child node for every table which should be - * disabled, for example, /disabled/table. - */ -public class ZKTableDisable { - - /** - * Sets the specified table as disabled in zookeeper. Fails silently if the - * table is already disabled in zookeeper. Sets no watches. - * @param zkw - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public static void disableTable(ZooKeeperWatcher zkw, String tableName) - throws KeeperException { - ZKUtil.createAndFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode, - tableName)); - } - - /** - * Unsets the specified table as disabled in zookeeper. Fails silently if the - * table is not currently disabled in zookeeper. Sets no watches. - * @param zkw - * @param tableName - * @throws KeeperException unexpected zookeeper exception - */ - public static void undisableTable(ZooKeeperWatcher zkw, String tableName) - throws KeeperException { - ZKUtil.deleteNodeFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode, - tableName)); - } - - /** - * Gets a list of all the tables set as disabled in zookeeper. - * @param zkw - * @return list of disabled tables, empty list if none - * @throws KeeperException - */ - public static List getDisabledTables(ZooKeeperWatcher zkw) - throws KeeperException { - return ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); - } -} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -713,6 +713,22 @@ } /** + * Set data into node creating node if it doesn't yet exist. 
+   * @param zkw zk reference +   * @param znode path of node +   * @param data data to set for node +   * @throws KeeperException +   */ +  public static void createSetData(final ZooKeeperWatcher zkw, final String znode, +      final byte [] data) +  throws KeeperException { +    if (checkExists(zkw, znode) == -1) { +      ZKUtil.createWithParents(zkw, znode); +    } +    ZKUtil.setData(zkw, znode, data); +  } + +  /**    * Sets the data of the existing znode to be the specified data.  The node    * must exist but no checks are done on the existing data or version.    * @@ -902,8 +918,7 @@    * @param znode path of node    * @throws KeeperException if unexpected zookeeper exception    */ -  public static void createWithParents(ZooKeeperWatcher zkw, -      String znode) +  public static void createWithParents(ZooKeeperWatcher zkw, String znode)   throws KeeperException {     try {       if(znode == null) { Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java	(revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java	(revision 0) @@ -0,0 +1,198 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +/** + * Helper class for table disable tracking. + * + * A znode will exist under the table directory if it is in any of the following + * states: ENABLING, DISABLING, or DISABLED. If ENABLED, there will be no + * entry for a table in zk. + */ +public class ZKTable { + private static final Log LOG = LogFactory.getLog(ZKTable.class); + // TODO: Make it so always a table znode. Put table schema here as well as table state. + // Have watcher on table znode so all are notified of state or schema change. + private static final String DISABLED = "disabled"; + private static final String DISABLING = "disabling"; + private static final String ENABLING = "enabling"; + + /** + * Sets the specified table as DISABLED in zookeeper. Fails silently if the + * table is already disabled in zookeeper. Sets no watches. + * @param zkw + * @param tableName + * @throws KeeperException unexpected zookeeper exception + */ + public static void disableTable(ZooKeeperWatcher zkw, String tableName) + throws KeeperException { + if (!isDisablingOrDisabledTable(zkw, tableName)) { + LOG.warn("Moving table " + tableName + " state to disabled but was not " + + "first in disabling state: " + getDataAsString(zkw, tableName)); + } + setTableState(zkw, tableName, DISABLED); + } + + /** + * Sets the specified table as DISABLING in zookeeper. Fails silently if the + * table is already disabled in zookeeper. Sets no watches. 
+   * @param zkw +   * @param tableName +   * @throws KeeperException unexpected zookeeper exception +   */ +  public static void disablingTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    if (!isEnabledOrDisablingTable(zkw, tableName)) { +      LOG.warn("Moving table " + tableName + " state to disabling but was not " + +        "first in enabled state: " + getDataAsString(zkw, tableName)); +    } +    setTableState(zkw, tableName, DISABLING); +  } + +  /** +   * Sets the specified table as ENABLING in zookeeper.  Fails silently if the +   * table is already enabling in zookeeper.  Sets no watches. +   * @param zkw +   * @param tableName +   * @throws KeeperException unexpected zookeeper exception +   */ +  public static void enablingTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    if (!isDisabledOrEnablingTable(zkw, tableName)) { +      LOG.warn("Moving table " + tableName + " state to enabling but was not " + +        "first in disabled state: " + getDataAsString(zkw, tableName)); +    } +    setTableState(zkw, tableName, ENABLING); +  } + +  private static void setTableState(ZooKeeperWatcher zkw, String tableName, +      final String state) +  throws KeeperException { +    String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName); +    if (ZKUtil.checkExists(zkw, znode) == -1) { +      ZKUtil.createAndFailSilent(zkw, znode); +    } +    ZKUtil.setData(zkw, znode, Bytes.toBytes(state)); +  } + +  public static boolean isDisabledTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    return isTableState(zkw, tableName, DISABLED); +  } + +  public static boolean isDisabingTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    return isTableState(zkw, tableName, DISABLING); +  } + +  public static boolean isEnablingTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    return isTableState(zkw, tableName, ENABLING); +  } + +  public static boolean isEnabledTable(ZooKeeperWatcher zkw, String tableName) +  throws KeeperException { +    String znode = 
ZKUtil.joinZNode(zkw.tableZNode, tableName); + return ZKUtil.checkExists(zkw, znode) == -1; + } + + public static boolean isDisablingOrDisabledTable(ZooKeeperWatcher zkw, + final String tableName) + throws KeeperException { + byte [] data = getData(zkw, tableName); + if (data == null) return false; + String state = new String(data); + return state.equals(DISABLING) || state.equals(DISABLED); + } + + public static boolean isEnabledOrDisablingTable(ZooKeeperWatcher zkw, + final String tableName) + throws KeeperException { + byte [] data = getData(zkw, tableName); + if (data == null) return true; // This is 'enabled' + String state = new String(data); + return state.equals(DISABLING); + } + + public static boolean isDisabledOrEnablingTable(ZooKeeperWatcher zkw, + final String tableName) + throws KeeperException { + byte [] data = getData(zkw, tableName); + if (data == null) return false; + String state = new String(data); + return state.equals(ENABLING) || state.equals(DISABLED); + } + + private static boolean isTableState(ZooKeeperWatcher zkw, String tableName, + final String state) + throws KeeperException { + byte [] data = getData(zkw, tableName); + return data == null? false: Bytes.toString(data).equals(state); + } + + private static byte [] getData(ZooKeeperWatcher zkw, String tableName) + throws KeeperException { + String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName); + return ZKUtil.getData(zkw, znode); + } + + private static String getDataAsString(ZooKeeperWatcher zkw, String tableName) + throws KeeperException { + byte [] data = getData(zkw, tableName); + return data == null? null: Bytes.toString(data); + } + + /** + * Enables the table in zookeeper. Fails silently if the + * table is not currently disabled in zookeeper. Sets no watches. 
+ * @param zkw + * @param tableName + * @throws KeeperException unexpected zookeeper exception + */ + public static void enableTable(ZooKeeperWatcher zkw, String tableName) + throws KeeperException { + ZKUtil.deleteNodeFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode, + tableName)); + } + + /** + * Gets a list of all the tables set as disabled in zookeeper. + * @param zkw + * @return list of disabled tables, empty list if none + * @throws KeeperException + */ + public static List getDisabledTables(ZooKeeperWatcher zkw) + throws KeeperException { + List disabledTables = new ArrayList(); + List children = ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); + for (String child: children) { + if (isDisabledTable(zkw, child)) disabledTables.add(child); + } + return disabledTables; + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -55,12 +55,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; -import 
org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -2537,47 +2536,6 @@ } /** - * Utility method used by HMaster marking regions offlined. - * @param srvr META server to be updated - * @param metaRegionName Meta region name - * @param info HRegion to update in meta - * - * @throws IOException - */ - public static void offlineRegionInMETA(final HRegionInterface srvr, - final byte [] metaRegionName, final HRegionInfo info) - throws IOException { - // Puts and Deletes used to be "atomic" here. We can use row locks if - // we need to keep that property, or we can expand Puts and Deletes to - // allow them to be committed at once. - byte [] row = info.getRegionName(); - Put put = new Put(row); - info.setOffline(true); - put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, - Writables.getBytes(info)); - srvr.put(metaRegionName, put); - cleanRegionInMETA(srvr, metaRegionName, info); - } - - /** - * Clean COL_SERVER and COL_STARTCODE for passed info in - * .META. 
- * @param srvr - * @param metaRegionName - * @param info - * @throws IOException - */ - public static void cleanRegionInMETA(final HRegionInterface srvr, - final byte [] metaRegionName, final HRegionInfo info) - throws IOException { - Delete del = new Delete(info.getRegionName()); - del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - del.deleteColumns(HConstants.CATALOG_FAMILY, - HConstants.STARTCODE_QUALIFIER); - srvr.delete(metaRegionName, del); - } - - /** * Deletes all the files for a HRegion * * @param fs the file system object Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -56,9 +56,9 @@ import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; @@ -67,11 +67,11 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.AsyncCallback; @@ -674,7 +674,7 @@ * @param destination * @param regions Regions to assign. */ - public void assign(final HServerInfo destination, + void assign(final HServerInfo destination, final List regions) { LOG.debug("Bulk assigning " + regions.size() + " region(s) to " + destination.getServerName()); @@ -1104,8 +1104,9 @@ * This is a synchronous call and will return once every region has been * assigned. If anything fails, an exception is thrown and the cluster * should be shutdown. + * @throws InterruptedException */ - public void assignAllUserRegions() throws IOException { + public void assignAllUserRegions() throws IOException, InterruptedException { // First experiment at synchronous assignment // Simpler because just wait for no regions in transition @@ -1120,51 +1121,140 @@ servers.size() + " server(s)"); // Generate a cluster startup region placement plan - Map> bulkPlan = + final Map> bulkPlan = LoadBalancer.bulkAssignment(allRegions, servers); + + // Use fixed count thread pool assigning. + BulkAssigner ba = new BulkAssigner(this.master) { + @Override + public boolean bulkAssign() throws InterruptedException { + // Disable timing out regions in transition up in zk while bulk assigning. + timeoutMonitor.bulkAssign(true); + try { + return super.bulkAssign(); + } finally { + // Reenable timing out regions in transition up in zi. + timeoutMonitor.bulkAssign(false); + } + } - // Make a fixed thread count pool to run bulk assignments. Thought is that - // if a 1k cluster, running 1k bulk concurrent assignment threads will kill - // master, HDFS or ZK? 
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setDaemon(true); - builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d"); - builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override - public void uncaughtException(Thread t, Throwable e) { - // Abort if exception of any kind. - master.abort("Uncaught exception bulk assigning in " + t.getName(), e); + protected String getThreadNamePrefix() { + return super.getThreadNamePrefix() + "-startup"; + } + + @Override + protected void populatePool(java.util.concurrent.ExecutorService pool) { + for (Map.Entry> e: bulkPlan.entrySet()) { + pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue())); + } } - }); - int threadCount = - this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20); - java.util.concurrent.ExecutorService pool = - Executors.newFixedThreadPool(threadCount, builder.build()); - // Disable timing out regions in transition up in zk while bulk assigning. - this.timeoutMonitor.bulkAssign(true); - try { - for (Map.Entry> e: bulkPlan.entrySet()) { - pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue())); + + protected boolean waitUntilDone(final long timeout) + throws InterruptedException { + return waitUntilNoRegionsInTransition(timeout); } - // Wait for no regions to be in transition + }; + ba.bulkAssign(); + LOG.info("Bulk assigning done"); + } + + boolean waitUntilNoRegionsInTransition(final long timeout) + throws InterruptedException { + // Blocks until there are no regions in transition. It is possible that + // there + // are regions in transition immediately after this returns but guarantees + // that if it returns without an exception that there was a period of time + // with no regions in transition from the point-of-view of the in-memory + // state of the Master. 
+ long startTime = System.currentTimeMillis(); + long remaining = timeout; + synchronized (regionsInTransition) { + while (regionsInTransition.size() > 0 && !this.master.isStopped() + && remaining > 0) { + regionsInTransition.wait(remaining); + remaining = timeout - (System.currentTimeMillis() - startTime); + } + } + return regionsInTransition.isEmpty(); + } + + /** + * Base class used bulk assigning and unassigning. + * Encapsulates a fixed size thread pool of executors to run assignment. + * Implement {@link #populatePool(java.util.concurrent.ExecutorService)}. + */ + public static abstract class BulkAssigner { + final Server server; + + /** + * @param server An instance of Server + * @param regionsInTransition A reference to {@link AssignmentManager#regionsInTransition} + */ + public BulkAssigner(final Server server) { + this.server = server; + } + + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-BulkAssigner"; + } + + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + // Abort if exception of any kind. + server.abort("Uncaught exception in " + t.getName(), e); + } + }; + } + + protected int getThreadCount() { + return this.server.getConfiguration(). + getInt("hbase.bulk.assignment.threadpool.size", 20); + } + + protected long getTimeoutOnRIT() { + return this.server.getConfiguration(). + getLong("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000); + } + + protected abstract void populatePool(final java.util.concurrent.ExecutorService pool); + + /** + * Run the bulk assign. + * @throws InterruptedException + * @return True if done. 
+ */ + public boolean bulkAssign() throws InterruptedException { + boolean result = false; + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setDaemon(true); + builder.setNameFormat(getThreadNamePrefix() + "-%1$d"); + builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + int threadCount = getThreadCount(); + java.util.concurrent.ExecutorService pool = + Executors.newFixedThreadPool(threadCount, builder.build()); try { - // How long to wait on empty regions-in-transition. When we timeout, - // we'll put back in place the monitor of R-I-T. It should do fixup - // if server crashed during bulk assign, etc. - long timeout = - this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000); - waitUntilNoRegionsInTransition(timeout); - } catch (InterruptedException e) { - LOG.error("Interrupted waiting for regions to be assigned", e); - throw new IOException(e); + populatePool(pool); + // How long to wait on empty regions-in-transition. If we timeout, the + // RIT monitor should do fixup. + result = waitUntilDone(getTimeoutOnRIT()); + } finally { + // We're done with the pool. It'll exit when its done all in queue. + pool.shutdown(); } - } finally { - // We're done with the pool. It'll exit when its done all in queue. - pool.shutdown(); - // Reenable timing out regions in transition up in zi. - this.timeoutMonitor.bulkAssign(false); + return result; } - LOG.info("Bulk assigning done"); + + /** + * Wait until bulk assign is done. + * @param timeout How long to wait. + * @throws InterruptedException + * @return True if the condition we were waiting on happened. + */ + protected abstract boolean waitUntilDone(final long timeout) + throws InterruptedException; } /** @@ -1281,28 +1371,6 @@ } /** - * Blocks until there are no regions in transition. 
It is possible that there - * are regions in transition immediately after this returns but guarantees - * that if it returns without an exception that there was a period of time - * with no regions in transition from the point-of-view of the in-memory - * state of the Master. - * @param timeout How long to wait on empty regions-in-transition. - * @throws InterruptedException - */ - public void waitUntilNoRegionsInTransition(final long timeout) - throws InterruptedException { - long startTime = System.currentTimeMillis(); - long remaining = timeout; - synchronized (this.regionsInTransition) { - while(this.regionsInTransition.size() > 0 && - !this.master.isStopped() && remaining > 0) { - this.regionsInTransition.wait(remaining); - remaining = timeout - (System.currentTimeMillis() - startTime); - } - } - } - - /** * @return A copy of the Map of regions currently in transition. */ public NavigableMap getRegionsInTransition() { @@ -1365,13 +1433,13 @@ * @return */ public boolean isTableDisabled(String tableName) { - synchronized(disabledTables) { + synchronized (disabledTables) { return disabledTables.contains(tableName); } } /** - * Wait on regions to clean regions-in-transition. + * Wait on region to clear regions-in-transition. * @param hri Region to wait on. * @throws IOException */ @@ -1406,38 +1474,66 @@ * @param tableName table to be disabled */ public void disableTable(String tableName) { - synchronized(disabledTables) { - if(!isTableDisabled(tableName)) { - disabledTables.add(tableName); + synchronized (disabledTables) { + if (!isTableDisabled(tableName)) { try { - ZKTableDisable.disableTable(master.getZooKeeper(), tableName); + ZKTable.disableTable(master.getZooKeeper(), tableName); } catch (KeeperException e) { LOG.warn("ZK error setting table as disabled", e); } + this.disabledTables.add(tableName); } } } /** - * Unsets the specified table from being disabled. - *

- * This operation only acts on the in-memory - * @param tableName table to be undisabled + * Sets the specified table to be disabling. Not the same as + * disabled. + * @see {@link #disableTable(String)} + * @param tableName table to be disabling */ - public void undisableTable(String tableName) { + public void disablingTable(final String tableName) { + try { + ZKTable.disablingTable(this.master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as disabling", e); + } + } + + /** + * Sets the specified table to be disabling. Not the same as + * disabled. + * @see {@link #disableTable(String)} + * @param tableName table to be disabling + */ + public void enablingTable(final String tableName) { synchronized(disabledTables) { - if(isTableDisabled(tableName)) { + if (isTableDisabled(tableName)) { disabledTables.remove(tableName); try { - ZKTableDisable.undisableTable(master.getZooKeeper(), tableName); + ZKTable.enablingTable(master.getZooKeeper(), tableName); } catch (KeeperException e) { - LOG.warn("ZK error setting table as disabled", e); + LOG.warn("ZK error setting table as enabling", e); } } } } /** + * Unsets the specified table from being disabled. + *

+ * This operation only acts on the in-memory + * @param tableName table to be undisabled + */ + public void enableTable(String tableName) { + try { + ZKTable.enableTable(this.master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as enabled", e); + } + } + + /** * Rebuild the set of disabled tables from zookeeper. Used during master * failover. */ @@ -1445,7 +1541,7 @@ synchronized(disabledTables) { List disabledTables; try { - disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper()); + disabledTables = ZKTable.getDisabledTables(master.getZooKeeper()); } catch (KeeperException e) { LOG.warn("ZK error getting list of disabled tables", e); return; @@ -1460,13 +1556,19 @@ /** * Gets the online regions of the specified table. + * This method looks at the in-memory state. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will be included in the returned list. + * So, the returned list may not necessarily be ALL regions in this table, its + * all the ONLINE regions in the table. 
* @param tableName - * @return + * @return Online regions from tableName */ public List getRegionsOfTable(byte[] tableName) { List tableRegions = new ArrayList(); - for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo( - new HTableDescriptor(tableName), null, null)).keySet()) { + HRegionInfo boundary = + new HRegionInfo(new HTableDescriptor(tableName), null, null); + for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) { if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { tableRegions.add(regionInfo); } else { Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -795,13 +795,13 @@ } public void enableTable(final byte [] tableName) throws IOException { - new EnableTableHandler(this, tableName, catalogTracker, assignmentManager) - .process(); + new EnableTableHandler(this, tableName, catalogTracker, + assignmentManager).process(); } public void disableTable(final byte [] tableName) throws IOException { - new DisableTableHandler(this, tableName, catalogTracker, assignmentManager) - .process(); + new DisableTableHandler(this, tableName, catalogTracker, + assignmentManager).process(); } /** Index: src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (working copy) @@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; -import org.apache.zookeeper.KeeperException; 
public class DeleteTableHandler extends TableEventHandler { private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); @@ -73,6 +71,6 @@ // If entry for this table in zk, and up in AssignmentManager, remove it. // Call to undisableTable does this. TODO: Make a more formal purge table. - am.undisableTable(Bytes.toString(tableName)); + am.enableTable(Bytes.toString(tableName)); } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,10 +34,11 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.util.Bytes; - +/** + * Handler to run disable of a table. + */ public class DisableTableHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(DisableTableHandler.class); - private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; @@ -52,7 +54,7 @@ // TODO: do we want to keep this in-memory as well? 
i guess this is // part of old master rewrite, schema to zk to check for table // existence and such - if(!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { + if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { throw new TableNotFoundException(Bytes.toString(tableName)); } } @@ -60,32 +62,79 @@ @Override public void process() { try { - LOG.info("Attemping to disable the table " + this.tableNameStr); + LOG.info("Attemping to disable table " + this.tableNameStr); handleDisableTable(); } catch (IOException e) { - LOG.error("Error trying to disable the table " + this.tableNameStr, e); + LOG.error("Error trying to disable table " + this.tableNameStr, e); } } private void handleDisableTable() throws IOException { if (this.assignmentManager.isTableDisabled(this.tableNameStr)) { - LOG.info("Table " + tableNameStr + " is already disabled; skipping disable"); + LOG.info("Table " + tableNameStr + " already disabled; skipping disable"); return; } - // Set the table as disabled so it doesn't get re-onlined - assignmentManager.disableTable(this.tableNameStr); - // Get the online regions of this table. - // TODO: What if region splitting at the time we get this listing? - // TODO: Remove offline flag from HRI - // TODO: Confirm we have parallel closing going on. - List regions = assignmentManager.getRegionsOfTable(tableName); - // Unassign the online regions - for(HRegionInfo region: regions) { - assignmentManager.unassign(region); + // Set table disabling flag up in zk. + this.assignmentManager.disablingTable(this.tableNameStr); + int online = -1; + do { + // Get list of online regions that are of this table. Regions that are + // already closed will not be included in this list; i.e. the returned + // list is not ALL regions in a table, it's all online regions according to + // the in-memory state on this master.
+ final List regions = + this.assignmentManager.getRegionsOfTable(tableName); + online = regions.size(); + BulkDisabler bd = new BulkDisabler(this.server, regions); + try { + bd.bulkAssign(); + } catch (InterruptedException e) { + LOG.warn("Disable was interrupted"); + // Preserve the interrupt. + Thread.currentThread().interrupt(); + } + } while (online != 0); + // Flip the table to disabled. + this.assignmentManager.disableTable(this.tableNameStr); + } + + /** + * Run bulk disable. + */ + class BulkDisabler extends AssignmentManager.BulkAssigner { + private final List regions; + + BulkDisabler(final Server server, final List regions) { + super(server); + this.regions = regions; } - // Wait on table's regions to clear region in transition. - for (HRegionInfo region: regions) { - this.assignmentManager.waitOnRegionToClearRegionsInTransition(region); + + @Override + protected void populatePool(ExecutorService pool) { + for (HRegionInfo region: regions) { + if (assignmentManager.isRegionInTransition(region) != null) continue; + final HRegionInfo hri = region; + pool.execute(new Runnable() { + public void run() { + assignmentManager.unassign(hri); + } + }); + } } + + @Override + protected boolean waitUntilDone(long timeout) + throws InterruptedException { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + List regions = null; + while (!server.isStopped() && remaining > 0) { + Thread.sleep(1000); + regions = assignmentManager.getRegionsOfTable(tableName); + if (regions.isEmpty()) break; + remaining = timeout - (System.currentTimeMillis() - startTime); + } + return regions != null && regions.isEmpty(); + } } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1032127) +++ 
src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy) @@ -36,7 +36,6 @@ public class EnableTableHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(EnableTableHandler.class); - private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1032127) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -62,7 +62,7 @@ import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; -import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; @@ -484,7 +484,7 @@ return true; } try { - List tables = ZKTableDisable.getDisabledTables(this.zooKeeper); + List tables = ZKTable.getDisabledTables(this.zooKeeper); String searchStr = Bytes.toString(tableName); boolean disabled = tables.contains(searchStr); return online? !disabled: disabled;