Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision )
@@ -54,6 +54,7 @@
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -71,7 +72,6 @@
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -85,10 +85,7 @@
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.hadoop.hbase.zookeeper.ClusterId;
-import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
-import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.*;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.DNS;
@@ -154,7 +151,10 @@
private CatalogTracker catalogTracker;
// Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker;
-
+
+ // Schema change tracker
+ private MasterSchemaChangeTracker schemaChangeTracker;
+
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
@@ -180,12 +180,19 @@
private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
+ private Thread schemaJanitorChore;
private MasterCoprocessorHost cpHost;
private final ServerName serverName;
private TableDescriptors tableDescriptors;
-
+
+ // Whether schema alter changes go through ZK.
+ private boolean supportInstantSchemaChanges = false;
+
+ private volatile boolean loadBalancerRunning = false;
+
+
/**
* Initializes the HMaster. The steps are as follows:
*
@@ -249,7 +256,18 @@
}
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
this.metrics = new MasterMetrics(getServerName().toString());
+ // initialize instant schema change settings
+ this.supportInstantSchemaChanges = conf.getBoolean(
+ "hbase.instant.schema.alter.enabled", false);
+ if (supportInstantSchemaChanges) {
+ LOG.info("Instant schema change enabled. All schema alter operations will " +
+ "happen through ZK.");
- }
+ }
+ else {
+ LOG.info("Instant schema change disabled. All schema alter operations will " +
+ "happen normally.");
+ }
+ }
/**
* Stall startup if we are designated a backup master; i.e. we want someone
@@ -377,6 +395,12 @@
boolean wasUp = this.clusterStatusTracker.isClusterUp();
if (!wasUp) this.clusterStatusTracker.setClusterUp();
+ // initialize schema change tracker
+ this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(),
+ this, this,
+ conf.getInt("hbase.instant.schema.alter.timeout", 60000));
+ this.schemaChangeTracker.start();
+
LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@@ -487,6 +511,9 @@
this.catalogJanitorChore = new CatalogJanitor(this, this);
Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
+ // Schema janitor chore.
+ this.schemaJanitorChore = getAndStartSchemaJanitorChore(this);
+
status.markComplete("Initialization successful");
LOG.info("Master has completed initialization");
initialized = true;
@@ -570,6 +597,15 @@
return this.tableDescriptors;
}
+ /**
+  * @return the schema change tracker stored in this master's
+  *         {@code schemaChangeTracker} field; {@code null} until that field
+  *         has been assigned
+  */
+ @Override
+ public MasterSchemaChangeTracker getSchemaChangeTracker() {
+   return schemaChangeTracker;
+ }
+
+ /**
+  * Returns the region server tracker held by this master. This implements
+  * the accessor declared on {@code MasterServices}, hence the
+  * {@code @Override}.
+  *
+  * @return the value of the {@code regionServerTracker} field
+  */
+ @Override
+ public RegionServerTracker getRegionServerTracker() {
+   return this.regionServerTracker;
+ }
+
/** @return InfoServer object. Maybe null.*/
public InfoServer getInfoServer() {
return this.infoServer;
@@ -672,7 +708,28 @@
if (this.executorService != null) this.executorService.shutdown();
}
- private static Thread getAndStartBalancerChore(final HMaster master) {
+ /**
+  * Builds the schema janitor chore and hands its thread to
+  * {@code Threads.setDaemonThreadRunning}. On every run the chore asks the
+  * master's schema change tracker to handle failed or expired schema changes.
+  * The period is read from {@code hbase.instant.schema.janitor.period}
+  * (default 120000).
+  *
+  * @param master the master whose server name, configuration and schema
+  *               change tracker the chore uses
+  * @return the thread returned by {@code Threads.setDaemonThreadRunning}
+  */
+ private Thread getAndStartSchemaJanitorChore(final HMaster master) {
+   final int sweepPeriod = master.getConfiguration()
+       .getInt("hbase.instant.schema.janitor.period", 120000);
+   final String choreName = master.getServerName() + "-SchemaJanitorChore";
+   final Chore janitor = new Chore(choreName, sweepPeriod, master) {
+     @Override
+     protected void chore() {
+       master.getSchemaChangeTracker().handleFailedOrExpiredSchemaChanges();
+     }
+   };
+   final Thread janitorThread = janitor.getThread();
+   return Threads.setDaemonThreadRunning(janitorThread);
+ }
+
+
+ private Thread getAndStartBalancerChore(final HMaster master) {
String name = master.getServerName() + "-BalancerChore";
int balancerPeriod =
master.getConfiguration().getInt("hbase.balancer.period", 300000);
@@ -693,8 +750,12 @@
if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt();
}
+ if (this.schemaJanitorChore != null) {
+ this.schemaJanitorChore.interrupt();
- }
+ }
+ }
+
@Override
public MapWritable regionServerStartup(final int port,
final long serverStartCode, final long serverCurrentTime)
@@ -763,6 +824,15 @@
return balancerCutoffTime;
}
+
+ /**
+  * Reports whether a balancer run is in progress, as recorded in the
+  * {@code loadBalancerRunning} flag. This implements the method declared on
+  * {@code HMasterInterface}, hence the {@code @Override}.
+  *
+  * @return true if the load balancer is currently running
+  */
+ @Override
+ public boolean isLoadBalancerRunning() {
+   return loadBalancerRunning;
+ }
+
@Override
public boolean balance() {
// If balance not true, don't run balancer.
@@ -770,76 +840,96 @@
// Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime();
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
- boolean balancerRan;
+ boolean balancerRan = false;
- synchronized (this.balancer) {
+ synchronized (this.balancer) {
+ if (loadBalancerRunning) {
+ LOG.debug("Load balancer is currently running. Skipping the current execution.");
+ return false;
+ }
+
- // Only allow one balance run at at time.
- if (this.assignmentManager.isRegionsInTransition()) {
- LOG.debug("Not running balancer because " +
- this.assignmentManager.getRegionsInTransition().size() +
- " region(s) in transition: " +
- org.apache.commons.lang.StringUtils.
- abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
- return false;
- }
- if (this.serverManager.areDeadServersInProgress()) {
- LOG.debug("Not running balancer because processing dead regionserver(s): " +
- this.serverManager.getDeadServers());
- return false;
- }
+ // Only allow one balance run at a time.
+ if (this.assignmentManager.isRegionsInTransition()) {
+ LOG.debug("Not running balancer because " +
+ this.assignmentManager.getRegionsInTransition().size() +
+ " region(s) in transition: " +
+ org.apache.commons.lang.StringUtils.
+ abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
+ return false;
+ }
+ if (this.serverManager.areDeadServersInProgress()) {
+ LOG.debug("Not running balancer because processing dead regionserver(s): " +
+ this.serverManager.getDeadServers());
+ return false;
+ }
-
+ waitForCurrentSchemaChange();
+ loadBalancerRunning = true;
- if (this.cpHost != null) {
- try {
- if (this.cpHost.preBalance()) {
+ if (this.cpHost != null) {
+ try {
+ if (this.cpHost.preBalance()) {
LOG.debug("Coprocessor bypassing balancer request");
return false;
- }
- } catch (IOException ioe) {
- LOG.error("Error invoking master coprocessor preBalance()", ioe);
- return false;
- }
- }
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error invoking master coprocessor preBalance()", ioe);
+ return false;
+ }
+ }
- Map> assignments =
- this.assignmentManager.getAssignments();
- // Returned Map from AM does not include mention of servers w/o assignments.
- for (Map.Entry e:
- this.serverManager.getOnlineServers().entrySet()) {
- if (!assignments.containsKey(e.getKey())) {
- assignments.put(e.getKey(), new ArrayList());
- }
- }
- List plans = this.balancer.balanceCluster(assignments);
- int rpCount = 0; // number of RegionPlans balanced so far
- long totalRegPlanExecTime = 0;
- balancerRan = plans != null;
- if (plans != null && !plans.isEmpty()) {
- for (RegionPlan plan: plans) {
- LOG.info("balance " + plan);
- long balStartTime = System.currentTimeMillis();
- this.assignmentManager.balance(plan);
- totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
- rpCount++;
- if (rpCount < plans.size() &&
- // if performing next balance exceeds cutoff time, exit the loop
- (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
- LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
- maximumBalanceTime);
- break;
- }
- }
- }
- if (this.cpHost != null) {
- try {
- this.cpHost.postBalance();
- } catch (IOException ioe) {
- // balancing already succeeded so don't change the result
- LOG.error("Error invoking master coprocessor postBalance()", ioe);
- }
- }
+ Map> assignments =
+ this.assignmentManager.getAssignments();
+ // Returned Map from AM does not include mention of servers w/o assignments.
+ for (Map.Entry e:
+ this.serverManager.getOnlineServers().entrySet()) {
+ if (!assignments.containsKey(e.getKey())) {
+ assignments.put(e.getKey(), new ArrayList());
+ }
+ }
+ List plans = this.balancer.balanceCluster(assignments);
+ int rpCount = 0; // number of RegionPlans balanced so far
+ long totalRegPlanExecTime = 0;
+ balancerRan = plans != null;
+ if (plans != null && !plans.isEmpty()) {
+ for (RegionPlan plan: plans) {
+ LOG.info("balance " + plan);
+ long balStartTime = System.currentTimeMillis();
+ this.assignmentManager.balance(plan);
+ totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
+ rpCount++;
+ if (rpCount < plans.size() &&
+ // if performing next balance exceeds cutoff time, exit the loop
+ (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
+ LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
+ maximumBalanceTime);
+ break;
+ }
+ }
+ }
+ if (this.cpHost != null) {
+ try {
+ this.cpHost.postBalance();
+ } catch (IOException ioe) {
+ // balancing already succeeded so don't change the result
+ LOG.error("Error invoking master coprocessor postBalance()", ioe);
+ }
+ }
+ loadBalancerRunning = false;
- }
+ }
+
return balancerRan;
}
+ /**
+  * Blocks until the schema change tracker reports that no schema change is
+  * in progress, polling once per second.
+  * <p>
+  * The wait is uninterruptible: an interrupt received while sleeping is
+  * remembered and the interrupt flag is restored only once the wait is over.
+  * Restoring the flag inside the loop would make every following
+  * {@code Thread.sleep} throw immediately, turning the poll into a busy spin
+  * that floods the debug log.
+  */
+ private void waitForCurrentSchemaChange() {
+   boolean interrupted = false;
+   try {
+     while (schemaChangeTracker.isSchemaChangeInProgress()) {
+       try {
+         LOG.debug("Schema change operation is in progress. Waiting for " +
+             "it to complete before running the load balancer.");
+         Thread.sleep(1000);
+       } catch (InterruptedException e) {
+         // Keep waiting; the flag is re-asserted in the finally block.
+         interrupted = true;
+       }
+     }
+   } finally {
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
enum BalanceSwitchMode {
SYNC,
ASYNC
@@ -984,22 +1074,63 @@
if (cpHost != null) {
cpHost.preDeleteTable(tableName);
}
- this.executorService.submit(new DeleteTableHandler(tableName, this, this));
+ this.executorService.submit(new DeleteTableHandler(tableName, this, this, this,
+ supportInstantSchemaChanges));
if (cpHost != null) {
cpHost.postDeleteTable(tableName);
}
}
+ /**
+ * Get the number of regions of the table that have been updated by the alter.
+ *
+ * @return Pair indicating the number of regions updated: Pair.getFirst is the
+ * number of regions that are yet to be updated; Pair.getSecond is the total
+ * number of regions of the table.
+ */
public Pair getAlterStatus(byte[] tableName)
throws IOException {
- try {
+ if (supportInstantSchemaChanges) {
+ return getAlterStatusFromSchemaChangeTracker(tableName);
+ }
- return this.assignmentManager.getReopenStatus(tableName);
+ return this.assignmentManager.getReopenStatus(tableName);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted", e);
- }
+ }
+
+ /**
+  * Reads the alter status of a table from the schema change tracker and
+  * converts it into the pair returned by {@code getAlterStatus}.
+  *
+  * @param tableName name of the table whose alter status is requested
+  * @return pair whose first element is the number of regions still to be
+  *         processed and whose second element is the total number of regions
+  *         to process; (0, 0) when the tracker has no status for the table
+  *         or the lookup failed with a KeeperException
+  * @throws IOException declared by the signature; a KeeperException from the
+  *         tracker is logged here rather than rethrown
+  */
+ private Pair getAlterStatusFromSchemaChangeTracker(byte[] tableName)
+     throws IOException {
+   MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null;
+   try {
+     alterStatus =
+         this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName));
+   } catch (KeeperException ke) {
+     // Best effort: the failure is logged and reported below as "no status".
+     LOG.error("KeeperException while getting schema alter status for table = "
+         + Bytes.toString(tableName), ke);
- }
+   }
+   if (alterStatus != null) {
+     LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
+         + Bytes.toString(tableName) + " Alter Status = "
+         + alterStatus.toString());
+     // getAlterStatus documents the first element as the regions that are
+     // yet to be updated, so return the pending count, not the processed one.
+     int numberPending = alterStatus.getNumberOfRegionsToProcess() -
+         alterStatus.getNumberOfRegionsProcessed();
+     return new Pair(numberPending,
+         alterStatus.getNumberOfRegionsToProcess());
+   } else {
+     LOG.debug("MasterAlterStatus is NULL for table = "
+         + Bytes.toString(tableName));
+     // should we throw IOException here as it makes more sense?
+     return new Pair(0,0);
+   }
+ }
+
public void addColumn(byte [] tableName, HColumnDescriptor column)
throws IOException {
if (cpHost != null) {
@@ -1007,7 +1138,8 @@
return;
}
}
- new TableAddFamilyHandler(tableName, column, this, this).process();
+ new TableAddFamilyHandler(tableName, column, this, this,
+ this, supportInstantSchemaChanges).process();
if (cpHost != null) {
cpHost.postAddColumn(tableName, column);
}
@@ -1020,7 +1152,8 @@
return;
}
}
- new TableModifyFamilyHandler(tableName, descriptor, this, this).process();
+ new TableModifyFamilyHandler(tableName, descriptor, this, this,
+ this, supportInstantSchemaChanges).process();
if (cpHost != null) {
cpHost.postModifyColumn(tableName, descriptor);
}
@@ -1033,7 +1166,8 @@
return;
}
}
- new TableDeleteFamilyHandler(tableName, c, this, this).process();
+ new TableDeleteFamilyHandler(tableName, c, this, this,
+ this, supportInstantSchemaChanges).process();
if (cpHost != null) {
cpHost.postDeleteColumn(tableName, c);
}
@@ -1044,7 +1178,7 @@
cpHost.preEnableTable(tableName);
}
this.executorService.submit(new EnableTableHandler(this, tableName,
- catalogTracker, assignmentManager, false));
+ catalogTracker, assignmentManager, false));
if (cpHost != null) {
cpHost.postEnableTable(tableName);
@@ -1100,22 +1234,32 @@
@Override
public void modifyTable(final byte[] tableName, HTableDescriptor htd)
- throws IOException {
+ throws IOException {
if (cpHost != null) {
cpHost.preModifyTable(tableName, htd);
}
-
this.executorService.submit(new ModifyTableHandler(tableName, htd, this,
- this));
-
+ this, this, supportInstantSchemaChanges));
if (cpHost != null) {
cpHost.postModifyTable(tableName, htd);
}
}
@Override
- public void checkTableModifiable(final byte [] tableName)
+ public void checkTableModifiable(final byte [] tableName,
+ EventHandler.EventType eventType)
throws IOException {
+ preCheckTableModifiable(tableName);
+ if (!eventType.isSchemaChangeEvent()) {
+ if (!getAssignmentManager().getZKTable().
+ isDisabledTable(Bytes.toString(tableName))) {
+ throw new TableNotDisabledException(tableName);
+ }
+ }
+ }
+
+ private void preCheckTableModifiable(final byte[] tableName)
+ throws IOException {
String tableNameStr = Bytes.toString(tableName);
if (isCatalogTable(tableName)) {
throw new IOException("Can't modify catalog tables");
@@ -1123,11 +1267,7 @@
if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
throw new TableNotFoundException(tableNameStr);
}
- if (!getAssignmentManager().getZKTable().
- isDisabledTable(Bytes.toString(tableName))) {
- throw new TableNotDisabledException(tableName);
- }
+ }
- }
public void clearFromTransition(HRegionInfo hri) {
if (this.assignmentManager.isRegionInTransition(hri) != null) {
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision )
@@ -85,6 +85,8 @@
public String clusterIdZNode;
// znode used for log splitting work assignment
public String splitLogZNode;
+ // znode used to record table schema changes
+ public String schemaZNode;
private final Configuration conf;
@@ -140,6 +142,7 @@
ZKUtil.createAndFailSilent(this, rsZNode);
ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode);
+ ZKUtil.createAndFailSilent(this, schemaZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
prefix("Unexpected KeeperException creating base node"), e);
@@ -187,6 +190,8 @@
conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
+ schemaZNode = ZKUtil.joinZNode(baseZNode,
+ conf.get("zookeeper.znode.schema", "schema"));
}
/**
Index: src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (revision )
@@ -472,7 +472,7 @@
*/
public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd)
throws IOException {
- LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
+ LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
hcd.toString());
HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
if (htd == null) {
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision )
@@ -270,7 +270,7 @@
* @throws InterruptedException
*/
public Pair getReopenStatus(byte[] tableName)
- throws IOException, InterruptedException {
+ throws IOException {
List hris =
MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
Integer pending = 0;
Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (revision )
@@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -40,8 +41,11 @@
public TableModifyFamilyHandler(byte[] tableName,
HColumnDescriptor familyDesc, Server server,
- final MasterServices masterServices) throws IOException {
- super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices);
+ final MasterServices masterServices,
+ HMasterInterface masterInterface,
+ boolean instantChange) throws IOException {
+ super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices,
+ masterInterface, instantChange);
this.familyDesc = familyDesc;
}
Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision )
@@ -28,17 +28,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -53,26 +57,22 @@
public abstract class TableEventHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(TableEventHandler.class);
protected final MasterServices masterServices;
+ protected HMasterInterface master = null;
protected final byte [] tableName;
protected final String tableNameStr;
+ protected boolean instantAction = false;
public TableEventHandler(EventType eventType, byte [] tableName, Server server,
- MasterServices masterServices)
+ MasterServices masterServices, HMasterInterface masterInterface,
+ boolean instantSchemaChange)
throws IOException {
super(server, eventType);
this.masterServices = masterServices;
this.tableName = tableName;
- try {
- this.masterServices.checkTableModifiable(tableName);
- } catch (TableNotDisabledException ex) {
- if (eventType.isOnlineSchemaChangeSupported()) {
- LOG.debug("Ignoring table not disabled exception " +
- "for supporting online schema changes.");
- } else {
- throw ex;
- }
- }
+ this.masterServices.checkTableModifiable(tableName, eventType);
this.tableNameStr = Bytes.toString(this.tableName);
+ this.instantAction = instantSchemaChange;
+ this.master = masterInterface;
}
@Override
@@ -84,22 +84,46 @@
MetaReader.getTableRegions(this.server.getCatalogTracker(),
tableName);
handleTableOperation(hris);
- if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
+ handleSchemaChanges(hris);
+ } catch (IOException e) {
+ LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
+ } catch (KeeperException e) {
+ LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
+ }
+ }
+
+ /**
+  * Chooses how the schema change is rolled out. The instant path is taken
+  * only when instant changes are enabled for this handler and the region list
+  * is non-null and non-empty; every other case goes through the regular path.
+  *
+  * @param regions regions of the table being altered; may be null or empty
+  * @throws IOException propagated from the regular schema change path
+  */
+ private void handleSchemaChanges(List regions)
+     throws IOException {
+   final boolean useInstantPath =
+       instantAction && regions != null && !regions.isEmpty();
+   if (!useInstantPath) {
+     handleRegularSchemaChanges(regions);
+     return;
+   }
+   handleInstantSchemaChanges(regions.size());
+ }
+
+
+ /**
+ * Tells whether a schema change may be carried out for this event: the event
+ * type must be a schema change event and the assignment manager's ZKTable
+ * must report the table as enabled.
+ * @return true only when both conditions hold
+ */
+ private boolean canPerformSchemaChange() {
+ return (eventType.isSchemaChangeEvent() && this.masterServices.
- getAssignmentManager().getZKTable().
+ getAssignmentManager().getZKTable().
- isEnabledTable(Bytes.toString(tableName))) {
- if (reOpenAllRegions(hris)) {
+ isEnabledTable(Bytes.toString(tableName)));
+ }
+
+ /**
+ * Regular (non-instant) path: when a schema change can be performed, passes
+ * the regions to the assignment manager's setRegionsToReopen and then
+ * reopens them, logging whether the reopen completed. Does nothing when
+ * canPerformSchemaChange() is false.
+ * @param regions regions of the table being altered
+ * @throws IOException propagated from reOpenAllRegions
+ */
+ private void handleRegularSchemaChanges(List regions)
+ throws IOException {
+ if (canPerformSchemaChange()) {
+ this.masterServices.getAssignmentManager().setRegionsToReopen(regions);
+ if (reOpenAllRegions(regions)) {
- LOG.info("Completed table operation " + eventType + " on table " +
- Bytes.toString(tableName));
- } else {
- LOG.warn("Error on reopening the regions");
- }
- }
+ LOG.info("Completed table operation " + eventType + " on table " +
+ Bytes.toString(tableName));
+ } else {
+ LOG.warn("Error on reopening the regions");
+ }
+ }
- } catch (IOException e) {
- LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
- } catch (KeeperException e) {
- LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
- }
+ }
- }
public boolean reOpenAllRegions(List regions) throws IOException {
boolean done = false;
@@ -107,7 +131,8 @@
HTable table = new HTable(masterServices.getConfiguration(), tableName);
TreeMap> serverToRegions = Maps
.newTreeMap();
- NavigableMap hriHserverMapping = table.getRegionLocations();
+ NavigableMap hriHserverMapping
+ = table.getRegionLocations();
List reRegions = new ArrayList();
for (HRegionInfo hri : regions) {
ServerName rsLocation = hriHserverMapping.get(hri);
@@ -148,6 +173,64 @@
}
return done;
}
+
+ /**
+  * Waits for a running load balancer to finish before the schema change is
+  * processed, polling every 50 ms; otherwise the schema change would work
+  * with a wrong data set. The balancer itself waits for schema changes before
+  * it starts, but it may have been started in the meantime, so the check is
+  * repeated here.
+  * <p>
+  * The wait is uninterruptible: an interrupt received while sleeping is
+  * remembered and the interrupt flag is restored only after the wait.
+  * Restoring it inside the loop would make every following
+  * {@code Thread.sleep} throw immediately and turn the poll into a busy spin.
+  *
+  * @param status task status updated with a "waiting" message when the
+  *               balancer is found running
+  */
+ private void waitForLoadBalancerToComplete(MonitoredTask status) {
+   if (!master.isLoadBalancerRunning()) {
+     return;
+   }
+   status.setStatus("Load balancer is currently running. Waiting for load " +
+       "balancer to complete before processing the alter table request for table = "
+       + tableNameStr);
+   boolean interrupted = false;
+   try {
+     while (master.isLoadBalancerRunning()) {
+       try {
+         Thread.sleep(50);
+       } catch (InterruptedException e) {
+         // Keep waiting; the flag is re-asserted in the finally block.
+         interrupted = true;
+       }
+     }
+   } finally {
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ /**
+  * Instant path: when a schema change can be performed, waits for a running
+  * load balancer to finish, asks the schema change tracker to create the
+  * schema change node for this table, and then polls every 50 ms until the
+  * tracker reports that the node exists. Failures are logged and recorded on
+  * the task status; they are not rethrown.
+  * <p>
+  * The poll is uninterruptible: an interrupt received while sleeping is
+  * remembered and the interrupt flag is restored only after the poll ends.
+  * Restoring it inside the loop would make every following
+  * {@code Thread.sleep} throw immediately and turn the poll into a busy spin.
+  *
+  * @param numberOfRegions number of regions passed to the tracker when the
+  *                        schema change node is created
+  */
+ protected void handleInstantSchemaChanges(int numberOfRegions) {
+   // NOTE(review): this status is never marked complete or cleaned up here;
+   // confirm whether TaskMonitor expects the creator to do so.
+   MonitoredTask status = TaskMonitor.get().createStatus(
+       "Handling alter table request for table = " + tableNameStr);
+   if (!canPerformSchemaChange()) {
+     return;
+   }
+   try {
+     // Wait for currently running load balancer to finish.
+     waitForLoadBalancerToComplete(status);
+     MasterSchemaChangeTracker masterSchemaChangeTracker =
+         this.masterServices.getSchemaChangeTracker();
+     masterSchemaChangeTracker.createSchemaChangeNode(tableNameStr, numberOfRegions);
+     boolean interrupted = false;
+     try {
+       while (!masterSchemaChangeTracker.doesSchemaChangeNodeExists(tableNameStr)) {
+         try {
+           Thread.sleep(50);
+         } catch (InterruptedException e) {
+           // Keep polling; the flag is re-asserted in the finally block.
+           interrupted = true;
+         }
+       }
+     } finally {
+       if (interrupted) {
+         Thread.currentThread().interrupt();
+       }
+     }
+     status.setStatus("Created ZK node for handling the alter table request for table = "
+         + tableNameStr);
+   } catch (KeeperException e) {
+     LOG.warn("Instant schema change failed for table " + tableNameStr, e);
+     status.setStatus("Instant schema change failed for table " + tableNameStr
+         + " Cause = " + e.getCause());
+   } catch (IOException ioe) {
+     LOG.warn("Instant schema change failed for table " + tableNameStr, ioe);
+     status.setStatus("Instant schema change failed for table " + tableNameStr
+         + " Cause = " + ioe.getCause());
+   }
+ }
+
protected abstract void handleTableOperation(List regions)
throws IOException, KeeperException;
}
Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (revision )
@@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -38,8 +39,10 @@
private final HColumnDescriptor familyDesc;
public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc,
- Server server, final MasterServices masterServices) throws IOException {
- super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
+ Server server, final MasterServices masterServices,
+ HMasterInterface masterInterface, boolean instantChange) throws IOException {
+ super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
+ masterInterface, instantChange);
this.familyDesc = familyDesc;
}
Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision )
@@ -320,11 +320,21 @@
}
}
+ /**
+  * Asks the master's schema change tracker to leave the given region server
+  * out of schema change processing.
+  *
+  * @param serverName the region server to exclude
+  */
+ private void excludeRegionServerFromSchemaChanges(final ServerName serverName) {
+   final String rsName = serverName.getServerName();
+   this.services.getSchemaChangeTracker().excludeRegionServerForSchemaChanges(rsName);
+ }
+
/*
* Expire the passed server. Add it to list of deadservers and queue a
* shutdown processing.
*/
public synchronized void expireServer(final ServerName serverName) {
+ excludeRegionServerFromSchemaChanges(serverName);
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Received expiration of " + serverName +
" but server is not currently online");
Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (revision )
@@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -38,8 +39,11 @@
private final byte [] familyName;
public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
- Server server, final MasterServices masterServices) throws IOException {
- super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
+ Server server, final MasterServices masterServices,
+ HMasterInterface masterInterface,
+ boolean instantChange) throws IOException {
+ super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices,
+ masterInterface, instantChange);
this.familyName = familyName;
}
Index: src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision )
@@ -26,7 +26,10 @@
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
/**
* Services Master supplies
@@ -53,12 +56,15 @@
public ExecutorService getExecutorService();
/**
- * Check table is modifiable; i.e. exists and is offline.
- * @param tableName Name of table to check.
- * @throws TableNotDisabledException
- * @throws TableNotFoundException
+ * Check that a table is modifiable, i.e. it is not ROOT or META and, for every
+ * command except the alter commands, it is offline.
+ * @param tableName
+ * @param eventType
+ * @throws IOException
*/
- public void checkTableModifiable(final byte [] tableName) throws IOException;
+ public void checkTableModifiable(final byte [] tableName,
+ EventHandler.EventType eventType)
+ throws IOException;
/**
* Create a table using the given table definition.
@@ -73,4 +79,17 @@
* @return Return table descriptors implementation.
*/
public TableDescriptors getTableDescriptors();
+
+ /**
+ * Get Master Schema change tracker
+ * @return the master's MasterSchemaChangeTracker
+ */
+ public MasterSchemaChangeTracker getSchemaChangeTracker();
+
+ /**
+ * Return the Region server tracker.
+ * @return RegionServerTracker
+ */
+ public RegionServerTracker getRegionServerTracker();
+
}
Index: src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision )
@@ -245,17 +245,17 @@
public HTableDescriptor[] getHTableDescriptors();
/**
- * Get current HTD for a given tablename
- * @param tableName
- * @return HTableDescriptor for the table
- */
- //public HTableDescriptor getHTableDescriptor(final byte[] tableName);
-
- /**
* Get array of HTDs for requested tables.
* @param tableNames
* @return array of HTableDescriptor
*/
public HTableDescriptor[] getHTableDescriptors(List tableNames);
+ /**
+ * Returns the current running status of load balancer.
+ * @return True if LoadBalancer is running now else False.
+ */
+ public boolean isLoadBalancerRunning();
+
+
}
Index: src/main/resources/hbase-default.xml
===================================================================
--- src/main/resources/hbase-default.xml (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/resources/hbase-default.xml (revision )
@@ -732,17 +732,41 @@
- hbase.coprocessor.abortonerror
- false
-
- Set to true to cause the hosting server (master or regionserver) to
- abort if a coprocessor throws a Throwable object that is not IOException or
- a subclass of IOException. Setting it to true might be useful in development
- environments where one wants to terminate the server as soon as possible to
- simplify coprocessor failure analysis.
-
+ hbase.coprocessor.abortonerror
+ false
+
+ Set to true to cause the hosting server (master or regionserver) to
+ abort if a coprocessor throws a Throwable object that is not IOException or
+ a subclass of IOException. Setting it to true might be useful in development
+ environments where one wants to terminate the server as soon as possible to
+ simplify coprocessor failure analysis.
+
+ hbase.instant.schema.alter.enabled
+ false
+ Whether to handle alter schema changes instantly.
+ If enabled, all schema change alter operations will be instant, as the master will not
+ explicitly unassign/assign the impacted regions and instead will rely on Region servers to
+ refresh their schema changes. If enabled, the schema alter requests will survive
+ master or RS failures.
+
+
+
+ hbase.instant.schema.janitor.period
+ 120000
+ Period, in millis, at which the Schema Janitor process wakes up and sweeps all
+ expired/failed schema change requests.
+
+
+
+ hbase.instant.schema.alter.timeout
+ 60000
+ Timeout in millis after which any pending schema alter request will be
+ considered as failed.
+
+
+
dfs.support.append
true
Does HDFS allow appends to files?
Index: src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision )
@@ -141,13 +141,12 @@
* Constructor
*/
EventType(int value) {}
- public boolean isOnlineSchemaChangeSupported() {
+ public boolean isSchemaChangeEvent() {
return (
- this.equals(EventType.C_M_ADD_FAMILY) ||
- this.equals(EventType.C_M_DELETE_FAMILY) ||
- this.equals(EventType.C_M_MODIFY_FAMILY) ||
+ this.equals(EventType.C_M_ADD_FAMILY) ||
+ this.equals(EventType.C_M_DELETE_FAMILY) ||
+ this.equals(EventType.C_M_MODIFY_FAMILY) ||
- this.equals(EventType.C_M_MODIFY_TABLE)
- );
+ this.equals(EventType.C_M_MODIFY_TABLE));
}
}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision )
@@ -142,6 +142,7 @@
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
@@ -278,6 +279,9 @@
// Cluster Status Tracker
private ClusterStatusTracker clusterStatusTracker;
+ // Schema change Tracker
+ private SchemaChangeTracker schemaChangeTracker;
+
// Log Splitting Worker
private SplitLogWorker splitLogWorker;
@@ -569,6 +573,11 @@
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start();
+
+ // Schema change tracker
+ this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper,
+ this, this);
+ this.schemaChangeTracker.start();
}
/**
@@ -3334,6 +3343,57 @@
return wal.rollWriter(true);
}
+ /**
+  * Closes the given region and opens it again with the table's current descriptor.
+  * A null argument is ignored.
+  * @param hRegion HRegion to refresh
+  * @throws IOException
+  */
+ public void refreshRegion(HRegion hRegion) throws IOException {
+   if (hRegion == null) {
+     return;
+   }
+   final HRegionInfo info = hRegion.getRegionInfo();
+   // Take the old instance out of service: close it, then unregister it.
+   hRegion.close();
+   removeFromOnlineRegions(info.getEncodedName());
+   // Look up the descriptor only now, after the close.
+   final HTableDescriptor currentHtd = this.tableDescriptors.get(info.getTableName());
+   LOG.debug("HTD for region = " + info.getRegionNameAsString()
+     + " Is = " + currentHtd );
+   final HRegion reopened =
+     HRegion.openHRegion(hRegion.getRegionInfo(), currentHtd, hlog, conf);
+   // Register the reopened instance as online again.
+   addToOnlineRegions(reopened);
+ }
+
+ /**
+  * Gets the online regions of the specified table.
+  * This method looks at the in-memory onlineRegions. It does not go to .META..
+  * Only returns online regions. If a region on this table has been
+  * closed during a disable, etc., it will not be included in the returned list.
+  * So, the returned list may not necessarily be ALL regions in this table, it's
+  * all the ONLINE regions in the table.
+  * @param tableName name of the table whose online regions are wanted
+  * @return Online regions from tableName; an empty list if none match
+  */
+ public List<HRegion> getOnlineRegions(byte[] tableName) {
+   List<HRegion> tableRegions = new ArrayList<HRegion>();
+   synchronized (this.onlineRegions) {
+     for (HRegion region: this.onlineRegions.values()) {
+       HRegionInfo regionInfo = region.getRegionInfo();
+       if(Bytes.equals(regionInfo.getTableName(), tableName)) {
+         tableRegions.add(region);
+       }
+     }
+   }
+   return tableRegions;
+ }
+
+ public SchemaChangeTracker getSchemaChangeTracker() {
+ return this.schemaChangeTracker; // plain accessor; returns the field as-is
+ }
+
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
HServerLoad hsl = buildServerLoad();
Index: src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (revision )
@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -33,11 +34,19 @@
public ModifyTableHandler(final byte [] tableName,
final HTableDescriptor htd, final Server server,
- final MasterServices masterServices) throws IOException {
- super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices);
+ final MasterServices masterServices,
+ final HMasterInterface masterInterface,
+ boolean instantModify) throws IOException {
+ super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices,
+ masterInterface, instantModify);
this.htd = htd;
+ validateTable(tableName, htd);
+ }
+
+ private void validateTable(final byte[] tableName, HTableDescriptor htd)
+ throws IOException {
if (!Bytes.equals(tableName, htd.getName())) {
- throw new IOException("TableDescriptor name & tableName must match: "
+ throw new IOException("TableDescriptor name & tableName must match: "
+ htd.getNameAsString() + " vs " + Bytes.toString(tableName));
}
}
Index: src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision )
@@ -305,7 +305,9 @@
*/
public HMaster getActiveMaster() {
for (JVMClusterUtil.MasterThread mt : masterThreads) {
- if (mt.getMaster().isActiveMaster()) {
+ // Ensure that the current active master is not stopped.
+ // We don't want to return a stopping master as an active master.
+ if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
return mt.getMaster();
}
}
@@ -449,4 +451,4 @@
admin.createTable(htd);
cluster.shutdown();
}
-}
\ No newline at end of file
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision )
@@ -21,6 +21,9 @@
import org.apache.hadoop.hbase.Server;
+import java.io.IOException;
+import java.util.List;
+
/**
* Interface to Map of online regions. In the Map, the key is the region's
* encoded name and the value is an {@link HRegion} instance.
@@ -49,4 +52,18 @@
* null if named region is not member of the online regions.
*/
public HRegion getFromOnlineRegions(String encodedRegionName);
+ /**
+ * Get all online regions of a table in this RS.
+ * @param tableName
+ * @return List of HRegion
+ * @throws java.io.IOException
+ */
+ public List getOnlineRegions(byte[] tableName) throws IOException;
+
+ /**
+ * Refresh a given region updating it with latest HTD info.
+ * @param hRegion
+ */
+ public void refreshRegion(HRegion hRegion) throws IOException;
+
}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision )
@@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -37,9 +38,11 @@
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
public DeleteTableHandler(byte [] tableName, Server server,
- final MasterServices masterServices)
+ final MasterServices masterServices, HMasterInterface masterInterface,
+ boolean instantChange)
throws IOException {
- super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices);
+ super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices,
+ masterInterface, instantChange);
}
@Override
Index: src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java (revision )
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -55,6 +56,13 @@
return this.regions.get(encodedRegionName);
}
+ public List getOnlineRegions(byte[] tableName) throws IOException {
+ return null; // mock stub: performs no per-table lookup and always yields null
+ }
+
+ public void refreshRegion(HRegion hRegion) throws IOException {
+ } // mock stub: empty body, nothing is refreshed
+
@Override
public void addToOnlineRegions(HRegion r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);
Index: src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision 3bfa409689e91f3a6314525b4143270655539053)
+++ src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision )
@@ -51,12 +51,15 @@
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.mockito.Mockito;
@@ -147,11 +150,6 @@
}
@Override
- public void checkTableModifiable(byte[] tableName) throws IOException {
- //no-op
- }
-
- @Override
public void createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException {
// no-op
@@ -167,6 +165,11 @@
return null;
}
+ public void checkTableModifiable(byte[] tableName,
+ EventHandler.EventType eventType)
+ throws IOException { // no-op in this mock, like the single-argument version it replaces
+ }
+
@Override
public MasterFileSystem getMasterFileSystem() {
return this.mfs;
@@ -251,8 +254,16 @@
}
};
}
+
+ public MasterSchemaChangeTracker getSchemaChangeTracker() {
+ return null;
- }
+ }
+ public RegionServerTracker getRegionServerTracker() {
+ return null;
+ }
+ }
+
@Test
public void testGetHRegionInfo() throws IOException {
assertNull(CatalogJanitor.getHRegionInfo(new Result()));
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision )
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision )
@@ -0,0 +1,830 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.Writable;
+
+public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker {
+ public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class);
+ private final MasterServices masterServices;
+ // Used by tests only. Do not change this.
+ private volatile int sleepTimeMillis = 0;
+ // schema changes pending more than this time will be timed out.
+ private long schemaChangeTimeoutMillis = 30000;
+
+ /**
+ * Constructs a tracker on the schema znode ({@code watcher.schemaZNode}).
+ * After construction, use {@link #start} to kick off tracking.
+ * @param watcher ZooKeeper watcher; its schemaZNode is the node tracked
+ * @param abortable handed to the ZooKeeperNodeTracker superclass constructor
+ * @param masterServices stored in the masterServices field
+ * @param schemaChangeTimeoutMillis age in millis after which a pending schema change counts as expired
+ */
+ public MasterSchemaChangeTracker(ZooKeeperWatcher watcher,
+ Abortable abortable, MasterServices masterServices,
+ long schemaChangeTimeoutMillis) {
+ super(watcher, watcher.schemaZNode, abortable);
+ this.masterServices = masterServices;
+ this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis;
+ }
+
+ @Override
+ public void start() {
+   // Register as a listener first, then process whatever table nodes are
+   // already listed under the schema znode (the listing sets no watch).
+   try {
+     watcher.registerListener(this);
+     processCompletedSchemaChanges(getCurrentTables());
+   } catch (KeeperException startupFailure) {
+     LOG.error("MasterSchemaChangeTracker startup failed.", startupFailure);
+     abortable.abort("MasterSchemaChangeTracker startup failed", startupFailure);
+   }
+ }
+
+ // Lists the table znodes under the schema znode without setting a watch.
+ private List<String> getCurrentTables() throws KeeperException {
+   return ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
+ }
+
+ /**
+  * When a primary master crashes and the secondary master takes over
+  * mid-flight during an alter process, the secondary should cleanup any completed
+  * schema changes not handled by the previous master.
+  * @param tables table znode names found under the schema znode; may be null
+  * @throws KeeperException
+  */
+ private void processCompletedSchemaChanges(List<String> tables)
+     throws KeeperException {
+   if (tables == null || tables.isEmpty()) {
+     LOG.debug("No current schema change in progress. Skipping cleanup");
+     return; // nothing to sweep; also keeps the loop below off a null list
+   }
+   String msg = "Master seeing following tables undergoing schema change " +
+     "process. Tables = " + tables;
+   MonitoredTask status = TaskMonitor.get().createStatus(msg);
+   LOG.debug(msg);
+   for (String table : tables) {
+     LOG.debug("Processing table = "+ table);
+     status.setStatus("Processing table = "+ table);
+     try {
+       processTableNode(table);
+     } catch (IOException e) {
+       String errmsg = "IOException while processing completed schema changes."
+         + " Cause = " + e.getCause();
+       LOG.error(errmsg, e);
+       status.setStatus(errmsg);
+     }
+   }
+ }
+
+ /**
+  * Reads and deserializes the alter status kept in a table's schema change znode.
+  * @param tableName table whose schema change znode is read
+  * @return the decoded MasterAlterStatus, or null when the znode yields no data
+  * @throws KeeperException
+  * @throws IOException
+  */
+ public MasterAlterStatus getMasterAlterStatus(String tableName)
+     throws KeeperException, IOException {
+   final byte[] serialized = ZKUtil.getData(watcher, getSchemaChangeNodePathForTable(tableName));
+   final boolean hasPayload = serialized != null && serialized.length > 0;
+   if (!hasPayload) {
+     return null;
+   }
+   final MasterAlterStatus decoded = new MasterAlterStatus();
+   Writables.getWritable(serialized, decoded);
+   return decoded;
+ }
+
+ /**
+  * Reads the alter status stored in a region server's child znode for a table.
+  * @param tableName table undergoing the schema change
+  * @param serverName region server whose child znode is read
+  * @return Region Server's Schema alter status, or null if the znode yields no data
+  * @throws KeeperException
+  * @throws IOException
+  */
+ private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus(
+     String tableName, String serverName)
+     throws KeeperException, IOException {
+   final byte[] payload = ZKUtil.getData(this.watcher,
+     getSchemaChangeNodePathForTableAndServer(tableName, serverName));
+   final boolean empty = payload == null || payload.length <= 0;
+   if (empty) {
+     return null;
+   }
+   final SchemaChangeTracker.SchemaAlterStatus rsStatus =
+     new SchemaChangeTracker.SchemaAlterStatus();
+   Writables.getWritable(payload, rsStatus);
+   LOG.debug("Schema Status data for server = " + serverName + " table = "
+     + tableName + " == " + rsStatus);
+   return rsStatus;
+ }
+
+ /**
+  * Update the master's alter status based on all region server's response.
+  * @param mas master-side status that each server's status is merged into
+  * @param servers names of the per-server child znodes of the table's node
+  * @param tableName table undergoing the schema change
+  * @throws IOException
+  */
+ private void updateMasterAlterStatus(MasterAlterStatus mas,
+     List<String> servers, String tableName)
+     throws IOException, KeeperException {
+   for (String serverName : servers) {
+     SchemaChangeTracker.SchemaAlterStatus sas = getRSSchemaAlterStatus(tableName, serverName);
+     if (sas != null) {
+       mas.update(sas);
+       LOG.debug("processTableNodeWithState:Updated Master Alter Status = "
+         + mas + " for server = " + serverName);
+     } else {
+       LOG.debug("SchemaAlterStatus is NULL for table = " + tableName);
+     }
+   }
+ }
+
+ /**
+  * If schema alter is handled for this table, then delete all the ZK nodes created
+  * for this table.
+  * @param tableName table whose schema change znode is evaluated
+  * @throws KeeperException
+  * @throws IOException
+  */
+ private void processTableNode(String tableName) throws KeeperException,
+     IOException {
+   MonitoredTask status = TaskMonitor.get().createStatus(
+     "Checking alter schema request status for table = " + tableName);
+   LOG.debug("processTableNodeWithState. TableName = " + tableName);
+   String nodePath = getSchemaChangeNodePathForTable(tableName);
+   List<String> children = ZKUtil.listChildrenAndWatchThem(watcher, nodePath);
+   // The listing is null when the znode is missing; treat that as no servers.
+   List<String> servers = (children == null) ? new ArrayList<String>() : children;
+   MasterAlterStatus mas = getMasterAlterStatus(tableName);
+   if (mas == null) {
+     LOG.debug("MasterAlterStatus is NULL. Table = " + tableName);
+     return;
+   }
+   updateMasterAlterStatus(mas, servers, tableName);
+   status.setStatus("Current alter status for table = "
+     + tableName + " ==> " + mas.toString());
+   LOG.debug("Current Alter status = " + mas);
+   ZKUtil.updateExistingNodeData(this.watcher, nodePath,
+     Writables.getBytes(mas), getZKNodeVersion(nodePath));
+   processAlterStatus(mas, tableName, servers, status);
+ }
+
+ /**
+  * Evaluate the master alter status and determine the current status.
+  * @param alterStatus master status, already merged with the servers' statuses
+  * @param tableName table undergoing the schema change
+  * @param servers per-server child znode names; only used in status messages
+  * @param status monitored task that receives the outcome message
+  */
+ private void processAlterStatus(MasterAlterStatus alterStatus,
+     String tableName, List<String> servers,
+     MonitoredTask status) throws KeeperException {
+   if (alterStatus.getNumberOfRegionsToProcess()
+       == alterStatus.getNumberOfRegionsProcessed()) {
+     // schema change completed.
+     String msg = "All region servers have successfully processed the " +
+       "schema changes for table = " + tableName
+       + " . Deleting the schema change node for table = "
+       + tableName + " Region servers processed the schema change" +
+       " request = " + alterStatus.getProcessedHosts()
+       + " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess()
+       + " Processed regions = " + alterStatus.getNumberOfRegionsProcessed();
+     status.setStatus(msg);
+     LOG.debug(msg);
+     cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName));
+   } else {
+     if (alterStatus.getErrorCause() != null
+         && alterStatus.getErrorCause().trim().length() > 0) {
+       String msg = "Alter schema change failed "
+         + "for table = " + tableName + " Number of online regions = "
+         + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
+         + alterStatus.getNumberOfRegionsProcessed()
+         + " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
+         + servers
+         + " Error Cause = " + alterStatus.getErrorCause();
+       // we have errors.
+       LOG.debug(msg);
+       status.setStatus(msg);
+     } else {
+       String msg = "Not all region servers have processed the schema changes "
+         + "for table = " + tableName + " Number of online regions = "
+         + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
+         + alterStatus.getNumberOfRegionsProcessed()
+         + " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
+         + servers + " Alter STate = "
+         + alterStatus.getCurrentAlterStatus();
+       LOG.debug(msg);
+       status.setStatus(msg);
+     }
+   }
+ }
+
+ /**
+  * Check whether a in-flight schema change request has expired.
+  * @param tableName table whose schema change znode is examined
+  * @return true if the request outlived the timeout; false otherwise or if no status
+  * @throws IOException
+  */
+ private boolean hasSchemaChangeExpiredFor(String tableName)
+     throws IOException, KeeperException {
+   MasterAlterStatus mas = getMasterAlterStatus(tableName);
+   if (mas == null) { return false; } // znode yielded no status: nothing to age
+   long createdTimeStamp = mas.getStamp();
+   long duration = System.currentTimeMillis() - createdTimeStamp;
+   LOG.debug("Created TimeStamp = " + createdTimeStamp + " duration = " + duration
+     + " Table = " + tableName + " Master Alter Status = " + mas);
+   return (duration > schemaChangeTimeoutMillis);
+ }
+
+ /**
+  * Handle failed and expired schema changes. We simply delete all the
+  * expired/failed schema change attempts. Why should we do this?
+  * 1) Keeping the failed/expired schema change nodes longer prohibits any
+  * future schema changes for the table.
+  * 2) Any lingering expired/failed schema change requests will prohibit the
+  * load balancer from running.
+  */
+ public void handleFailedOrExpiredSchemaChanges() {
+   try {
+     List<String> tables = getCurrentTables();
+     if (tables == null) {
+       return; // no child listing available: nothing to sweep
+     }
+     for (String table : tables) {
+       String statmsg = "Cleaning failed or expired schema change requests. " +
+         "current tables undergoing " +
+         "schema change process = " + tables;
+       MonitoredTask status = TaskMonitor.get().createStatus(statmsg);
+       LOG.debug(statmsg);
+       if (hasSchemaChangeExpiredFor(table)) {
+         // Timed out: the in-flight schema change is abandoned by deleting
+         // its node. Alternatives to consider would be retrying the schema
+         // change, or rolling it back.
+         String msg = "Schema change for table = " + table + " has expired."
+           + " Schema change for this table has been in progress for more than "
+           + schemaChangeTimeoutMillis + " ms. Deleting the node now.";
+         LOG.debug(msg);
+         status.setStatus(msg);
+         ZKUtil.deleteNodeRecursively(this.watcher,
+           getSchemaChangeNodePathForTable(table));
+       } else {
+         String msg = "Schema change request is in progress for " +
+           " table = " + table;
+         LOG.debug(msg);
+         status.setStatus(msg);
+       }
+     }
+   } catch (IOException e) {
+     String msg = "IOException during handleFailedExpiredSchemaChanges."
+       + e.getCause();
+     LOG.error(msg, e);
+     TaskMonitor.get().createStatus(msg);
+   } catch (KeeperException ke) {
+     String msg = "KeeperException during handleFailedExpiredSchemaChanges."
+       + ke.getCause();
+     LOG.error(msg, ke);
+     TaskMonitor.get().createStatus(msg);
+   }
+ }
+
+ /**
+  * Removes the znode subtree of a table whose schema change has completed.
+  * @param path znode path handed to the recursive delete
+  * @throws KeeperException
+  */
+ private void cleanProcessedTableNode(String path) throws KeeperException {
+   // Optional pause before deleting; the field comment marks it as tests-only.
+   try {
+     if (sleepTimeMillis > 0) {
+       LOG.debug("Master schema change tracker sleeping for "
+         + sleepTimeMillis);
+       Thread.sleep(sleepTimeMillis);
+     }
+   } catch (InterruptedException interrupted) {
+     Thread.currentThread().interrupt();
+   }
+   ZKUtil.deleteNodeRecursively(this.watcher, path);
+   LOG.debug("Deleted all nodes for path " + path);
+ }
+
+ /**
+  * Exclude a RS from schema change request (if applicable)
+  * We will exclude a RS from schema change request processing if 1) RS
+  * has online regions for the table AND 2) RS went down mid-flight
+  * during schema change process. We don't have to deal with RS going
+  * down mid-flight during a schema change as the online regions from
+  * the dead RS will get reassigned to some other RS and the
+  * process of reassign inherently takes care of the schema change as well.
+  * @param serverName name of the region server to exclude
+  */
+ public void excludeRegionServerForSchemaChanges(String serverName) {
+   // Best effort: failures are logged below, not propagated to the caller.
+   try {
+     MonitoredTask status = TaskMonitor.get().createStatus(
+       "Processing schema change exclusion for region server = " + serverName);
+     List<String> tables =
+       ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
+     if (tables == null || tables.isEmpty()) {
+       String msg = "No schema change in progress. Skipping exclusion for " +
+         "server = "+ serverName;
+       LOG.debug(msg);
+       status.setStatus(msg);
+       return ;
+     }
+     for (String tableName : tables) {
+       excludeRegionServer(tableName, serverName, status);
+     }
+   } catch(KeeperException ke) {
+     LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
+   } catch(IOException ioe) {
+     LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe);
+   }
+ }
+
+ /**
+  * Check whether a schema change is in progress for a given table on a
+  * given RS.
+  * @param tableName table whose schema change znode is inspected
+  * @param serverName region server looked for among the znode's children
+  * @return TRUE if this RS is currently processing a schema change request
+  * for the table; FALSE otherwise, including when no child listing exists.
+  * @throws KeeperException
+  */
+ private boolean isSchemaChangeApplicableFor(String tableName,
+     String serverName)
+     throws KeeperException {
+   List<String> servers = ZKUtil.listChildrenAndWatchThem(watcher,
+     getSchemaChangeNodePathForTable(tableName));
+   return servers != null && servers.contains(serverName);
+ }
+
+ /**
+  * Exclude a region server for a table (if applicable) from schema change processing.
+  * @param tableName table undergoing the schema change
+  * @param serverName region server to mark as IGNORED for that table
+  * @param status monitored task that receives the progress message
+  * @throws KeeperException
+  * @throws IOException
+  */
+ private void excludeRegionServer(String tableName, String serverName,
+     MonitoredTask status)
+     throws KeeperException, IOException {
+   if (isSchemaChangeApplicableFor(tableName, serverName)) {
+     String msg = "Excluding RS " + serverName + " from schema change process" +
+       " for table = " + tableName;
+     LOG.debug(msg);
+     status.setStatus(msg);
+     SchemaChangeTracker.SchemaAlterStatus sas =
+       getRSSchemaAlterStatus(tableName, serverName);
+     if (sas == null) {
+       LOG.debug("SchemaAlterStatus is NULL for table = " + tableName
+         + " server = " + serverName);
+     } else {
+       // Set the status to IGNORED so we can process it accordingly.
+       sas.setCurrentAlterStatus(
+         SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED);
+       LOG.debug("Updating the current schema status to " + sas);
+       String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, serverName);
+       ZKUtil.updateExistingNodeData(this.watcher,
+         nodePath, Writables.getBytes(sas), getZKNodeVersion(nodePath));
+     }
+   } else {
+     LOG.debug("Skipping exclusion of RS " + serverName
+       + " from schema change process"
+       + " for table = " + tableName
+       + " as it did not possess any online regions for the table");
+   }
+   processTableNode(tableName);
+ }
+
+ private int getZKNodeVersion(String nodePath) throws KeeperException {
+ return ZKUtil.checkExists(this.watcher, nodePath); // callers pass this result as the version argument of updateExistingNodeData
+ }
+
+  /**
+   * Create a new schema change ZK node.
+   * @param tableName Table name that is getting altered
+   * @param numberOfRegions number of regions to process, stored in the alter status
+   * @throws KeeperException
+   * @throws IOException also thrown if interrupted while waiting for a previous alter
+   */
+  public void createSchemaChangeNode(String tableName,
+                                     int numberOfRegions)
+    throws KeeperException, IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Creating schema change node for table = " + tableName);
+    LOG.debug("Creating schema change node for table = " + tableName
+        + " Path = " + getSchemaChangeNodePathForTable(tableName));
+    if (doesSchemaChangeNodeExists(tableName)) {
+      LOG.debug("Schema change node already exists for table = " + tableName
+          + " Waiting for the previous schema change to complete.");
+      // Wait until the previous alter for this table completes. We could instead delete
+      // the existing node and create a new one, but the RSs cannot process concurrent
+      // schema updates for one table: regions closed and reopened by the first change
+      // would not be online for the second, which would then be missed. Handling it
+      // here is simpler than explicit per-table locking at the RS level.
+      // An interrupt aborts the wait; otherwise sleep() would fail at once and spin.
+      while(doesSchemaChangeNodeExists(tableName)) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting for the previous schema "
+              + "change of table = " + tableName + " to complete", e);
+        }
+      }
+    }
+
+    int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode);
+    // if number of online RS = 0, we should not do anything!
+    if (rsCount <= 0) {
+      String msg = "Master is not seeing any online region servers. Aborting the " +
+          "schema change processing by region servers.";
+      LOG.debug(msg);
+      status.setStatus(msg);
+    } else {
+      LOG.debug("Master is seeing " + rsCount + " region servers online before " +
+          "the schema change process.");
+      MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions,
+          getActiveRegionServersAsString());
+      LOG.debug("Master creating the master alter status = " + mas);
+      ZKUtil.createSetData(this.watcher,
+          getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas));
+      status.setStatus("Created the ZK node for schema change. Current Alter Status = "
+          + mas.toString());
+      ZKUtil.listChildrenAndWatchThem(this.watcher,
+          getSchemaChangeNodePathForTable(tableName));
+    }
+  }
+
+  /** Builds the space-separated names of the region servers currently reported online. */
+  private String getActiveRegionServersAsString() {
+    StringBuilder sbuf = new StringBuilder();
+    List<ServerName> currentRS =
+        masterServices.getRegionServerTracker().getOnlineServers();
+    for (ServerName serverName : currentRS) {
+      sbuf.append(serverName.getServerName()).append(" ");
+    }
+    String activeServers = sbuf.toString();
+    LOG.debug("Current list of RS to process the schema change = " + activeServers);
+    return activeServers;
+  }
+
+ /**
+ * Check whether a schema change ZK node exists for the given table.
+ * @param tableName
+ * @throws KeeperException
+ */
+ public boolean doesSchemaChangeNodeExists(String tableName)
+ throws KeeperException {
+ return ZKUtil.checkExists(watcher,
+ getSchemaChangeNodePathForTable(tableName)) != -1;
+ }
+
+  /**
+   * Check whether there are any schema change requests that are in progress now.
+   * We simply assume that a schema change is in progress if we see a ZK schema node for
+   * any table. We may revisit for fine grained checks such as check the current alter status
+   * et al, but it is not required now.
+   * @return true if the schema znode has at least one child; false otherwise or on ZK error
+   */
+  public boolean isSchemaChangeInProgress() {
+    try {
+      int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode);
+      return schemaChangeCount > 0;
+    } catch (KeeperException ke) {
+      LOG.warn("KeeperException while getting current schema change progress.", ke);
+      // What do we do now??? currently reporting as false.
+    }
+    return false;
+  }
+
+  /**
+   * We get notified when a RS processes/or completed the schema change request.
+   * The path will be of the format {@code /hbase/schema/<tableName>}
+   * @param path full path of the node whose children have changed
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    String tableName = null;
+    // Only znodes strictly below the schema znode name a table (last path component).
+    if (path.startsWith(watcher.schemaZNode + "/")) {
+      try {
+        LOG.debug("NodeChildrenChanged Path = " + path);
+        tableName = path.substring(path.lastIndexOf("/") + 1);
+        processTableNode(tableName);
+      } catch (KeeperException e) {
+        TaskMonitor.get().createStatus(
+            "MasterSchemaChangeTracker: ZK exception while processing " +
+            " nodeChildrenChanged() event for table = " + tableName
+            + " Cause = " + e.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+            + " schema change nodes", e);
+      } catch(IOException ioe) {
+        TaskMonitor.get().createStatus(
+            "MasterSchemaChangeTracker: IO exception while processing " +
+            " nodeChildrenChanged() event for table = " + tableName
+            + " Cause = " + ioe.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+            + " schema change nodes", ioe);
+      }
+    }
+  }
+
+  /**
+   * We get notified as and when the RS cloud updates their ZK nodes with
+   * progress information. The path will be of the format
+   * {@code /hbase/schema/<tableName>/<serverName>}
+   * @param path full path of the node whose data has changed
+   */
+  @Override
+  public void nodeDataChanged(String path) {
+    String tableName = null;
+    if (path.startsWith(watcher.schemaZNode + "/")) {
+      try {
+        LOG.debug("NodeDataChanged Path = " + path);
+        // Table name = first component below the schema znode, whatever its depth.
+        String relativePath = path.substring(watcher.schemaZNode.length() + 1);
+        int separator = relativePath.indexOf('/');
+        tableName = separator < 0 ? relativePath : relativePath.substring(0, separator);
+        processTableNode(tableName);
+      } catch (KeeperException e) {
+        TaskMonitor.get().createStatus(
+            "MasterSchemaChangeTracker: ZK exception while processing " +
+            " nodeDataChanged() event for table = " + tableName
+            + " Cause = " + e.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+            + " schema change nodes", e);
+      } catch(IOException ioe) {
+        TaskMonitor.get().createStatus(
+            "MasterSchemaChangeTracker: IO exception while processing " +
+            " nodeDataChanged() event for table = " + tableName
+            + " Cause = " + ioe.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+            + " schema change nodes", ioe);
+      }
+    }
+  }
+
+ public String getSchemaChangeNodePathForTable(String tableName) {
+ return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
+ }
+
+ /**
+ * Used only for tests. Do not use this. See TestInstantSchemaChange for more details
+ * on how this is getting used. This is primarily used to delay the schema complete
+ * processing by master so that we can test some complex scenarios such as
+ * master failover.
+ * @param sleepTimeMillis
+ */
+ public void setSleepTimeMillis(int sleepTimeMillis) {
+ this.sleepTimeMillis = sleepTimeMillis;
+ }
+
+ private String getSchemaChangeNodePathForTableAndServer(
+ String tableName, String regionServerName) {
+ return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
+ regionServerName);
+ }
+
+
+ /**
+ * Holds the current alter state for a table. Alter state includes the
+ * current alter status (INPROCESS, FAILURE or SUCCESS; success is not getting
+ * used now), timestamp of alter request, number of hosts online at the time
+ * of alter request, number of online regions to process for the schema change
+ * request, number of processed regions and a list of region servers that
+ * actually processed the schema change request.
+ *
+ * Master keeps track of schema change requests using the alter status and
+ * periodically updates the alter status based on RS cloud processings.
+ */
+ public static class MasterAlterStatus implements Writable {
+
+ public enum AlterState {
+ INPROCESS, // Inprocess alter
+ SUCCESS, // completed alter
+ FAILURE // failure alter
+ }
+
+ private AlterState currentAlterStatus;
+ // TimeStamp
+ private long stamp;
+ private int numberOfRegionsToProcess;
+ private StringBuffer errorCause = new StringBuffer(" ");
+ private StringBuffer processedHosts = new StringBuffer(" ");
+ private String hostsToProcess;
+ private int numberOfRegionsProcessed = 0;
+
+ public MasterAlterStatus() {
+
+ }
+
+ public MasterAlterStatus(int numberOfRegions, String activeHosts) {
+ this.numberOfRegionsToProcess = numberOfRegions;
+ this.stamp = System.currentTimeMillis();
+ this.currentAlterStatus = AlterState.INPROCESS;
+ //this.rsToProcess = activeHosts;
+ this.hostsToProcess = activeHosts;
+ }
+
+ public AlterState getCurrentAlterStatus() {
+ return currentAlterStatus;
+ }
+
+ public void setCurrentAlterStatus(AlterState currentAlterStatus) {
+ this.currentAlterStatus = currentAlterStatus;
+ }
+
+ public long getStamp() {
+ return stamp;
+ }
+
+ public void setStamp(long stamp) {
+ this.stamp = stamp;
+ }
+
+ public int getNumberOfRegionsToProcess() {
+ return numberOfRegionsToProcess;
+ }
+
+ public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) {
+ this.numberOfRegionsToProcess = numberOfRegionsToProcess;
+ }
+
+ public int getNumberOfRegionsProcessed() {
+ return numberOfRegionsProcessed;
+ }
+
+ public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
+ this.numberOfRegionsProcessed += numberOfRegionsProcessed;
+ }
+
+ public String getHostsToProcess() {
+ return hostsToProcess;
+ }
+
+ public void setHostsToProcess(String hostsToProcess) {
+ this.hostsToProcess = hostsToProcess;
+ }
+
+ public String getErrorCause() {
+ return errorCause == null ? null : errorCause.toString();
+ }
+
+ public void setErrorCause(String errorCause) {
+ if (errorCause == null || errorCause.trim().length() <= 0) {
+ return;
+ }
+ if (this.errorCause == null) {
+ this.errorCause = new StringBuffer(errorCause);
+ } else {
+ this.errorCause.append(errorCause);
+ }
+ }
+
+ public String getProcessedHosts() {
+ return processedHosts.toString();
+ }
+
+ public void setProcessedHosts(String processedHosts) {
+ if (this.processedHosts == null) {
+ this.processedHosts = new StringBuffer(processedHosts);
+ } else {
+ this.processedHosts.append(" ").append(processedHosts);
+ }
+ }
+
+    /**
+     * Ignore or exempt a RS from schema change processing.
+     * Master will tweak the number of regions to process based on the
+     * number of online regions on the target RS and also removes the
+     * RS from list of hosts to process.
+     * @param schemaAlterStatus status of the RS that is being exempted
+     */
+    private void ignoreRSForSchemaChange(
+        SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
+      LOG.debug("Removing RS " + schemaAlterStatus.getHostName()
+          + " from schema change process.");
+      // Literal replace: replaceAll() would read the host name as a regex, so its
+      // dots would act as wildcards and could strip a different server's name.
+      hostsToProcess = hostsToProcess.replace(schemaAlterStatus.getHostName(), "");
+      int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions();
+      LOG.debug("Current number of regions processed = " + this.numberOfRegionsProcessed
+          + " deducting ignored = " + ignoreRegionsCount
+          + " final = " + (this.numberOfRegionsToProcess - ignoreRegionsCount));
+      if (this.numberOfRegionsToProcess > 0) {
+        this.numberOfRegionsToProcess -= ignoreRegionsCount;
+      } else {
+        LOG.debug("Number of regions to process is less than zero. This is odd");
+      }
+    }
+
+ /**
+ * Update the master alter status for this table based on RS alter status.
+ * @param schemaAlterStatus
+ */
+ public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
+ this.setProcessedHosts(schemaAlterStatus.getHostName());
+ SchemaChangeTracker.SchemaAlterStatus.AlterState rsState =
+ schemaAlterStatus.getCurrentAlterStatus();
+ switch(rsState) {
+ case FAILURE:
+ LOG.debug("Schema update failure Status = "
+ + schemaAlterStatus);
+ this.setCurrentAlterStatus(
+ MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE);
+ this.setNumberOfRegionsProcessed(
+ schemaAlterStatus.getNumberOfRegionsProcessed());
+ this.setErrorCause(schemaAlterStatus.getErrorCause());
+ break;
+ case SUCCESS:
+ LOG.debug("Schema update SUCCESS Status = "
+ + schemaAlterStatus);
+ this.setNumberOfRegionsProcessed(schemaAlterStatus.getNumberOfRegionsProcessed());
+ this.setCurrentAlterStatus(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.SUCCESS);
+ break;
+ case IGNORED:
+ LOG.debug("Schema update IGNORED Updating regions to " +
+ "process count. Status = "+ schemaAlterStatus);
+ ignoreRSForSchemaChange(schemaAlterStatus);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ currentAlterStatus = AlterState.valueOf(in.readUTF());
+ stamp = in.readLong();
+ numberOfRegionsToProcess = in.readInt();
+ hostsToProcess = Bytes.toString(Bytes.readByteArray(in));
+ processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
+ errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(currentAlterStatus.name());
+ out.writeLong(stamp);
+ out.writeInt(numberOfRegionsToProcess);
+ Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess));
+ Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString()));
+ Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString()));
+ }
+
+ @Override
+ public String toString() {
+ return
+ " state= " + currentAlterStatus
+ + ", ts= " + stamp
+ + ", number of regions to process = " + numberOfRegionsToProcess
+ + ", number of regions processed = " + numberOfRegionsProcessed
+ + ", hosts = " + hostsToProcess
+ + " , processed hosts = " + processedHosts
+ + " , errorCause = " + errorCause;
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision )
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision )
@@ -0,0 +1,452 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.hbase.util.Writables;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+import java.util.List;
+
+/**
+ * Region server schema change tracker. RS uses this tracker to keep track of
+ * alter schema requests from master and updates the status once the schema change
+ * is complete.
+ */
+public class SchemaChangeTracker extends ZooKeeperNodeTracker {
+ public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class);
+ private RegionServerServices regionServer = null;
+ private volatile int sleepTimeMillis = 0;
+
+
+ /**
+ * Constructs a new ZK node tracker.
+ *
+ * After construction, use {@link #start} to kick off tracking.
+ *
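+ * @param watcher ZooKeeper watcher; its schemaZNode is the node being tracked
+ * @param abortable passed through to the ZooKeeperNodeTracker constructor
+ * @param regionServer region server services used to refresh online regions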
+ */
+ public SchemaChangeTracker(ZooKeeperWatcher watcher,
+ Abortable abortable,
+ RegionServerServices regionServer) {
+ super(watcher, watcher.schemaZNode, abortable);
+ this.regionServer = regionServer;
+ }
+
+ @Override
+ public void start() {
+ try {
+ watcher.registerListener(this);
+ ZKUtil.listChildrenAndWatchThem(watcher, node);
+ // Clean-up old in-process schema changes for this RS now?
+ } catch (KeeperException e) {
+ LOG.error("RegionServer SchemaChangeTracker startup failed with " +
+ "KeeperException.", e);
+ }
+ }
+
+
+  /**
+   * This event will be triggered whenever new schema change request is processed by the
+   * master. Only events on the schema znode itself (watcher.schemaZNode) are handled.
+   * @param path full path of the node whose children have changed
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    LOG.debug("NodeChildrenChanged. Path = " + path);
+    if (path.equals(watcher.schemaZNode)) {
+      try {
+        List<String> tables =
+            ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode);
+        LOG.debug("RS.SchemaChangeTracker: " +
+            "Current list of tables with schema change = " + tables);
+        if (tables != null) {
+          handleSchemaChange(tables);
+        } else {
+          LOG.error("No tables found for schema change event." +
+              " Skipping instant schema refresh");
+        }
+      } catch (KeeperException ke) {
+        String errmsg = "KeeperException while handling nodeChildrenChanged for path = "
+            + path + " Cause = " + ke.getCause();
+        LOG.error(errmsg, ke);
+        TaskMonitor.get().createStatus(errmsg);
+      }
+    }
+  }
+
+  private void handleSchemaChange(List<String> tables) {
+    for (String tableName : tables) {
+      if (tableName != null) {
+        LOG.debug("Processing schema change with status for table = " + tableName);
+        handleSchemaChange(tableName);
+      }
+    }
+  }
+
+  private void handleSchemaChange(String tableName) {
+    int refreshedRegionsCount = 0, onlineRegionsCount = 0;
+    MonitoredTask status = null;
+    try {
+      List<HRegion> onlineRegions = regionServer.getOnlineRegions(Bytes.toBytes(tableName));
+      String serverName = regionServer.getServerName().getServerName();
+      if (onlineRegions != null && !onlineRegions.isEmpty()) {
+        onlineRegionsCount = onlineRegions.size();
+        status = TaskMonitor.get().createStatus("Region server " + serverName
+            + " handling schema change for table = " + tableName
+            + " number of online regions = " + onlineRegionsCount);
+        createStateNode(tableName, onlineRegionsCount);
+        for (HRegion hRegion : onlineRegions) {
+          regionServer.refreshRegion(hRegion);
+          refreshedRegionsCount++;
+        }
+        SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
+        if (alterStatus == null) { // createStateNode() only logs ZK failures
+          alterStatus = new SchemaAlterStatus(serverName, onlineRegionsCount);
+        }
+        alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount);
+        updateSchemaChangeStatus(tableName, alterStatus);
+        String msg = "Refresh schema completed for table name = " + tableName
+            + " server = " + serverName + " online Regions = " + onlineRegionsCount
+            + " refreshed Regions = " + refreshedRegionsCount;
+        LOG.debug(msg);
+        status.setStatus(msg);
+      } else {
+        LOG.debug("Server " + serverName + " has no online regions for table = " + tableName
+            + " Ignoring the schema change request");
+      }
+    } catch (IOException ioe) {
+      reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
+          refreshedRegionsCount, ioe, status);
+    } catch (KeeperException ke) {
+      reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
+          refreshedRegionsCount, ke, status);
+    }
+  }
+
+ private int getZKNodeVersion(String nodePath) throws KeeperException {
+ return ZKUtil.checkExists(this.watcher, nodePath);
+ }
+
+  /**
+   * Publish a FAILURE status for this RS on its schema change znode of the table
+   * and mirror the error on a monitored task. ZK/IO failures are logged, not thrown.
+   */
+  private void reportAndLogSchemaRefreshError(String tableName, int onlineRegionsCount,
+      int refreshedRegionsCount, Throwable exception, MonitoredTask status) {
+    String serverName = regionServer.getServerName().getServerName();
+    try {
+      String errmsg = " Region Server " + serverName
+          + " failed during schema change process. Cause = " + exception.getCause()
+          + " Number of onlineRegions = " + onlineRegionsCount
+          + " Processed regions = " + refreshedRegionsCount;
+      SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
+      if (alterStatus == null) {
+        // No status could be read from the znode; report the failure with a fresh one.
+        alterStatus = new SchemaAlterStatus(serverName, onlineRegionsCount);
+      }
+      alterStatus.update(SchemaAlterStatus.AlterState.FAILURE, refreshedRegionsCount, errmsg);
+      String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, serverName);
+      ZKUtil.updateExistingNodeData(this.watcher, nodePath,
+          Writables.getBytes(alterStatus), getZKNodeVersion(nodePath));
+      LOG.info("reportAndLogSchemaRefreshError() " +
+          " Updated child ZKNode with SchemaAlterStatus = "
+          + alterStatus + " for table = " + tableName);
+      if (status == null) {
+        status = TaskMonitor.get().createStatus(errmsg);
+      } else {
+        status.setStatus(errmsg);
+      }
+    } catch (KeeperException e) {
+      // Retry ?
+      String errmsg = "KeeperException while updating the schema change node with "
+          + "error status for table = " + tableName
+          + " server = " + serverName
+          + " Cause = " + e.getCause();
+      LOG.error(errmsg, e);
+      TaskMonitor.get().createStatus(errmsg);
+    } catch(IOException ioe) {
+      // retry ??
+      String errmsg = "IOException while updating the schema change node with "
+          + "server name for table = " + tableName
+          + " server = " + serverName
+          + " Cause = " + ioe.getCause();
+      TaskMonitor.get().createStatus(errmsg);
+      LOG.error(errmsg, ioe);
+    }
+  }
+
+
+ private void createStateNode(String tableName, int numberOfOnlineRegions)
+ throws IOException {
+ SchemaAlterStatus sas =
+ new SchemaAlterStatus(regionServer.getServerName().getServerName(),
+ numberOfOnlineRegions);
+ LOG.debug("Creating Schema Alter State node = " + sas);
+ try {
+ ZKUtil.createSetData(this.watcher,
+ getSchemaChangeNodePathForTableAndServer(tableName,
+ regionServer.getServerName().getServerName()),
+ Writables.getBytes(sas));
+ } catch (KeeperException ke) {
+ String errmsg = "KeeperException while creating the schema change node with "
+ + "server name for table = "
+ + tableName + " server = "
+ + regionServer.getServerName().getServerName()
+ + " Message = " + ke.getCause();
+ LOG.error(errmsg, ke);
+ TaskMonitor.get().createStatus(errmsg);
+ }
+
+ }
+
+ private SchemaAlterStatus getSchemaAlterStatus(String tableName)
+ throws KeeperException, IOException {
+ byte[] statusBytes = ZKUtil.getData(this.watcher,
+ getSchemaChangeNodePathForTableAndServer(tableName,
+ regionServer.getServerName().getServerName()));
+ if (statusBytes == null || statusBytes.length <= 0) {
+ return null;
+ }
+ SchemaAlterStatus sas = new SchemaAlterStatus();
+ Writables.getWritable(statusBytes, sas);
+ return sas;
+ }
+
+ private void updateSchemaChangeStatus(String tableName,
+ SchemaAlterStatus schemaAlterStatus)
+ throws KeeperException, IOException {
+ try {
+ if(sleepTimeMillis > 0) {
+ try {
+ LOG.debug("SchemaChangeTracker sleeping for "
+ + sleepTimeMillis);
+ Thread.sleep(sleepTimeMillis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ ZKUtil.updateExistingNodeData(this.watcher,
+ getSchemaChangeNodePathForTableAndServer(tableName,
+ regionServer.getServerName().getServerName()),
+ Writables.getBytes(schemaAlterStatus), -1);
+ String msg = "Schema change tracker completed for table = " + tableName
+ + " status = " + schemaAlterStatus;
+ LOG.debug(msg);
+ TaskMonitor.get().createStatus(msg);
+ } catch (KeeperException.NoNodeException e) {
+ String errmsg = "KeeperException.NoNodeException while updating the schema "
+ + "change node with server name for table = "
+ + tableName + " server = "
+ + regionServer.getServerName().getServerName()
+ + " Cause = " + e.getCause();
+ TaskMonitor.get().createStatus(errmsg);
+ LOG.error(errmsg, e);
+ } catch (KeeperException e) {
+ // Retry ?
+ String errmsg = "KeeperException while updating the schema change node with "
+ + "server name for table = "
+ + tableName + " server = "
+ + regionServer.getServerName().getServerName()
+ + " Cause = " + e.getCause();
+ LOG.error(errmsg, e);
+ TaskMonitor.get().createStatus(errmsg);
+ } catch(IOException ioe) {
+ String errmsg = "IOException while updating the schema change node with "
+ + "server name for table = "
+ + tableName + " server = "
+ + regionServer.getServerName().getServerName()
+ + " Cause = " + ioe.getCause();
+ LOG.error(errmsg, ioe);
+ TaskMonitor.get().createStatus(errmsg);
+ }
+ }
+
+ private String getSchemaChangeNodePathForTable(String tableName) {
+ return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
+ }
+
+ private String getSchemaChangeNodePathForTableAndServer(
+ String tableName, String regionServerName) {
+ return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
+ regionServerName);
+ }
+
+ public int getSleepTimeMillis() {
+ return sleepTimeMillis;
+ }
+
+ /**
+ * Set a sleep time in millis before this RS can update it's progress status.
+ * Used only for test cases to test complex test scenarios such as RS failures and
+ * RS exemption handling.
+ * @param sleepTimeMillis
+ */
+ public void setSleepTimeMillis(int sleepTimeMillis) {
+ this.sleepTimeMillis = sleepTimeMillis;
+ }
+
+
+ /**
+ * Holds the current alter state for a table. Alter state includes the
+ * current alter status (INPROCESS, FAILURE, SUCCESS, or IGNORED), current RS
+ * host name, timestamp of alter request, number of online regions this RS has for
+ * the given table, number of processed regions and an errorCause in case
+ * if the RS failed during the schema change process.
+ *
+ * RS keeps track of schema change requests per table using the alter status and
+ * periodically updates the alter status based on schema change status.
+ */
+ public static class SchemaAlterStatus implements Writable {
+
+ public enum AlterState {
+ INPROCESS, // Inprocess alter
+ SUCCESS, // completed alter
+ FAILURE, // failure alter
+ IGNORED // Ignore the alter processing.
+ }
+
+ private AlterState currentAlterStatus;
+ // TimeStamp
+ private long stamp;
+ private int numberOfOnlineRegions;
+ private String errorCause = " ";
+ private String hostName;
+ private int numberOfRegionsProcessed = 0;
+
+ public SchemaAlterStatus() {
+
+ }
+
+ public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) {
+ this.numberOfOnlineRegions = numberOfOnlineRegions;
+ this.stamp = System.currentTimeMillis();
+ this.currentAlterStatus = AlterState.INPROCESS;
+ //this.rsToProcess = activeHosts;
+ this.hostName = hostName;
+ }
+
+ public AlterState getCurrentAlterStatus() {
+ return currentAlterStatus;
+ }
+
+ public void setCurrentAlterStatus(AlterState currentAlterStatus) {
+ this.currentAlterStatus = currentAlterStatus;
+ }
+
+ public int getNumberOfOnlineRegions() {
+ return numberOfOnlineRegions;
+ }
+
+ public void setNumberOfOnlineRegions(int numberOfRegions) {
+ this.numberOfOnlineRegions = numberOfRegions;
+ }
+
+ public int getNumberOfRegionsProcessed() {
+ return numberOfRegionsProcessed;
+ }
+
+ public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
+ this.numberOfRegionsProcessed = numberOfRegionsProcessed;
+ }
+
+ public String getErrorCause() {
+ return errorCause;
+ }
+
+ public void setErrorCause(String errorCause) {
+ this.errorCause = errorCause;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public void update(AlterState state, int numberOfRegions, String errorCause) {
+ this.currentAlterStatus = state;
+ this.numberOfRegionsProcessed = numberOfRegions;
+ this.errorCause = errorCause;
+ }
+
+ public void update(AlterState state, int numberOfRegions) {
+ this.currentAlterStatus = state;
+ this.numberOfRegionsProcessed = numberOfRegions;
+ }
+
+ public void update(AlterState state) {
+ this.currentAlterStatus = state;
+ }
+
+ public void update(SchemaAlterStatus status) {
+ this.currentAlterStatus = status.getCurrentAlterStatus();
+ this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed();
+ this.errorCause = status.getErrorCause();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ currentAlterStatus = AlterState.valueOf(in.readUTF());
+ stamp = in.readLong();
+ numberOfOnlineRegions = in.readInt();
+ hostName = Bytes.toString(Bytes.readByteArray(in));
+ numberOfRegionsProcessed = in.readInt();
+ errorCause = Bytes.toString(Bytes.readByteArray(in));
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(currentAlterStatus.name());
+ out.writeLong(stamp);
+ out.writeInt(numberOfOnlineRegions);
+ Bytes.writeByteArray(out, Bytes.toBytes(hostName));
+ out.writeInt(numberOfRegionsProcessed);
+ Bytes.writeByteArray(out, Bytes.toBytes(errorCause));
+ }
+
+ @Override
+ public String toString() {
+ return
+ " state= " + currentAlterStatus
+ + ", ts= " + stamp
+ + ", number of online regions = " + numberOfOnlineRegions
+ + ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed
+ + ", errorCause = " + errorCause;
+ }
+ }
+
+}
Index: src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java (revision )
+++ src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java (revision )
@@ -0,0 +1,324 @@
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+public class TestInstantSchemaChangeFailover {
+
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private HBaseAdmin admin;
+ private static MiniHBaseCluster miniHBaseCluster = null;
+ private Configuration conf;
+ private ZooKeeperWatcher zkw;
+ private static MasterSchemaChangeTracker msct = null;
+
+ private final byte [] row = Bytes.toBytes("row");
+ private final byte [] qualifier = Bytes.toBytes("qualifier");
+ final byte [] value = Bytes.toBytes("value");
+ // do not change this count of RS
+ private static final int NUMBER_OF_REGION_SERVERS = 5;
+ // do not change this count of masters.
+ private static final int NUMBER_OF_MASTERS = 3;
+
+
+
+ @Before
+ public void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
+ TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
+ //
+ miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
+ msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+ this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+
+ }
+
+ @After
+ public void tearDownAfterClass() throws Exception { // annotated @After, so it runs after each test method
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * This a pretty low cost signalling mechanism. It is quite possible that we will
+ * miss out the ZK node creation signal as in some cases the schema change process
+ * happens rather quickly and our thread waiting for ZK node creation might wait forver.
+ * The fool-proof strategy would be to directly listen for ZK events.
+ * @param tableName
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void waitForSchemaChangeProcess(final String tableName)
+ throws KeeperException, InterruptedException {
+ LOG.info("Waiting for ZK node creation for table = " + tableName);
+ final MasterSchemaChangeTracker msct =
+ TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+ final Runnable r = new Runnable() {
+ public void run() {
+ try {
+ while(!msct.doesSchemaChangeNodeExists(tableName)) {
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (KeeperException ke) {
+ ke.printStackTrace();
+ }
+
+ LOG.info("Waiting for ZK node deletion for table = " + tableName);
+ try {
+ while(msct.doesSchemaChangeNodeExists(tableName)) {
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (KeeperException ke) {
+ ke.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ t.join(10000);
+ }
+
+
+ /**
+  * Abort region server 0 right after requesting a schema change and verify that the
+  * change still completes: the new family is usable and the ZK node is gone.
+  * @throws IOException if a table or admin operation fails
+  * @throws KeeperException if a ZK lookup fails
+  * @throws InterruptedException if the test thread is interrupted while waiting
+  */
+ @Test (timeout=50000)
+ public void testInstantSchemaChangeWhileRSCrash() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
+   zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
+   final String tableName = "TestRSCrashDuringSchemaChange";
+   HTable ht = createTableAndValidate(tableName);
+   HColumnDescriptor hcd = new HColumnDescriptor("family2");
+   admin.addColumn(Bytes.toBytes(tableName), hcd);
+
+   miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
+   // Let the dust settle down
+   Thread.sleep(10000);
+   waitForSchemaChangeProcess(tableName);
+   Put put2 = new Put(row);
+   put2.add(Bytes.toBytes("family2"), qualifier, value);
+   ht.put(put2);
+
+   Get get2 = new Get(row);
+   get2.addColumn(Bytes.toBytes("family2"), qualifier);
+   Result r2 = ht.get(get2);
+   byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+   int result2 = Bytes.compareTo(value, tvalue2);
+   assertEquals(0, result2); // expected value first, so failure messages read correctly
+   String nodePath = msct.getSchemaChangeNodePathForTable(tableName);
+   assertEquals(-1, ZKUtil.checkExists(zkw, nodePath)); // same check as before: -1 for the removed node
+   LOG.info("result2 = " + result2);
+   LOG.info("end testInstantSchemaChangeWhileRSCrash()");
+ }
+
+ /**
+  * Bring region servers down around a schema change. This test is the same as the
+  * single-crash test, except that one region server is aborted before the change is
+  * requested and another one after it was requested (restarting servers is disabled below).
+  * @throws IOException if a table or admin operation fails
+  * @throws KeeperException if a ZK lookup fails
+  * @throws InterruptedException if the test thread is interrupted while waiting
+  */
+ @Test (timeout=70000)
+ public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
+   miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
+   // Start a new RS before schema change .
+   // Commenting the start RS as it is failing with DFS user permission NPE.
+   //miniHBaseCluster.startRegionServer();
+
+   // Let the dust settle
+   Thread.sleep(10000);
+   final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
+   HTable ht = createTableAndValidate(tableName);
+   HColumnDescriptor hcd = new HColumnDescriptor("family2");
+   admin.addColumn(Bytes.toBytes(tableName), hcd);
+   // Kill RS 2 now.
+   miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
+   // Let the dust settle
+   Thread.sleep(10000);
+   // Two region servers have been aborted; the survivors must complete the change.
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+   Put put2 = new Put(row);
+   put2.add(Bytes.toBytes("family2"), qualifier, value);
+   ht.put(put2);
+
+   Get get2 = new Get(row);
+   get2.addColumn(Bytes.toBytes("family2"), qualifier);
+   Result r2 = ht.get(get2);
+   byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+   int result2 = Bytes.compareTo(value, tvalue2);
+   assertEquals(0, result2); // expected value first, so failure messages read correctly
+   LOG.info("result2 = " + result2);
+   LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()");
+ }
+
+ /**
+  * Test scenario where the primary master is brought down while processing an
+  * alter request. This is a harder one, as it is very difficult to time this: the
+  * abort is simply issued 50 ms after the alter request.
+  * @throws IOException if a table or admin operation fails
+  * @throws KeeperException if a ZK operation fails
+  * @throws InterruptedException if the test thread is interrupted while sleeping
+  */
+ @Test (timeout=50000)
+ public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
+   // Create the table first, then request the alter the master will be aborted during.
+
+   final String tableName = "testInstantSchemaChangeWhileMasterFailover";
+   HTable ht = createTableAndValidate(tableName);
+   HColumnDescriptor hcd = new HColumnDescriptor("family2");
+   admin.addColumn(Bytes.toBytes(tableName), hcd);
+   // Kill primary master now.
+   Thread.sleep(50);
+   miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
+
+   // It may not be possible for us to check the schema change status
+   // using waitForSchemaChangeProcess as our ZK session in MasterSchemachangeTracker will be
+   // lost when master dies and hence may not be accurate. So relying on old-fashioned
+   // sleep here.
+   Thread.sleep(25000);
+   Put put2 = new Put(row);
+   put2.add(Bytes.toBytes("family2"), qualifier, value);
+   ht.put(put2);
+
+   Get get2 = new Get(row);
+   get2.addColumn(Bytes.toBytes("family2"), qualifier);
+   Result r2 = ht.get(get2);
+   byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+   int result2 = Bytes.compareTo(value, tvalue2);
+   assertEquals(0, result2); // expected value first, so failure messages read correctly
+   LOG.info("result2 = " + result2);
+   LOG.info("end testInstantSchemaChangeWhileMasterFailover()");
+ }
+
+ /**
+  * Tests master failover while a schema change request is pending in ZK.
+  * A fake schema change node is created and the primary master is aborted
+  * while that request is still outstanding. After the sleep below, the node
+  * is expected to be gone when looked up through the then-current master
+  * (the schema janitor is expected to have cleaned it up after the timeout).
+  * @throws IOException declared for callers
+  * @throws KeeperException if a ZK operation on the schema change node fails
+  * @throws InterruptedException if the test thread is interrupted while sleeping
+  */
+ @Test
+ public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("testInstantSchemaOperationsInZKForMasterFailover() ");
+   String tableName = "testInstantSchemaOperationsInZKForMasterFailover";
+
+   conf = TEST_UTIL.getConfiguration();
+   MasterSchemaChangeTracker activeTracker =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+   activeTracker.createSchemaChangeNode(tableName, 10);
+   String createdPath = activeTracker.getSchemaChangeNodePathForTable(tableName);
+   LOG.debug(createdPath + " created");
+   assertTrue(activeTracker.doesSchemaChangeNodeExists(tableName));
+   // Kill primary master now.
+   miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
+   // Sleep for 50 secs so that the schema janitor of the fail-over master can kick in and
+   // clean up this failed/expired schema change request.
+   Thread.sleep(50000);
+   MasterSchemaChangeTracker failoverTracker = miniHBaseCluster.getMaster().getSchemaChangeTracker();
+   assertFalse(failoverTracker.doesSchemaChangeNodeExists(tableName));
+   String deletedPath = failoverTracker.getSchemaChangeNodePathForTable(tableName);
+   LOG.debug(deletedPath + " deleted");
+   LOG.info("END testInstantSchemaOperationsInZKForMasterFailover() ");
+ }
+
+ private HTable createTableAndValidate(String tableName) throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ LOG.info("Start createTableAndValidate()");
+ HTableDescriptor[] tables = admin.listTables();
+ int numTables = 0;
+ if (tables != null) {
+ numTables = tables.length;
+ }
+ HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+ HConstants.CATALOG_FAMILY);
+ tables = this.admin.listTables();
+ assertEquals(numTables + 1, tables.length);
+ LOG.info("created table = " + tableName);
+ return ht;
+ }
+
+}
+
+
Index: src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision )
+++ src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision )
@@ -0,0 +1,714 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class TestInstantSchemaChange {
+
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private HBaseAdmin admin;
+ private static MiniHBaseCluster miniHBaseCluster = null;
+ private Configuration conf;
+ private ZooKeeperWatcher zkw;
+ private static MasterSchemaChangeTracker msct = null;
+
+
+ private final byte [] row = Bytes.toBytes("row");
+ private final byte [] qualifier = Bytes.toBytes("qualifier");
+ final byte [] value = Bytes.toBytes("value");
+
+ @Before
+ public void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+ TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
+ TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
+ TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
+ //
+ miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
+ msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+ this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+
+ }
+
+ @After
+ public void tearDownAfterClass() throws Exception { // annotated @After, so it runs after each test method
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Find the RS that is currently holding an online region of the given table.
+  * @param tableName name of the table to look for
+  * @return the first live region server with an online region of the table, or null if none
+  */
+ private HRegionServer findRSWithOnlineRegionFor(String tableName) {
+   List<JVMClusterUtil.RegionServerThread> rsThreads =
+       miniHBaseCluster.getLiveRegionServerThreads();
+   for (JVMClusterUtil.RegionServerThread rsT : rsThreads) {
+     HRegionServer rs = rsT.getRegionServer();
+     List<HRegion> regions = rs.getOnlineRegions(Bytes.toBytes(tableName));
+     if (regions != null && !regions.isEmpty()) {
+       return rs;
+     }
+   }
+   return null;
+ }
+
+ @Test
+ public void testThis() {
+ assertTrue(true);
+ // Placeholder: this assertion always passes and checks nothing about the code under test.
+ }
+
+ /**
+  * Applies a descriptor with a new family via modifyTable, then checks the ZK node is gone and the family is usable.
+  */
+ @Test
+ public void testInstantSchemaChangeForModifyTable() throws IOException,
+     KeeperException, InterruptedException {
+   String tableName = "testInstantSchemaChangeForModifyTable";
+   conf = TEST_UTIL.getConfiguration();
+   LOG.info("Start testInstantSchemaChangeForModifyTable()");
+   HTable ht = createTableAndValidate(tableName);
+   String newFamily = "newFamily";
+   HTableDescriptor htd = new HTableDescriptor(tableName);
+   htd.addFamily(new HColumnDescriptor(newFamily));
+
+   admin.modifyTable(Bytes.toBytes(tableName), htd);
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+   Put put1 = new Put(row);
+   put1.add(Bytes.toBytes(newFamily), qualifier, value);
+   ht.put(put1);
+
+   Get get1 = new Get(row);
+   get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+   Result r = ht.get(get1);
+   byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+   int result = Bytes.compareTo(value, tvalue);
+   assertEquals(0, result); // expected value first, so failure messages read correctly
+   LOG.info("END testInstantSchemaChangeForModifyTable()");
+ }
+
+ private void waitForSchemaChangeProcess(final String tableName)
+ throws KeeperException, InterruptedException {
+ waitForSchemaChangeProcess(tableName, 10000); // delegates with a wait time of 10000 ms
+ }
+
+
+ /**
+ * This a pretty low cost signalling mechanism. It is quite possible that we will
+ * miss out the ZK node creation signal as in some cases the schema change process
+ * happens rather quickly and our thread waiting for ZK node creation might wait forver.
+ * The fool-proof strategy would be to directly listen for ZK events.
+ * @param tableName
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills)
+ throws KeeperException, InterruptedException {
+ LOG.info("Waiting for ZK node creation for table = " + tableName);
+ final MasterSchemaChangeTracker msct =
+ TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+ final Runnable r = new Runnable() {
+ public void run() {
+ try {
+ while(!msct.doesSchemaChangeNodeExists(tableName)) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (KeeperException ke) {
+ ke.printStackTrace();
+ }
+ LOG.info("Waiting for ZK node deletion for table = " + tableName);
+ try {
+ while(msct.doesSchemaChangeNodeExists(tableName)) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (KeeperException ke) {
+ ke.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ if (waitTimeMills > 0) {
+ t.join(waitTimeMills);
+ } else {
+ t.join(10000);
+ }
+ }
+
+ /**
+  * Creates a table with the catalog family and verifies the table count grew by one.
+  * @param tableName name of the table to create
+  * @return handle to the newly created table
+  * @throws IOException if listing or creating tables fails
+  */
+ private HTable createTableAndValidate(String tableName) throws IOException {
+   conf = TEST_UTIL.getConfiguration();
+   LOG.info("Start createTableAndValidate()");
+   HTableDescriptor[] tables = admin.listTables();
+   int numTables = (tables == null) ? 0 : tables.length;
+   HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), HConstants.CATALOG_FAMILY);
+   tables = this.admin.listTables();
+   assertEquals(numTables + 1, tables.length);
+   LOG.info("created table = " + tableName);
+   return ht;
+ }
+
+ /** Adds a family through addColumn, then checks the ZK node is gone and the family is usable. */
+ @Test
+ public void testInstantSchemaChangeForAddColumn() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeForAddColumn() ");
+   String tableName = "testSchemachangeForAddColumn";
+   HTable ht = createTableAndValidate(tableName);
+   String newFamily = "newFamily";
+   HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
+
+   admin.addColumn(Bytes.toBytes(tableName), hcd);
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+   Put put1 = new Put(row);
+   put1.add(Bytes.toBytes(newFamily), qualifier, value);
+   LOG.info("******** Put into new column family ");
+   ht.put(put1);
+
+   Get get1 = new Get(row);
+   get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+   Result r = ht.get(get1);
+   byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+   LOG.info(" Value put = " + Bytes.toStringBinary(value) + " value from table = " + Bytes.toStringBinary(tvalue));
+   int result = Bytes.compareTo(value, tvalue);
+   assertEquals(0, result); // expected value first, so failure messages read correctly
+   LOG.info("End testInstantSchemaChangeForAddColumn() ");
+ }
+
+ /** Modifies the catalog family and checks the new settings on each region the mini cluster returns. */
+ @Test
+ public void testInstantSchemaChangeForModifyColumn() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeForModifyColumn() ");
+   String tableName = "testSchemachangeForModifyColumn";
+   HTable ht = createTableAndValidate(tableName);
+
+   HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+   hcd.setMaxVersions(99);
+   hcd.setBlockCacheEnabled(false);
+
+   admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+   List<HRegion> onlineRegions
+       = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+   for (HRegion onlineRegion : onlineRegions) {
+     HTableDescriptor htd = onlineRegion.getTableDesc();
+     HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
+     assertFalse(tableHcd.isBlockCacheEnabled());
+     assertEquals(99, tableHcd.getMaxVersions()); // expected value first
+   }
+   LOG.info("End testInstantSchemaChangeForModifyColumn() ");
+ }
+
+ /**
+  * Creates a table with families A, B and C, deletes C and checks it is gone from the descriptor.
+  */
+ @Test
+ public void testInstantSchemaChangeForDeleteColumn() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaChangeForDeleteColumn() ");
+   String tableName = "testSchemachangeForDeleteColumn";
+   // Count the tables up front so the creation below can be verified.
+   HTableDescriptor[] existing = admin.listTables();
+   int tablesBefore = (existing == null) ? 0 : existing.length;
+   // Three families, of which "C" is dropped again further down.
+   byte[][] families = { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
+   byte[] nameBytes = Bytes.toBytes(tableName);
+   HTable ht = TEST_UTIL.createTable(nameBytes, families);
+   HTableDescriptor[] afterCreate = this.admin.listTables();
+   assertEquals(tablesBefore + 1, afterCreate.length);
+   LOG.info("Table testSchemachangeForDeleteColumn created");
+
+   admin.deleteColumn(tableName, "C");
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+   // The descriptor fetched after the change must no longer contain family "C".
+   HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName));
+   HColumnDescriptor removedFamily = modifiedHtd.getFamily(Bytes.toBytes("C"));
+   assertTrue(removedFamily == null);
+   LOG.info("End testInstantSchemaChangeForDeleteColumn() ");
+ }
+
+ @Test
+ public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException,
+     KeeperException {
+   final String tableName = "testInstantSchemaChangeWhenTableIsDisabled";
+   conf = TEST_UTIL.getConfiguration();
+   LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()");
+   HTable ht = createTableAndValidate(tableName);
+   // Take the table offline before altering it.
+   admin.disableTable(tableName);
+   // Alter the disabled table by adding a family.
+   admin.addColumn(Bytes.toBytes(tableName), new HColumnDescriptor("newFamily"));
+   // The test expects no schema change node in ZK for the disabled table.
+   MasterSchemaChangeTracker tracker =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+   assertFalse(tracker.doesSchemaChangeNodeExists(tableName));
+ }
+
+ /**
+ * Test that when concurrent alter requests are received for a table we don't miss any.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException,
+ KeeperException, InterruptedException {
+ final String tableName = "testConcurrentInstantSchemaChangeForModifyTable";
+ conf = TEST_UTIL.getConfiguration();
+ LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()");
+ HTable ht = createTableAndValidate(tableName);
+
+ Runnable run1 = new Runnable() {
+ public void run() {
+ HColumnDescriptor hcd = new HColumnDescriptor("family1");
+ try {
+ admin.addColumn(Bytes.toBytes(tableName), hcd);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+
+ }
+ }
+ };
+ Runnable run2 = new Runnable() {
+ public void run() {
+ HColumnDescriptor hcd = new HColumnDescriptor("family2");
+ try {
+ admin.addColumn(Bytes.toBytes(tableName), hcd);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+
+ }
+ }
+ };
+
+ run1.run();
+ // We have to add a sleep here as in concurrent scenarios the HTD update
+ // in HDFS fails and returns with null HTD. This needs to be investigated,
+ // but it doesn't impact the instant alter functionality in any way.
+ Thread.sleep(100);
+ run2.run();
+
+ waitForSchemaChangeProcess(tableName);
+
+ Put put1 = new Put(row);
+ put1.add(Bytes.toBytes("family1"), qualifier, value);
+ ht.put(put1);
+
+ Get get1 = new Get(row);
+ get1.addColumn(Bytes.toBytes("family1"), qualifier);
+ Result r = ht.get(get1);
+ byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier);
+ int result = Bytes.compareTo(value, tvalue);
+ assertEquals(result, 0);
+ Thread.sleep(10000);
+
+ Put put2 = new Put(row);
+ put2.add(Bytes.toBytes("family2"), qualifier, value);
+ ht.put(put2);
+
+ Get get2 = new Get(row);
+ get2.addColumn(Bytes.toBytes("family2"), qualifier);
+ Result r2 = ht.get(get2);
+ byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+ int result2 = Bytes.compareTo(value, tvalue2);
+ assertEquals(result2, 0);
+ LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()");
+ }
+
+ /**
+  * Runs the load balancer and then requests a schema change, and validates that the
+  * change completes (ZK node gone) and that the new family is usable.
+  * @throws IOException if a table or admin operation fails
+  * @throws InterruptedException if the test thread is interrupted while waiting
+  * @throws KeeperException if a ZK lookup fails
+  */
+ @Test
+ public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException,
+     InterruptedException, KeeperException {
+   final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning";
+   conf = TEST_UTIL.getConfiguration();
+   LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()");
+   final String newFamily = "newFamily";
+   HTable ht = createTableAndValidate(tableName);
+   final MasterSchemaChangeTracker msct =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+   // Both runnables below are invoked through run(), i.e. one after the other on this thread.
+   Runnable balancer = new Runnable() {
+     public void run() {
+       // run the balancer now.
+       miniHBaseCluster.getMaster().balance();
+     }
+   };
+
+   Runnable schemaChanger = new Runnable() {
+     public void run() {
+       HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
+       try {
+         admin.addColumn(Bytes.toBytes(tableName), hcd);
+       } catch (IOException ioe) {
+         // Logged rather than rethrown; the put/get on the new family below relies on success.
+         LOG.error("addColumn failed for " + newFamily, ioe);
+       }
+     }
+   };
+
+   balancer.run();
+   schemaChanger.run();
+   waitForSchemaChangeProcess(tableName);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+   Put put1 = new Put(row);
+   put1.add(Bytes.toBytes(newFamily), qualifier, value);
+   LOG.info("******** Put into new column family ");
+   ht.put(put1);
+
+   Get get1 = new Get(row);
+   get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+   Result r = ht.get(get1);
+   byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+   LOG.info(" Value put = " + Bytes.toStringBinary(value) + " value from table = " + Bytes.toStringBinary(tvalue));
+   int result = Bytes.compareTo(value, tvalue);
+   assertEquals(0, result); // expected value first, so failure messages read correctly
+
+   LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() ");
+ }
+
+
+ /**
+  * Checks that the load balancer is not running after an explicit balance request
+  * made while a schema change node exists in ZK. The node is created directly
+  * through the tracker to simulate an in-flight schema change.
+  * @throws KeeperException if a ZK operation on the schema change node fails
+  * @throws IOException declared for callers
+  * @throws InterruptedException declared for callers
+  */
+ @Test (timeout=70000)
+ public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException,
+     IOException, InterruptedException {
+   LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() ");
+   final String pendingTable = "testLoadBalancerBlocks";
+   final MasterSchemaChangeTracker tracker =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+   // Simulate a new schema change request; its node must exist afterwards.
+   tracker.createSchemaChangeNode(pendingTable, 0);
+   assertTrue(tracker.doesSchemaChangeNodeExists(pendingTable));
+
+   // Now, request an explicit LB run (executed on this thread).
+   Runnable explicitBalance = new Runnable() {
+     public void run() {
+       miniHBaseCluster.getMaster().balance();
+     }
+   };
+   explicitBalance.run();
+
+   // Load balancer should not run now.
+   assertFalse(miniHBaseCluster.getMaster().isLoadBalancerRunning());
+   LOG.debug("testConcurrentLoadBalancerSchemaChangeRequests Asserted");
+   LOG.info("End testConcurrentLoadBalancerSchemaChangeRequests() ");
+ }
+
+ /**
+ * Starts a load balancer run and an addColumn request on two threads, then polls the balancer state on a third.
+ * @throws KeeperException declared for callers; ZK errors inside the checker thread are caught there
+ * @throws IOException if creating the test table fails
+ * @throws InterruptedException if the test thread is interrupted while joining the checker thread
+ */
+ @Test (timeout=10000)
+ public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException,
+ IOException, InterruptedException {
+ final MasterSchemaChangeTracker msct =
+ TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+ final String tableName = "testInstantSchemaChangeBlocksDuringLoadBalancerRun";
+ conf = TEST_UTIL.getConfiguration();
+ LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun()");
+ final String newFamily = "newFamily";
+ HTable ht = createTableAndValidate(tableName);
+
+ // Test that the schema change request does not run while an in-flight LB run
+ // is in progress.
+ // First, request an explicit LB run.
+
+ Runnable balancer1 = new Runnable() {
+ public void run() {
+ // run the balancer now.
+ miniHBaseCluster.getMaster().balance();
+ }
+ };
+
+ Runnable schemaChanger = new Runnable() {
+ public void run() {
+ HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
+ try {
+ admin.addColumn(Bytes.toBytes(tableName), hcd);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+
+ }
+ }
+ };
+
+ Thread t1 = new Thread(balancer1);
+ Thread t2 = new Thread(schemaChanger);
+ t1.start();
+ t2.start();
+
+ // Checker: waits for the balancer to start, then to stop, asserting on the ZK node in between.
+ Runnable balancerCheck = new Runnable() {
+ public void run() {
+ // check whether balancer is running.
+ while(!miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ try {
+ assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); // NOTE(review): name differs from tableName - confirm
+ } catch (KeeperException ke) {
+ ke.printStackTrace();
+ }
+ LOG.debug("Load Balancer is now running or skipped");
+ while(miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
+ try {
+ assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); // NOTE(review): name differs from tableName - confirm
+ } catch (KeeperException ke) {
+ // NOTE(review): the KeeperException is silently ignored here.
+ }
+ // NOTE(review): these assertions run on thread t; presumably a failure here does not fail the JUnit test - confirm.
+ }
+ };
+
+ Thread t = new Thread(balancerCheck);
+ t.start();
+ t.join(1000); // waits at most 1000 ms for the checker; t1 and t2 are not joined
+ // Load balancer should not run now.
+ //assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
+ // Schema change request node should now exist.
+ // assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
+ LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted");
+ LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() ");
+ }
+
+ /**
+  * Tests that the schema janitor cleans up expired/failed schema alter attempts. We create
+  * a schema change node in ZK for a fake table (one that doesn't exist, with a fake number
+  * of online regions); the request should time out (after 30 seconds) and be cleaned up.
+  * @throws IOException
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ @Test
+ public void testInstantSchemaJanitor() throws IOException,
+     KeeperException, InterruptedException {
+   LOG.info("Start testInstantSchemaJanitor() ");
+   String fakeTableName = "testInstantSchemaWithFailedExpiredOperations";
+   MasterSchemaChangeTracker msct =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+   msct.createSchemaChangeNode(fakeTableName, 10);
+   LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName) + " created");
+   // Poll for up to ~40s (80 x 500ms) rather than always sleeping the full 40s.
+   for (int i = 0; i < 80 && msct.doesSchemaChangeNodeExists(fakeTableName); i++) {
+     Thread.sleep(500);
+   }
+   assertFalse(msct.doesSchemaChangeNodeExists(fakeTableName));
+   LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName) + " deleted");
+   LOG.info("End testInstantSchemaJanitor() ");
+ }
+
+
+ /**
+  * The objective of the following test is to validate that schema exclusions happen properly.
+  * When a RS dies or crashes mid-flight during a schema refresh, we would exclude all online
+  * regions in that RS, as well as the RS itself, from the schema change process.
+  *
+  * @throws IOException
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ @Test
+ public void testInstantSchemaChangeExclusions() throws IOException,
+     KeeperException, InterruptedException {
+   MasterSchemaChangeTracker msct =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+   LOG.info("Start testInstantSchemaChangeExclusions() ");
+   String tableName = "testInstantSchemaChangeExclusions";
+   createTableAndValidate(tableName);
+
+   HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+   hcd.setMaxVersions(99);
+   hcd.setBlockCacheEnabled(false);
+
+   HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
+   // Issue the alter first, then abort the RS so that it goes down mid-flight.
+   admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+   hrs.abort("Aborting for tests");
+   hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
+
+   // After the wait below, the schema change node for the table must no longer exist.
+   LOG.debug("Waiting for Schema Change process to complete");
+   waitForSchemaChangeProcess(tableName, 15000);
+   assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+   // Sleep for some time so that our region is reassigned to some other RS
+   // by master.
+   Thread.sleep(10000);
+   List<HRegion> onlineRegions =
+       miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+   assertFalse(onlineRegions.isEmpty());
+   for (HRegion onlineRegion : onlineRegions) {
+     HTableDescriptor htd = onlineRegion.getTableDesc();
+     HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
+     assertFalse(tableHcd.isBlockCacheEnabled());
+     assertEquals(99, tableHcd.getMaxVersions());
+   }
+   LOG.info("End testInstantSchemaChangeExclusions() ");
+
+ }
+
+ /**
+  * This test validates that when a schema change request fails on the
+  * RS side, we appropriately register the failure in the Master Schema change
+  * tracker's node as well as capture the error cause.
+  *
+  * Currently an alter request fails if RS fails with an IO exception say due to
+  * missing or incorrect codec. With instant schema change the same failure happens
+  * and we register the failure with associated cause and also update the
+  * monitor status appropriately.
+  *
+  * The region(s) will be orphaned in both the cases.
+  *
+  */
+ @Test
+ public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
+     KeeperException, InterruptedException {
+   MasterSchemaChangeTracker msct =
+       TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+   LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
+   String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
+   HTable ht = createTableAndValidate(tableName);
+
+   // split the table into multiple regions (10 requested)
+   TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), ht,
+       HConstants.CATALOG_FAMILY, 10);
+
+   // wait for all the regions to be assigned
+   Thread.sleep(10000);
+   List<HRegion> onlineRegions =
+       miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+   // the region count is only logged for diagnostics; nothing is asserted on it
+   LOG.info("Size of online regions = " + onlineRegions.size());
+
+   // SNAPPY is presumably not loadable in the test environment, so re-opening regions
+   // with this descriptor is expected to fail on the RS side -- TODO confirm.
+   HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+   hcd.setMaxVersions(99);
+   hcd.setBlockCacheEnabled(false);
+   hcd.setCompressionType(Compression.Algorithm.SNAPPY);
+
+   admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+   Thread.sleep(100);
+
+   assertTrue(msct.doesSchemaChangeNodeExists(tableName));
+   Thread.sleep(10000);
+   // get the current alter status and validate that it is a failure with an error cause.
+   MasterSchemaChangeTracker.MasterAlterStatus mas = msct.getMasterAlterStatus(tableName);
+   assertTrue("no alter status found for " + tableName, mas != null);
+   assertEquals(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE,
+       mas.getCurrentAlterStatus());
+   assertTrue("failed alter should record an error cause", mas.getErrorCause() != null);
+   LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
+ }
+
+
+}
+
+