Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1031954)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -55,12 +55,12 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -73,7 +73,6 @@
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -2537,47 +2536,6 @@
}
/**
- * Utility method used by HMaster marking regions offlined.
- * @param srvr META server to be updated
- * @param metaRegionName Meta region name
- * @param info HRegion to update in meta
- *
- * @throws IOException
- */
- public static void offlineRegionInMETA(final HRegionInterface srvr,
- final byte [] metaRegionName, final HRegionInfo info)
- throws IOException {
- // Puts and Deletes used to be "atomic" here. We can use row locks if
- // we need to keep that property, or we can expand Puts and Deletes to
- // allow them to be committed at once.
- byte [] row = info.getRegionName();
- Put put = new Put(row);
- info.setOffline(true);
- put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
- Writables.getBytes(info));
- srvr.put(metaRegionName, put);
- cleanRegionInMETA(srvr, metaRegionName, info);
- }
-
- /**
- * Clean COL_SERVER and COL_STARTCODE for passed info in
- * .META.
- * @param srvr
- * @param metaRegionName
- * @param info
- * @throws IOException
- */
- public static void cleanRegionInMETA(final HRegionInterface srvr,
- final byte [] metaRegionName, final HRegionInfo info)
- throws IOException {
- Delete del = new Delete(info.getRegionName());
- del.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
- del.deleteColumns(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER);
- srvr.delete(metaRegionName, del);
- }
-
- /**
* Deletes all the files for a HRegion
*
* @param fs the file system object
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1031954)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -795,13 +795,13 @@
}
public void enableTable(final byte [] tableName) throws IOException {
- new EnableTableHandler(this, tableName, catalogTracker, assignmentManager)
- .process();
+ new EnableTableHandler(this, this, tableName, catalogTracker,
+ assignmentManager).process();
}
public void disableTable(final byte [] tableName) throws IOException {
- new DisableTableHandler(this, tableName, catalogTracker, assignmentManager)
- .process();
+ new DisableTableHandler(this, this, tableName, catalogTracker,
+ assignmentManager).process();
}
/**
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1031954)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -23,6 +23,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
@@ -674,7 +675,7 @@
* @param destination
* @param regions Regions to assign.
*/
- public void assign(final HServerInfo destination,
+ void assign(final HServerInfo destination,
     final List<HRegionInfo> regions) {
LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
destination.getServerName());
@@ -1168,6 +1169,75 @@
}
/**
+ * A Runnable that works on the passed {@link HRegionInfo}
+   * @see AssignmentManager#inbulk(String, List, Class)
+ */
+ public abstract class Bulkable implements Runnable {
+ @SuppressWarnings("unused")
+ private final HRegionInfo hri;
+
+ public Bulkable(final HRegionInfo hri) {
+ this.hri = hri;
+ }
+ }
+
+ /**
+ * Run bulk assign/unassigns.
+ * @param tag A tag that assigns this bulk job; e.g. 'enable_bulk_assign'.
+ * @param hris List of regions we're to operate on.
+ * @param bulkable Implementation of {@link Bulkable}
+ * @throws InterruptedException
+ */
+  public void inbulk(final String tag, final List<HRegionInfo> hris,
+      final Class<? extends Bulkable> bulkable)
+ throws InterruptedException {
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setDaemon(true);
+ builder.setNameFormat(this.master.getServerName() + "-BulkAssigner-" +
+ tag + "-%1$d");
+ builder.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Abort if exception of any kind.
+ master.abort("Uncaught bulk exception " + t.getName(), e);
+ }
+ });
+ int threadCount =
+ this.master.getConfiguration().getInt("hbase.bulk.assignment.threadpool.size", 20);
+ java.util.concurrent.ExecutorService pool =
+ Executors.newFixedThreadPool(threadCount, builder.build());
+ try {
+ for (HRegionInfo hri: hris) {
+        Bulkable b = bulkable.getConstructor(HRegionInfo.class).newInstance(hri);
+ pool.execute(b);
+ }
+ // Wait for no regions to be in transition
+
+ // How long to wait on empty regions-in-transition. When we timeout,
+ // we'll put back in place the monitor of R-I-T. It should do fixup
+ // if server crashed during bulk assign, etc.
+ long timeout =
+ this.master.getConfiguration().getInt("hbase.bulk.assignment.waiton.empty.rit", 10 * 60 * 1000);
+ waitUntilNoRegionsInTransition(timeout);
+ } catch (IllegalArgumentException e) {
+ this.master.abort("Failed bulk assign", e);
+ } catch (SecurityException e) {
+ this.master.abort("Failed bulk assign", e);
+ } catch (InstantiationException e) {
+ this.master.abort("Failed bulk assign", e);
+ } catch (IllegalAccessException e) {
+ this.master.abort("Failed bulk assign", e);
+ } catch (InvocationTargetException e) {
+ this.master.abort("Failed bulk assign", e);
+ } catch (NoSuchMethodException e) {
+ this.master.abort("Failed bulk assign", e);
+ } finally {
+ // We're done with the pool. It'll exit when its done all in queue.
+ pool.shutdown();
+ }
+ }
+
+ /**
* Manage bulk assigning to a server.
*/
class SingleServerBulkAssigner implements Runnable {
Index: src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (revision 1031954)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (working copy)
@@ -31,20 +31,23 @@
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
public class DisableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(DisableTableHandler.class);
-
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
+ private final MasterServices services;
- public DisableTableHandler(Server server, byte [] tableName,
+ public DisableTableHandler(Server server, MasterServices services,
+ byte [] tableName,
CatalogTracker catalogTracker, AssignmentManager assignmentManager)
throws TableNotFoundException, IOException {
super(server, EventType.C_M_DISABLE_TABLE);
+ this.services = services;
this.tableName = tableName;
this.tableNameStr = Bytes.toString(this.tableName);
this.assignmentManager = assignmentManager;
@@ -72,20 +75,28 @@
LOG.info("Table " + tableNameStr + " is already disabled; skipping disable");
return;
}
- // Set the table as disabled so it doesn't get re-onlined
- assignmentManager.disableTable(this.tableNameStr);
- // Get the online regions of this table.
- // TODO: What if region splitting at the time we get this listing?
- // TODO: Remove offline flag from HRI
- // TODO: Confirm we have parallel closing going on.
-    List<HRegionInfo> regions = assignmentManager.getRegionsOfTable(tableName);
- // Unassign the online regions
- for(HRegionInfo region: regions) {
- assignmentManager.unassign(region);
- }
+ // Set table disabling flag up in zk.
+ this.assignmentManager.disablingTable(this.tableNameStr);
+    int online;
+    do {
+      List<HRegionInfo> regions =
+        this.assignmentManager.getRegionsOfTable(tableName);
+      online = regions.size();
+ for (HRegionInfo region: regions) {
+ if (region.isOffline()) {
+ online--;
+ continue;
+ }
+ if (this.assignmentManager.isRegionInTransition(region) != null) continue;
+ this.services.getExecutorService();
+        // TODO(review): dangling "this.server." fragment removed — unfinished edit; confirm intended call.
+ this.assignmentManager.unassign(region);
+ }
// Wait on table's regions to clear region in transition.
for (HRegionInfo region: regions) {
this.assignmentManager.waitOnRegionToClearRegionsInTransition(region);
     }
+    } while (online != 0);
+ // Flip the table to disabled.
+    // TODO(review): unfinished edit — move table state from "disabling" to "disabled" in zk.
+ this.assignmentManager.disableTable(this.tableNameStr);
}
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1031954)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy)
@@ -31,21 +31,24 @@
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
public class EnableTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
-
+ private final MasterServices services;
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
private final CatalogTracker ct;
- public EnableTableHandler(Server server, byte [] tableName,
- CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+ public EnableTableHandler(Server server, final MasterServices services,
+ byte [] tableName, CatalogTracker catalogTracker,
+ AssignmentManager assignmentManager)
throws TableNotFoundException, IOException {
super(server, EventType.C_M_ENABLE_TABLE);
+ this.services = services;
this.tableName = tableName;
this.tableNameStr = Bytes.toString(tableName);
this.ct = catalogTracker;