Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTable.java (revision 0)
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hbase.zookeeper;
+
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestZKTable {
+ private static final Log LOG = LogFactory.getLog(TestZKTable.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @Test
+ public void testTableStates()
+ throws ZooKeeperConnectionException, IOException, KeeperException {
+ final String name = "testDisabled";
+ Abortable abortable = new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.info(why, e);
+ }
+ };
+ ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+ name, abortable);
+ assertTrue(ZKTable.isEnabledTable(zkw, name));
+ assertFalse(ZKTable.isDisabingTable(zkw, name));
+ assertFalse(ZKTable.isDisabledTable(zkw, name));
+ assertFalse(ZKTable.isEnablingTable(zkw, name));
+ assertFalse(ZKTable.isDisablingOrDisabledTable(zkw, name));
+ assertFalse(ZKTable.isDisabledOrEnablingTable(zkw, name));
+ ZKTable.disablingTable(zkw, name);
+ assertTrue(ZKTable.isDisabingTable(zkw, name));
+ assertTrue(ZKTable.isDisablingOrDisabledTable(zkw, name));
+ assertFalse(ZKTable.getDisabledTables(zkw).contains(name));
+ ZKTable.disableTable(zkw, name);
+ assertTrue(ZKTable.isDisabledTable(zkw, name));
+ assertTrue(ZKTable.isDisablingOrDisabledTable(zkw, name));
+ assertFalse(ZKTable.isDisabingTable(zkw, name));
+ assertTrue(ZKTable.getDisabledTables(zkw).contains(name));
+ ZKTable.enablingTable(zkw, name);
+ assertTrue(ZKTable.isEnablingTable(zkw, name));
+ assertTrue(ZKTable.isDisabledOrEnablingTable(zkw, name));
+ assertFalse(ZKTable.isDisabledTable(zkw, name));
+ assertFalse(ZKTable.getDisabledTables(zkw).contains(name));
+ ZKTable.enableTable(zkw, name);
+ assertTrue(ZKTable.isEnabledTable(zkw, name));
+ assertFalse(ZKTable.isEnablingTable(zkw, name));
+ }
+}
\ No newline at end of file
Index: src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (revision 1032127)
+++ src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (working copy)
@@ -48,7 +48,7 @@
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
@@ -324,7 +324,7 @@
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
- ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));
+ ZKTable.disableTable(zkw, Bytes.toString(disabledTable));
/*
* ZK = OFFLINE
@@ -652,7 +652,7 @@
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
- ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));
+ ZKTable.disableTable(zkw, Bytes.toString(disabledTable));
/*
* ZK = CLOSING
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableDisable.java (working copy)
@@ -1,70 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.List;
-
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Helper class for table disable tracking in zookeeper.
- *
- * The node /disabled will contain a child node for every table which should be
- * disabled, for example, /disabled/table.
- */
-public class ZKTableDisable {
-
- /**
- * Sets the specified table as disabled in zookeeper. Fails silently if the
- * table is already disabled in zookeeper. Sets no watches.
- * @param zkw
- * @param tableName
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void disableTable(ZooKeeperWatcher zkw, String tableName)
- throws KeeperException {
- ZKUtil.createAndFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode,
- tableName));
- }
-
- /**
- * Unsets the specified table as disabled in zookeeper. Fails silently if the
- * table is not currently disabled in zookeeper. Sets no watches.
- * @param zkw
- * @param tableName
- * @throws KeeperException unexpected zookeeper exception
- */
- public static void undisableTable(ZooKeeperWatcher zkw, String tableName)
- throws KeeperException {
- ZKUtil.deleteNodeFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode,
- tableName));
- }
-
- /**
- * Gets a list of all the tables set as disabled in zookeeper.
- * @param zkw
- * @return list of disabled tables, empty list if none
- * @throws KeeperException
- */
- public static List getDisabledTables(ZooKeeperWatcher zkw)
- throws KeeperException {
- return ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
- }
-}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy)
@@ -713,6 +713,22 @@
}
/**
+ * Set data into node creating node if it doesn't yet exist.
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data data to set for node
+ * @throws KeeperException
+ */
+ public static void createSetData(final ZooKeeperWatcher zkw, final String znode,
+ final byte [] data)
+ throws KeeperException {
+ if (checkExists(zkw, znode) == -1) {
+ ZKUtil.createWithParents(zkw, znode);
+ }
+ ZKUtil.setData(zkw, znode, data);
+ }
+
+ /**
* Sets the data of the existing znode to be the specified data. The node
* must exist but no checks are done on the existing data or version.
*
@@ -902,8 +918,7 @@
* @param znode path of node
* @throws KeeperException if unexpected zookeeper exception
*/
- public static void createWithParents(ZooKeeperWatcher zkw,
- String znode)
+ public static void createWithParents(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
if(znode == null) {
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (revision 0)
@@ -0,0 +1,198 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Helper class for table disable tracking.
+ *
+ * A znode will exist under the table directory if it is in any of the following
+ * states: ENABLING, DISABLING, or DISABLED. If ENABLED, there will be no
+ * entry for a table in zk.
+ */
+public class ZKTable {
+ private static final Log LOG = LogFactory.getLog(ZKTable.class);
+ // TODO: Make it so always a table znode. Put table schema here as well as table state.
+ // Have watcher on table znode so all are notified of state or schema change.
+ private static final String DISABLED = "disabled";
+ private static final String DISABLING = "disabling";
+ private static final String ENABLING = "enabling";
+
+ /**
+ * Sets the specified table as DISABLED in zookeeper. Fails silently if the
+ * table is already disabled in zookeeper. Sets no watches.
+ * @param zkw
+ * @param tableName
+ * @throws KeeperException unexpected zookeeper exception
+ */
+ public static void disableTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ if (!isDisablingOrDisabledTable(zkw, tableName)) {
+ LOG.warn("Moving table " + tableName + " state to disabled but was not " +
+ "first in disabling state: " + getDataAsString(zkw, tableName));
+ }
+ setTableState(zkw, tableName, DISABLED);
+ }
+
+ /**
+ * Sets the specified table as DISABLING in zookeeper. Fails silently if the
+ * table is already disabling in zookeeper. Sets no watches.
+ * @param zkw
+ * @param tableName
+ * @throws KeeperException unexpected zookeeper exception
+ */
+ public static void disablingTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ if (!isEnabledOrDisablingTable(zkw, tableName)) {
+ LOG.warn("Moving table " + tableName + " state to disabling but was not " +
+ "first in enabled state: " + getDataAsString(zkw, tableName));
+ }
+ setTableState(zkw, tableName, DISABLING);
+ }
+
+ /**
+ * Sets the specified table as ENABLING in zookeeper. Fails silently if the
+ * table is already enabling in zookeeper. Sets no watches.
+ * @param zkw
+ * @param tableName
+ * @throws KeeperException unexpected zookeeper exception
+ */
+ public static void enablingTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ if (!isDisabledOrEnablingTable(zkw, tableName)) {
+ LOG.warn("Moving table " + tableName + " state to enabling but was not " +
+ "first in disabled state: " + getDataAsString(zkw, tableName));
+ }
+ setTableState(zkw, tableName, ENABLING);
+ }
+
+ private static void setTableState(ZooKeeperWatcher zkw, String tableName,
+ final String state)
+ throws KeeperException {
+ String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName);
+ if (ZKUtil.checkExists(zkw, znode) == -1) {
+ ZKUtil.createAndFailSilent(zkw, znode);
+ }
+ ZKUtil.setData(zkw, znode, Bytes.toBytes(state));
+ }
+
+ public static boolean isDisabledTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ return isTableState(zkw, tableName, DISABLED);
+ }
+
+ public static boolean isDisabingTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ return isTableState(zkw, tableName, DISABLING);
+ }
+
+ public static boolean isEnablingTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ return isTableState(zkw, tableName, ENABLING);
+ }
+
+ public static boolean isEnabledTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName);
+ return ZKUtil.checkExists(zkw, znode) == -1;
+ }
+
+ public static boolean isDisablingOrDisabledTable(ZooKeeperWatcher zkw,
+ final String tableName)
+ throws KeeperException {
+ byte [] data = getData(zkw, tableName);
+ if (data == null) return false;
+ String state = new String(data);
+ return state.equals(DISABLING) || state.equals(DISABLED);
+ }
+
+ public static boolean isEnabledOrDisablingTable(ZooKeeperWatcher zkw,
+ final String tableName)
+ throws KeeperException {
+ byte [] data = getData(zkw, tableName);
+ if (data == null) return true; // This is 'enabled'
+ String state = new String(data);
+ return state.equals(DISABLING);
+ }
+
+ public static boolean isDisabledOrEnablingTable(ZooKeeperWatcher zkw,
+ final String tableName)
+ throws KeeperException {
+ byte [] data = getData(zkw, tableName);
+ if (data == null) return false;
+ String state = new String(data);
+ return state.equals(ENABLING) || state.equals(DISABLED);
+ }
+
+ private static boolean isTableState(ZooKeeperWatcher zkw, String tableName,
+ final String state)
+ throws KeeperException {
+ byte [] data = getData(zkw, tableName);
+ return data == null? false: Bytes.toString(data).equals(state);
+ }
+
+ private static byte [] getData(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName);
+ return ZKUtil.getData(zkw, znode);
+ }
+
+ private static String getDataAsString(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ byte [] data = getData(zkw, tableName);
+ return data == null? null: Bytes.toString(data);
+ }
+
+ /**
+ * Enables the table in zookeeper. Fails silently if the
+ * table is not currently disabled in zookeeper. Sets no watches.
+ * @param zkw
+ * @param tableName
+ * @throws KeeperException unexpected zookeeper exception
+ */
+ public static void enableTable(ZooKeeperWatcher zkw, String tableName)
+ throws KeeperException {
+ ZKUtil.deleteNodeFailSilent(zkw, ZKUtil.joinZNode(zkw.tableZNode,
+ tableName));
+ }
+
+ /**
+ * Gets a list of all the tables set as disabled in zookeeper.
+ * @param zkw
+ * @return list of disabled tables, empty list if none
+ * @throws KeeperException
+ */
+ public static List<String> getDisabledTables(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ List<String> disabledTables = new ArrayList<String>();
+ List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+ for (String child: children) {
+ if (isDisabledTable(zkw, child)) disabledTables.add(child);
+ }
+ return disabledTables;
+ }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -55,12 +55,12 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -73,7 +73,6 @@
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -2537,47 +2536,6 @@
}
/**
- * Utility method used by HMaster marking regions offlined.
- * @param srvr META server to be updated
- * @param metaRegionName Meta region name
- * @param info HRegion to update in meta
- *
- * @throws IOException
- */
- public static void offlineRegionInMETA(final HRegionInterface srvr,
- final byte [] metaRegionName, final HRegionInfo info)
- throws IOException {
- // Puts and Deletes used to be "atomic" here. We can use row locks if
- // we need to keep that property, or we can expand Puts and Deletes to
- // allow them to be committed at once.
- byte [] row = info.getRegionName();
- Put put = new Put(row);
- info.setOffline(true);
- put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(info));
- srvr.put(metaRegionName, put);
- cleanRegionInMETA(srvr, metaRegionName, info);
- }
-
- /**
- * Clean COL_SERVER and COL_STARTCODE for passed info in
- * .META.
- * @param srvr
- * @param metaRegionName
- * @param info
- * @throws IOException
- */
- public static void cleanRegionInMETA(final HRegionInterface srvr,
- final byte [] metaRegionName, final HRegionInfo info)
- throws IOException {
- Delete del = new Delete(info.getRegionName());
- del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
- del.deleteColumns(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER);
- srvr.delete(metaRegionName, del);
- }
-
- /**
* Deletes all the files for a HRegion
*
* @param fs the file system object
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -56,9 +56,9 @@
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
@@ -67,11 +67,11 @@
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
@@ -674,7 +674,7 @@
* @param destination
* @param regions Regions to assign.
*/
- public void assign(final HServerInfo destination,
+ void assign(final HServerInfo destination,
final List regions) {
LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
destination.getServerName());
@@ -1104,8 +1104,9 @@
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown and the cluster
* should be shutdown.
+ * @throws InterruptedException
*/
- public void assignAllUserRegions() throws IOException {
+ public void assignAllUserRegions() throws IOException, InterruptedException {
// First experiment at synchronous assignment
// Simpler because just wait for no regions in transition
@@ -1120,51 +1121,140 @@
servers.size() + " server(s)");
// Generate a cluster startup region placement plan
- Map> bulkPlan =
+ final Map> bulkPlan =
LoadBalancer.bulkAssignment(allRegions, servers);
+
+ // Use fixed count thread pool assigning.
+ BulkAssigner ba = new BulkAssigner(this.master) {
+ @Override
+ public boolean bulkAssign() throws InterruptedException {
+ // Disable timing out regions in transition up in zk while bulk assigning.
+ timeoutMonitor.bulkAssign(true);
+ try {
+ return super.bulkAssign();
+ } finally {
+ // Reenable timing out regions in transition up in zk.
+ timeoutMonitor.bulkAssign(false);
+ }
+ }
- // Make a fixed thread count pool to run bulk assignments. Thought is that
- // if a 1k cluster, running 1k bulk concurrent assignment threads will kill
- // master, HDFS or ZK?
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setDaemon(true);
- builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-%1$d");
- builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
- public void uncaughtException(Thread t, Throwable e) {
- // Abort if exception of any kind.
- master.abort("Uncaught exception bulk assigning in " + t.getName(), e);
+ protected String getThreadNamePrefix() {
+ return super.getThreadNamePrefix() + "-startup";
+ }
+
+ @Override
+ protected void populatePool(java.util.concurrent.ExecutorService pool) {
+ for (Map.Entry> e: bulkPlan.entrySet()) {
+ pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
+ }
}
- });
- int threadCount =
- this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20);
- java.util.concurrent.ExecutorService pool =
- Executors.newFixedThreadPool(threadCount, builder.build());
- // Disable timing out regions in transition up in zk while bulk assigning.
- this.timeoutMonitor.bulkAssign(true);
- try {
- for (Map.Entry> e: bulkPlan.entrySet()) {
- pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue()));
+
+ protected boolean waitUntilDone(final long timeout)
+ throws InterruptedException {
+ return waitUntilNoRegionsInTransition(timeout);
}
- // Wait for no regions to be in transition
+ };
+ ba.bulkAssign();
+ LOG.info("Bulk assigning done");
+ }
+
+ boolean waitUntilNoRegionsInTransition(final long timeout)
+ throws InterruptedException {
+ // Blocks until there are no regions in transition. It is possible
+ // that there are regions in transition immediately after this
+ // returns but it guarantees that if it returns without an
+ // exception then there was a period of time with no regions in
+ // transition from the point-of-view of the in-memory state of
+ // the Master.
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+ synchronized (regionsInTransition) {
+ while (regionsInTransition.size() > 0 && !this.master.isStopped()
+ && remaining > 0) {
+ regionsInTransition.wait(remaining);
+ remaining = timeout - (System.currentTimeMillis() - startTime);
+ }
+ }
+ return regionsInTransition.isEmpty();
+ }
+
+ /**
+ * Base class used bulk assigning and unassigning.
+ * Encapsulates a fixed size thread pool of executors to run assignment.
+ * Implement {@link #populatePool(java.util.concurrent.ExecutorService)}.
+ */
+ public static abstract class BulkAssigner {
+ final Server server;
+
+ /**
+ * @param server An instance of Server
+ * @param regionsInTransition A reference to {@link AssignmentManager#regionsInTransition}
+ */
+ public BulkAssigner(final Server server) {
+ this.server = server;
+ }
+
+ protected String getThreadNamePrefix() {
+ return this.server.getServerName() + "-BulkAssigner";
+ }
+
+ protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ return new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Abort if exception of any kind.
+ server.abort("Uncaught exception in " + t.getName(), e);
+ }
+ };
+ }
+
+ protected int getThreadCount() {
+ return this.server.getConfiguration().
+ getInt("hbase.bulk.assignment.threadpool.size", 20);
+ }
+
+ protected long getTimeoutOnRIT() {
+ return this.server.getConfiguration().
+ getLong("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000);
+ }
+
+ protected abstract void populatePool(final java.util.concurrent.ExecutorService pool);
+
+ /**
+ * Run the bulk assign.
+ * @throws InterruptedException
+ * @return True if done.
+ */
+ public boolean bulkAssign() throws InterruptedException {
+ boolean result = false;
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setDaemon(true);
+ builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
+ builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
+ int threadCount = getThreadCount();
+ java.util.concurrent.ExecutorService pool =
+ Executors.newFixedThreadPool(threadCount, builder.build());
try {
- // How long to wait on empty regions-in-transition. When we timeout,
- // we'll put back in place the monitor of R-I-T. It should do fixup
- // if server crashed during bulk assign, etc.
- long timeout =
- this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000);
- waitUntilNoRegionsInTransition(timeout);
- } catch (InterruptedException e) {
- LOG.error("Interrupted waiting for regions to be assigned", e);
- throw new IOException(e);
+ populatePool(pool);
+ // How long to wait on empty regions-in-transition. If we timeout, the
+ // RIT monitor should do fixup.
+ result = waitUntilDone(getTimeoutOnRIT());
+ } finally {
+ // We're done with the pool. It'll exit when its done all in queue.
+ pool.shutdown();
}
- } finally {
- // We're done with the pool. It'll exit when its done all in queue.
- pool.shutdown();
- // Reenable timing out regions in transition up in zi.
- this.timeoutMonitor.bulkAssign(false);
+ return result;
}
- LOG.info("Bulk assigning done");
+
+ /**
+ * Wait until bulk assign is done.
+ * @param timeout How long to wait.
+ * @throws InterruptedException
+ * @return True if the condition we were waiting on happened.
+ */
+ protected abstract boolean waitUntilDone(final long timeout)
+ throws InterruptedException;
}
/**
@@ -1281,28 +1371,6 @@
}
/**
- * Blocks until there are no regions in transition. It is possible that there
- * are regions in transition immediately after this returns but guarantees
- * that if it returns without an exception that there was a period of time
- * with no regions in transition from the point-of-view of the in-memory
- * state of the Master.
- * @param timeout How long to wait on empty regions-in-transition.
- * @throws InterruptedException
- */
- public void waitUntilNoRegionsInTransition(final long timeout)
- throws InterruptedException {
- long startTime = System.currentTimeMillis();
- long remaining = timeout;
- synchronized (this.regionsInTransition) {
- while(this.regionsInTransition.size() > 0 &&
- !this.master.isStopped() && remaining > 0) {
- this.regionsInTransition.wait(remaining);
- remaining = timeout - (System.currentTimeMillis() - startTime);
- }
- }
- }
-
- /**
* @return A copy of the Map of regions currently in transition.
*/
public NavigableMap getRegionsInTransition() {
@@ -1365,13 +1433,13 @@
* @return
*/
public boolean isTableDisabled(String tableName) {
- synchronized(disabledTables) {
+ synchronized (disabledTables) {
return disabledTables.contains(tableName);
}
}
/**
- * Wait on regions to clean regions-in-transition.
+ * Wait on region to clear regions-in-transition.
* @param hri Region to wait on.
* @throws IOException
*/
@@ -1406,38 +1474,66 @@
* @param tableName table to be disabled
*/
public void disableTable(String tableName) {
- synchronized(disabledTables) {
- if(!isTableDisabled(tableName)) {
- disabledTables.add(tableName);
+ synchronized (disabledTables) {
+ if (!isTableDisabled(tableName)) {
try {
- ZKTableDisable.disableTable(master.getZooKeeper(), tableName);
+ ZKTable.disableTable(master.getZooKeeper(), tableName);
} catch (KeeperException e) {
LOG.warn("ZK error setting table as disabled", e);
}
+ this.disabledTables.add(tableName);
}
}
}
/**
- * Unsets the specified table from being disabled.
- *
- * This operation only acts on the in-memory
- * @param tableName table to be undisabled
+ * Sets the specified table to be disabling. Not the same as
+ * disabled.
+ * @see {@link #disableTable(String)}
+ * @param tableName table to be disabling
*/
- public void undisableTable(String tableName) {
+ public void disablingTable(final String tableName) {
+ try {
+ ZKTable.disablingTable(this.master.getZooKeeper(), tableName);
+ } catch (KeeperException e) {
+ LOG.warn("ZK error setting table as disabling", e);
+ }
+ }
+
+ /**
+ * Sets the specified table to be enabling. Not the same as
+ * enabled.
+ * @see {@link #enableTable(String)}
+ * @param tableName table to be enabling
+ */
+ public void enablingTable(final String tableName) {
synchronized(disabledTables) {
- if(isTableDisabled(tableName)) {
+ if (isTableDisabled(tableName)) {
disabledTables.remove(tableName);
try {
- ZKTableDisable.undisableTable(master.getZooKeeper(), tableName);
+ ZKTable.enablingTable(master.getZooKeeper(), tableName);
} catch (KeeperException e) {
- LOG.warn("ZK error setting table as disabled", e);
+ LOG.warn("ZK error setting table as enabling", e);
}
}
}
}
/**
+ * Unsets the specified table from being disabled.
+ *
+ * This operation only acts on the in-memory
+ * @param tableName table to be undisabled
+ */
+ public void enableTable(String tableName) {
+ try {
+ ZKTable.enableTable(this.master.getZooKeeper(), tableName);
+ } catch (KeeperException e) {
+ LOG.warn("ZK error setting table as enabled", e);
+ }
+ }
+
+ /**
* Rebuild the set of disabled tables from zookeeper. Used during master
* failover.
*/
@@ -1445,7 +1541,7 @@
synchronized(disabledTables) {
List disabledTables;
try {
- disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper());
+ disabledTables = ZKTable.getDisabledTables(master.getZooKeeper());
} catch (KeeperException e) {
LOG.warn("ZK error getting list of disabled tables", e);
return;
@@ -1460,13 +1556,19 @@
/**
* Gets the online regions of the specified table.
+ * This method looks at the in-memory state. It does not go to .META..
+ * Only returns online regions. If a region on this table has been
+ * closed during a disable, etc., it will be included in the returned list.
+ * So, the returned list may not necessarily be ALL regions in this table, its
+ * all the ONLINE regions in the table.
* @param tableName
- * @return
+ * @return Online regions from tableName
*/
public List getRegionsOfTable(byte[] tableName) {
List tableRegions = new ArrayList();
- for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo(
- new HTableDescriptor(tableName), null, null)).keySet()) {
+ HRegionInfo boundary =
+ new HRegionInfo(new HTableDescriptor(tableName), null, null);
+ for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
tableRegions.add(regionInfo);
} else {
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -795,13 +795,13 @@
}
public void enableTable(final byte [] tableName) throws IOException {
- new EnableTableHandler(this, tableName, catalogTracker, assignmentManager)
- .process();
+ new EnableTableHandler(this, tableName, catalogTracker,
+ assignmentManager).process();
}
public void disableTable(final byte [] tableName) throws IOException {
- new DisableTableHandler(this, tableName, catalogTracker, assignmentManager)
- .process();
+ new DisableTableHandler(this, tableName, catalogTracker,
+ assignmentManager).process();
}
/**
Index: src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (working copy)
@@ -31,8 +31,6 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
-import org.apache.zookeeper.KeeperException;
public class DeleteTableHandler extends TableEventHandler {
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
@@ -73,6 +71,6 @@
// If entry for this table in zk, and up in AssignmentManager, remove it.
// Call to undisableTable does this. TODO: Make a more formal purge table.
- am.undisableTable(Bytes.toString(tableName));
+ am.enableTable(Bytes.toString(tableName));
}
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (working copy)
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,10 +34,11 @@
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.util.Bytes;
-
+/**
+ * Handler to run disable of a table.
+ */
public class DisableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(DisableTableHandler.class);
-
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
@@ -52,7 +54,7 @@
// TODO: do we want to keep this in-memory as well? i guess this is
// part of old master rewrite, schema to zk to check for table
// existence and such
- if(!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
+ if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
}
@@ -60,32 +62,79 @@
@Override
public void process() {
try {
- LOG.info("Attemping to disable the table " + this.tableNameStr);
+ LOG.info("Attemping to disable table " + this.tableNameStr);
handleDisableTable();
} catch (IOException e) {
- LOG.error("Error trying to disable the table " + this.tableNameStr, e);
+ LOG.error("Error trying to disable table " + this.tableNameStr, e);
}
}
private void handleDisableTable() throws IOException {
if (this.assignmentManager.isTableDisabled(this.tableNameStr)) {
- LOG.info("Table " + tableNameStr + " is already disabled; skipping disable");
+ LOG.info("Table " + tableNameStr + " already disabled; skipping disable");
return;
}
- // Set the table as disabled so it doesn't get re-onlined
- assignmentManager.disableTable(this.tableNameStr);
- // Get the online regions of this table.
- // TODO: What if region splitting at the time we get this listing?
- // TODO: Remove offline flag from HRI
- // TODO: Confirm we have parallel closing going on.
- List regions = assignmentManager.getRegionsOfTable(tableName);
- // Unassign the online regions
- for(HRegionInfo region: regions) {
- assignmentManager.unassign(region);
+ // Set table disabling flag up in zk.
+ this.assignmentManager.disablingTable(this.tableNameStr);
+ int online = -1;
+ do {
+ // Get list of online regions that are of this table. Regions that are
+ // already closed will not be included in this list; i.e. the returned
+ // list is not ALL regions in a table, its all online regions according to
+ // the in-memory state on this master.
+ final List regions =
+ this.assignmentManager.getRegionsOfTable(tableName);
+ online = regions.size();
+ BulkDisabler bd = new BulkDisabler(this.server, regions);
+ try {
+ bd.bulkAssign();
+ } catch (InterruptedException e) {
+ LOG.warn("Disable was interrupted");
+ // Preserve the interrupt.
+ Thread.currentThread().interrupt();
+ }
+ } while (online != 0);
+ // Flip the table to disabled.
+ this.assignmentManager.disableTable(this.tableNameStr);
+ }
+
+ /**
+ * Run bulk disable.
+ */
+ class BulkDisabler extends AssignmentManager.BulkAssigner {
+ private final List regions;
+
+ BulkDisabler(final Server server, final List regions) {
+ super(server);
+ this.regions = regions;
}
- // Wait on table's regions to clear region in transition.
- for (HRegionInfo region: regions) {
- this.assignmentManager.waitOnRegionToClearRegionsInTransition(region);
+
+ @Override
+ protected void populatePool(ExecutorService pool) {
+ for (HRegionInfo region: regions) {
+ if (assignmentManager.isRegionInTransition(region) != null) continue;
+ final HRegionInfo hri = region;
+ pool.execute(new Runnable() {
+ public void run() {
+ assignmentManager.unassign(hri);
+ }
+ });
+ }
}
+
+ @Override
+ protected boolean waitUntilDone(long timeout)
+ throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+ List regions = null;
+ while (!server.isStopped() && remaining > 0) {
+ Thread.sleep(1000);
+ regions = assignmentManager.getRegionsOfTable(tableName);
+ if (regions.isEmpty()) break;
+ remaining = timeout - (System.currentTimeMillis() - startTime);
+ }
+ return regions != null && regions.isEmpty();
+ }
}
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy)
@@ -36,7 +36,6 @@
public class EnableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
-
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1032127)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -62,7 +62,7 @@
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -484,7 +484,7 @@
return true;
}
try {
- List tables = ZKTableDisable.getDisabledTables(this.zooKeeper);
+ List tables = ZKTable.getDisabledTables(this.zooKeeper);
String searchStr = Bytes.toString(tableName);
boolean disabled = tables.contains(searchStr);
return online? !disabled: disabled;