Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1031954) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -55,12 +55,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -2537,47 +2536,6 @@ } /** - * Utility method used by HMaster marking regions offlined. - * @param srvr META server to be updated - * @param metaRegionName Meta region name - * @param info HRegion to update in meta - * - * @throws IOException - */ - public static void offlineRegionInMETA(final HRegionInterface srvr, - final byte [] metaRegionName, final HRegionInfo info) - throws IOException { - // Puts and Deletes used to be "atomic" here. We can use row locks if - // we need to keep that property, or we can expand Puts and Deletes to - // allow them to be committed at once. - byte [] row = info.getRegionName(); - Put put = new Put(row); - info.setOffline(true); - put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, - Writables.getBytes(info)); - srvr.put(metaRegionName, put); - cleanRegionInMETA(srvr, metaRegionName, info); - } - - /** - * Clean COL_SERVER and COL_STARTCODE for passed info in - * .META. - * @param srvr - * @param metaRegionName - * @param info - * @throws IOException - */ - public static void cleanRegionInMETA(final HRegionInterface srvr, - final byte [] metaRegionName, final HRegionInfo info) - throws IOException { - Delete del = new Delete(info.getRegionName()); - del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - del.deleteColumns(HConstants.CATALOG_FAMILY, - HConstants.STARTCODE_QUALIFIER); - srvr.delete(metaRegionName, del); - } - - /** * Deletes all the files for a HRegion * * @param fs the file system object Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1031954) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -795,13 +795,13 @@ } public void enableTable(final byte [] tableName) throws IOException { - new EnableTableHandler(this, tableName, catalogTracker, assignmentManager) - .process(); + new EnableTableHandler(this, this, tableName, catalogTracker, + assignmentManager).process(); } public void disableTable(final byte [] tableName) throws IOException { - new DisableTableHandler(this, tableName, catalogTracker, assignmentManager) - .process(); + new DisableTableHandler(this, this, tableName, catalogTracker, + assignmentManager).process(); } /** Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1031954) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.InvocationTargetException; import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; @@ -674,7 +675,7 @@ * @param destination * @param regions Regions to assign. */ - public void assign(final HServerInfo destination, + void assign(final HServerInfo destination, final List regions) { LOG.debug("Bulk assigning " + regions.size() + " region(s) to " + destination.getServerName()); @@ -1168,6 +1169,75 @@ } /** + * A Runnable that works on the passed {@link HRegionInfo} + * @see {@link AssignmentManager#inbulk(String, List, Class)} + */ + public abstract class Bulkable implements Runnable { + @SuppressWarnings("unused") + private final HRegionInfo hri; + + public Bulkable(final HRegionInfo hri) { + this.hri = hri; + } + } + + /** + * Run bulk assign/unassigns. + * @param tag A tag that assigns this bulk job; e.g. 'enable_bulk_assign'. + * @param hris List of regions we're to operate on. + * @param bulkable Implementation of {@link Bulkable} + * @throws InterruptedException + */ + public void inbulk(final String tag, final List hris, + final Class bulkable) + throws InterruptedException { + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setDaemon(true); + builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-" + + tag + "-%1$d"); + builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + // Abort if exception of any kind. + master.abort("Uncaught bulk exception " + t.getName(), e); + } + }); + int threadCount = + this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20); + java.util.concurrent.ExecutorService pool = + Executors.newFixedThreadPool(threadCount, builder.build()); + try { + for (HRegionInfo hri: hris) { + Bulkable b = bulkable.getConstructor(Configuration.class).newInstance(hri); + pool.execute(b); + } + // Wait for no regions to be in transition + + // How long to wait on empty regions-in-transition. When we timeout, + // we'll put back in place the monitor of R-I-T. It should do fixup + // if server crashed during bulk assign, etc. + long timeout = + this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000); + waitUntilNoRegionsInTransition(timeout); + } catch (IllegalArgumentException e) { + this.master.abort("Failed bulk assign", e); + } catch (SecurityException e) { + this.master.abort("Failed bulk assign", e); + } catch (InstantiationException e) { + this.master.abort("Failed bulk assign", e); + } catch (IllegalAccessException e) { + this.master.abort("Failed bulk assign", e); + } catch (InvocationTargetException e) { + this.master.abort("Failed bulk assign", e); + } catch (NoSuchMethodException e) { + this.master.abort("Failed bulk assign", e); + } finally { + // We're done with the pool. It'll exit when its done all in queue. + pool.shutdown(); + } + } + + /** * Manage bulk assigning to a server. */ class SingleServerBulkAssigner implements Runnable { Index: src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (revision 1031954) +++ src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (working copy) @@ -31,20 +31,23 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; public class DisableTableHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(DisableTableHandler.class); - private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final MasterServices services; - public DisableTableHandler(Server server, byte [] tableName, + public DisableTableHandler(Server server, MasterServices services, + byte [] tableName, CatalogTracker catalogTracker, AssignmentManager assignmentManager) throws TableNotFoundException, IOException { super(server, EventType.C_M_DISABLE_TABLE); + this.services = services; this.tableName = tableName; this.tableNameStr = Bytes.toString(this.tableName); this.assignmentManager = assignmentManager; @@ -72,20 +75,28 @@ LOG.info("Table " + tableNameStr + " is already disabled; skipping disable"); return; } - // Set the table as disabled so it doesn't get re-onlined - assignmentManager.disableTable(this.tableNameStr); - // Get the online regions of this table. - // TODO: What if region splitting at the time we get this listing? - // TODO: Remove offline flag from HRI - // TODO: Confirm we have parallel closing going on. - List regions = assignmentManager.getRegionsOfTable(tableName); - // Unassign the online regions - for(HRegionInfo region: regions) { - assignmentManager.unassign(region); - } + // Set table disabling flag up in zk. + this.assignmentManager.disablingTable(this.tableNameStr); + do { + List regions = + this.assignmentManager.getRegionsOfTable(tableName); + int online = regions.size(); + for (HRegionInfo region: regions) { + if (region.isOffline()) { + online--; + continue; + } + if (this.assignmentManager.isRegionInTransition(region) != null) continue; + this.services.getExecutorService(); + this.server. + this.assignmentManager.unassign(region); + } // Wait on table's regions to clear region in transition. for (HRegionInfo region: regions) { this.assignmentManager.waitOnRegionToClearRegionsInTransition(region); - } + } while (online != 0); + // Flip the table to disabled. + this.assignmentManager.move from diabling + this.assignmentManager.disableTable(this.tableNameStr); } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1031954) +++ src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy) @@ -31,21 +31,24 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; public class EnableTableHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(EnableTableHandler.class); - + private final MasterServices services; private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; private final CatalogTracker ct; - public EnableTableHandler(Server server, byte [] tableName, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) + public EnableTableHandler(Server server, final MasterServices services, + byte [] tableName, CatalogTracker catalogTracker, + AssignmentManager assignmentManager) throws TableNotFoundException, IOException { super(server, EventType.C_M_ENABLE_TABLE); + this.services = services; this.tableName = tableName; this.tableNameStr = Bytes.toString(tableName); this.ct = catalogTracker;