diff --git a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index d25b20c..7df4c13 100644 --- a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -305,7 +305,9 @@ public class LocalHBaseCluster { */ public HMaster getActiveMaster() { for (JVMClusterUtil.MasterThread mt : masterThreads) { - if (mt.getMaster().isActiveMaster()) { + // Ensure that the current active master is not stopped. + // We don't want to return a stopping master as an active master. + if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { return mt.getMaster(); } } diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index 90c91ae..b1da154 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -141,13 +141,12 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> { * Constructor */ EventType(int value) {} - public boolean isOnlineSchemaChangeSupported() { + public boolean isSchemaChangeEvent() { return ( - this.equals(EventType.C_M_ADD_FAMILY) || - this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || - this.equals(EventType.C_M_MODIFY_TABLE) - ); + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_DELETE_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_MODIFY_TABLE)); } } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java index 000f99c..7512c54 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java @@ -245,17 +245,17 @@ public interface HMasterInterface extends VersionedProtocol { public HTableDescriptor[] getHTableDescriptors(); /** - * Get current HTD for a given tablename - * @param tableName - * @return HTableDescriptor for the table - */ - //public HTableDescriptor getHTableDescriptor(final byte[] tableName); - - /** * Get array of HTDs for requested tables. * @param tableNames * @return array of HTableDescriptor */ public HTableDescriptor[] getHTableDescriptors(List<String> tableNames); + /** + * Returns the current running status of the load balancer. + * @return true if the LoadBalancer is currently running, false otherwise.
+ */ + public boolean isLoadBalancerRunning(); + + } diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 44846c8..4f40d1b 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -270,7 +270,7 @@ public class AssignmentManager extends ZooKeeperListener { - * @throws InterruptedException */ public Pair<Integer, Integer> getReopenStatus(byte[] tableName) - throws IOException, InterruptedException { + throws IOException { List<HRegionInfo> hris = MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName); Integer pending = 0; diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3f8f929..9620230 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -83,10 +84,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.DNS; @@ -152,7 +150,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private CatalogTracker catalogTracker; // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - + + // Schema change tracker + private MasterSchemaChangeTracker schemaChangeTracker; + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. @@ -178,11 +179,16 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; + private Thread schemaJanitorChore; private MasterCoprocessorHost cpHost; private final ServerName serverName; private TableDescriptors tableDescriptors; + + // Whether schema alter changes go through ZK. + private boolean supportInstantSchemaChanges = false; + /** * Initializes the HMaster. The steps are as follows: @@ -245,6 +251,17 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { } this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true); this.metrics = new MasterMetrics(getServerName().toString()); + // initialize instant schema change settings + this.supportInstantSchemaChanges = conf.getBoolean( + "hbase.instant.schema.alter.enabled", false); + if (supportInstantSchemaChanges) { + LOG.info("Instant schema change enabled.
All schema alter operations will " + + "happen through ZK."); + } else { + LOG.info("Instant schema change disabled. All schema alter operations will " + + "use the normal alter path."); + } } /** @@ -373,6 +390,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); + // initialize schema change tracker + this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), + this, this, + conf.getInt("hbase.instant.schema.alter.timeout", 60000)); + this.schemaChangeTracker.start(); + LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -483,6 +506,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.catalogJanitorChore = new CatalogJanitor(this, this); Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); + // Schema janitor chore. + this.schemaJanitorChore = getAndStartSchemaJanitorChore(this); + status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); initialized = true; @@ -566,6 +592,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return this.tableDescriptors; } + @Override + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return this.schemaChangeTracker; + } + + public RegionServerTracker getRegionServerTracker() { + return this.regionServerTracker; + } + /** @return InfoServer object. Maybe null.*/ public InfoServer getInfoServer() { return this.infoServer; } @@ -668,10 +703,39 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (this.executorService != null) this.executorService.shutdown(); } - private static Thread getAndStartBalancerChore(final HMaster master) { + /** + * Start the schema janitor chore. This janitor periodically sweeps + * failed or expired schema changes. + * @param master the active master + * @return the started daemon chore thread + */ + private Thread getAndStartSchemaJanitorChore(final HMaster master) { + String name = master.getServerName() + "-SchemaJanitorChore"; + int schemaJanitorPeriod = + master.getConfiguration().getInt("hbase.instant.schema.janitor.period", 120000); + // Start up the schema janitor chore + Chore chore = new Chore(name, schemaJanitorPeriod, master) { + @Override + protected void chore() { + master.getSchemaChangeTracker().handleFailedExpiredSchemaChanges(); + } + }; + return Threads.setDaemonThreadRunning(chore.getThread()); + } + + + private Thread getAndStartBalancerChore(final HMaster master) { String name = master.getServerName() + "-BalancerChore"; int balancerPeriod = master.getConfiguration().getInt("hbase.balancer.period", 300000); + // Clean up after mid-flight load balancer failures: if the previous master + // crashed while the balancer was running, a stale load balancer ZK node + // may have been left behind and needs to be deleted. + if (doesLoadBalancerNodeExists()) { + LOG.debug("Deleting LoadBalancer ZK node. 
This is probably due to a master crash during the " + + "previous LoadBalancer run."); + deleteLoadBalancerNodeUntilDone(); + } // Start up the load balancer chore Chore chore = new Chore(name, balancerPeriod, master) { @Override @@ -689,6 +753,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (this.catalogJanitorChore != null) { this.catalogJanitorChore.interrupt(); } + if (this.schemaJanitorChore != null) { + this.schemaJanitorChore.interrupt(); + } + } @Override @@ -760,6 +828,67 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return balancerCutoffTime; } + + private boolean createLoadBalancerNode() { + // Wait for in-flight schema changes to complete. + // While a schema change is in progress, the region servers close and + // reopen the impacted regions of the table. We don't want the load + // balancer to start while the region servers are still processing a + // schema change. + waitForCurrentSchemaChange(); + try { + LOG.debug("Creating the load balancer node in ZK."); + ZKUtil.createAndFailSilent(this.zooKeeper, zooKeeper.loadBalancerZNode); + } catch (KeeperException ke) { + LOG.warn("Load balancer node creation failed. This is critical as load balancer runs will not " + + "succeed.", ke); + return false; + } + return true; + } + + private boolean deleteLoadBalancerNode() { + try { + LOG.debug("Deleting the load balancer node now"); + ZKUtil.deleteNode(this.zooKeeper, zooKeeper.loadBalancerZNode); + } catch (KeeperException ke) { + LOG.warn("Load balancer node deletion failed. This is critical as future load balancer " + + "runs will be skipped until this node is deleted", ke); + return false; + } + return true; + } + + + /** + * Check whether the Load Balancer is currently running. + * @return true if the Load balancer is currently running. + */ + public boolean isLoadBalancerRunning() { + return doesLoadBalancerNodeExists(); + } + + private boolean doesLoadBalancerNodeExists() { + try { + return ZKUtil.checkExists(this.zooKeeper, zooKeeper.loadBalancerZNode) != -1; + } catch (KeeperException ke) { + // Treat an inconclusive check as "not running"; the next balancer run + // will re-check the node. + LOG.debug("KeeperException while determining current load balancer node status.", ke); + } + return false; + } + + private void waitForCurrentSchemaChange() { + while (schemaChangeTracker.isSchemaChangeInProgress()) { + try { + LOG.debug("Schema change operation is in progress. Waiting for " + + "it to complete before running the load balancer."); + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @Override public boolean balance() { // If balance not true, don't run balancer. @@ -767,76 +896,108 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // Do this call outside of synchronized block. int maximumBalanceTime = getBalancerCutoffTime(); long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; - boolean balancerRan; - synchronized (this.balancer) { - // Only allow one balance run at at time. - if (this.assignmentManager.isRegionsInTransition()) { - LOG.debug("Not running balancer because " + - this.assignmentManager.getRegionsInTransition().size() + - " region(s) in transition: " + - org.apache.commons.lang.StringUtils. 
- abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); - return false; - } - if (this.serverManager.areDeadServersInProgress()) { - LOG.debug("Not running balancer because processing dead regionserver(s): " + - this.serverManager.getDeadServers()); - return false; - } - - if (this.cpHost != null) { - try { - if (this.cpHost.preBalance()) { - LOG.debug("Coprocessor bypassing balancer request"); - return false; - } - } catch (IOException ioe) { - LOG.error("Error invoking master coprocessor preBalance()", ioe); + boolean balancerRan = false; + try { + synchronized (this.balancer) { + // Even though we hold the balancer lock, the load balancer ZK node can + // still exist if a previous master crashed mid-run or an earlier + // execution did not clean up after itself; in that case, skip this run. + if (isLoadBalancerRunning()) { + LOG.debug("Load balancer is currently running. Skipping the current execution."); return false; } - } - - Map<ServerName, List<HRegionInfo>> assignments = - this.assignmentManager.getAssignments(); - // Returned Map from AM does not include mention of servers w/o assignments. - for (Map.Entry<ServerName, List<HRegionInfo>> e: - this.serverManager.getOnlineServers().entrySet()) { - if (!assignments.containsKey(e.getKey())) { - assignments.put(e.getKey(), new ArrayList<HRegionInfo>()); + // Only allow one balance run at a time. + if (this.assignmentManager.isRegionsInTransition()) { + LOG.debug("Not running balancer because " + + this.assignmentManager.getRegionsInTransition().size() + + " region(s) in transition: " + + org.apache.commons.lang.StringUtils. + abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); + return false; } - List<RegionPlan> plans = this.balancer.balanceCluster(assignments); - int rpCount = 0; // number of RegionPlans balanced so far - long totalRegPlanExecTime = 0; - balancerRan = plans != null; - if (plans != null && !plans.isEmpty()) { - for (RegionPlan plan: plans) { - LOG.info("balance " + plan); - long balStartTime = System.currentTimeMillis(); - this.assignmentManager.balance(plan); - totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; - rpCount++; - if (rpCount < plans.size() && - // if performing next balance exceeds cutoff time, exit the loop - (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { - LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + - maximumBalanceTime); - break; - } + if (this.serverManager.areDeadServersInProgress()) { + LOG.debug("Not running balancer because processing dead regionserver(s): " + + this.serverManager.getDeadServers()); + return false; } - } - if (this.cpHost != null) { - try { - this.cpHost.postBalance(); - } catch (IOException ioe) { - // balancing already succeeded so don't change the result - LOG.error("Error invoking master coprocessor postBalance()", ioe); + // If the LB node cannot be created we skip this run rather than block. + if (createLoadBalancerNode()) { + LOG.debug("Load balancer node created in ZK. Proceeding with balancing"); + if (this.cpHost != null) { + try { + if (this.cpHost.preBalance()) { + LOG.debug("Coprocessor bypassing balancer request"); + return false; + } + } catch (IOException ioe) { + LOG.error("Error invoking master coprocessor preBalance()", ioe); + return false; + } + } + + Map<ServerName, List<HRegionInfo>> assignments = + this.assignmentManager.getAssignments(); + // Returned Map from AM does not include mention of servers w/o assignments. 
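Aside: the guard pattern behind createLoadBalancerNode()/deleteLoadBalancerNodeUntilDone() is easier to see stripped of HBase specifics. A minimal sketch (not part of the patch) of the same create/run/delete-in-finally idea against the plain ZooKeeper client; the znode path and the use of the raw client here are illustrative assumptions, the patch itself goes through ZKUtil and zooKeeper.loadBalancerZNode:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    public class BalancerGuardSketch {
      /** Runs action only if the guard znode could be created; always cleans up. */
      static boolean runGuarded(ZooKeeper zk, String guardPath, Runnable action)
          throws KeeperException, InterruptedException {
        try {
          zk.create(guardPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
          return false; // another run (or a crashed master's leftover) holds the guard
        }
        try {
          action.run();
        } finally {
          zk.delete(guardPath, -1); // delete even on failure so future runs proceed
        }
        return true;
      }
    }

Deleting the guard in a finally block is what keeps a failed balancer run from wedging all future runs, which is exactly why balance() below ends with deleteLoadBalancerNodeUntilDone().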
+ for (Map.Entry<ServerName, List<HRegionInfo>> e: + this.serverManager.getOnlineServers().entrySet()) { + if (!assignments.containsKey(e.getKey())) { + assignments.put(e.getKey(), new ArrayList<HRegionInfo>()); + } + } + List<RegionPlan> plans = this.balancer.balanceCluster(assignments); + int rpCount = 0; // number of RegionPlans balanced so far + long totalRegPlanExecTime = 0; + balancerRan = plans != null; + if (plans != null && !plans.isEmpty()) { + for (RegionPlan plan: plans) { + LOG.info("balance " + plan); + long balStartTime = System.currentTimeMillis(); + this.assignmentManager.balance(plan); + totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; + rpCount++; + if (rpCount < plans.size() && + // if performing next balance exceeds cutoff time, exit the loop + (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { + LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + + maximumBalanceTime); + break; + } + } + } + if (this.cpHost != null) { + try { + this.cpHost.postBalance(); + } catch (IOException ioe) { + // balancing already succeeded so don't change the result + LOG.error("Error invoking master coprocessor postBalance()", ioe); + } + } + LOG.debug("Load balancer completed processing. Deleting the " + + "load balancer node in ZK."); + } else { + LOG.warn("Load balancer did not run as ZK node creation failed."); + } + } + } finally { + // Delete the LB node now. We want to delete the node even if there were + // issues during the LB run. + deleteLoadBalancerNodeUntilDone(); } return balancerRan; } + private void deleteLoadBalancerNodeUntilDone() { + while (doesLoadBalancerNodeExists()) { + try { + deleteLoadBalancerNode(); + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + enum BalanceSwitchMode { SYNC, ASYNC @@ -981,22 +1142,63 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + this.executorService.submit(new DeleteTableHandler(tableName, this, this, this, + supportInstantSchemaChanges)); if (cpHost != null) { cpHost.postDeleteTable(tableName); } } + /** + * Get the number of regions of the table that have been updated by the alter. 
+ * + * @return Pair of counts. Pair.getFirst() is the number of regions that + * are yet to be updated; Pair.getSecond() is the total number of + * regions of the table. + */ public Pair<Integer, Integer> getAlterStatus(byte[] tableName) throws IOException { + if (supportInstantSchemaChanges) { + return getAlterStatusFromSchemaChangeTracker(tableName); + } + return this.assignmentManager.getReopenStatus(tableName); + } + + /** + * Used by the client to check whether all regions of a table have picked up + * the schema update. + * + * @param tableName + * @return Pair indicating the status of the alter command + * @throws IOException + */ + private Pair<Integer, Integer> getAlterStatusFromSchemaChangeTracker(byte[] tableName) + throws IOException { + MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null; try { - return this.assignmentManager.getReopenStatus(tableName); - } catch (InterruptedException e) { - throw new IOException("Interrupted", e); + alterStatus = + this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName)); + } catch (KeeperException ke) { + LOG.error("KeeperException while getting schema alter status for table = " + + Bytes.toString(tableName), ke); + } + if (alterStatus != null) { + LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = " + + Bytes.toString(tableName) + " Alter Status = " + + alterStatus.toString()); + int numberPending = alterStatus.getNumberOfRegionsToProcess() - + alterStatus.getNumberOfRegionsProcessed(); + // Pending count first, total second, matching getReopenStatus(). + return new Pair<Integer, Integer>(numberPending, + alterStatus.getNumberOfRegionsToProcess()); + } else { + LOG.debug("MasterAlterStatus is NULL for table = " + + Bytes.toString(tableName)); + // No alter status found for this table; report no pending work rather + // than failing the client call. + return new Pair<Integer, Integer>(0, 0); } } + public void addColumn(byte [] tableName, HColumnDescriptor column) throws IOException { if (cpHost != null) { @@ -1004,7 +1206,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return; } } - new TableAddFamilyHandler(tableName, column, this, this).process(); + new TableAddFamilyHandler(tableName, column, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postAddColumn(tableName, column); } @@ -1017,7 +1220,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return; } } - new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } @@ -1030,7 +1234,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { return; } } - new TableDeleteFamilyHandler(tableName, c, this, this).process(); + new TableDeleteFamilyHandler(tableName, c, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postDeleteColumn(tableName, c); } @@ -1041,7 +1246,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, false)); if (cpHost != null) { cpHost.postEnableTable(tableName); @@ -1097,21 +1302,31 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { @Override public void modifyTable(final byte[] tableName, HTableDescriptor htd) - throws IOException { + throws IOException { if (cpHost != null) { 
cpHost.preModifyTable(tableName, htd); } - this.executorService.submit(new ModifyTableHandler(tableName, htd, this, - this)); - + this, this, supportInstantSchemaChanges)); if (cpHost != null) { cpHost.postModifyTable(tableName, htd); } } @Override - public void checkTableModifiable(final byte [] tableName) + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) + throws IOException { + preCheckTableModifiable(tableName); + if (!eventType.isSchemaChangeEvent()) { + if (!getAssignmentManager().getZKTable(). + isDisabledTable(Bytes.toString(tableName))) { + throw new TableNotDisabledException(tableName); + } + } + } + + private void preCheckTableModifiable(final byte[] tableName) throws IOException { String tableNameStr = Bytes.toString(tableName); if (isCatalogTable(tableName)) { @@ -1120,10 +1335,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { throw new TableNotFoundException(tableNameStr); } - if (!getAssignmentManager().getZKTable(). - isDisabledTable(Bytes.toString(tableName))) { - throw new TableNotDisabledException(tableName); - } } public void clearFromTransition(HRegionInfo hri) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index c952311..fb61810 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -479,7 +479,7 @@ public class MasterFileSystem { */ public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd) throws IOException { - LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " + + LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " + hcd.toString()); HTableDescriptor htd = this.services.getTableDescriptors().get(tableName); if (htd == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 3d5124c..c4b42ae 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -26,7 +26,10 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; +import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; /** * Services Master supplies @@ -53,12 +56,15 @@ public interface MasterServices extends Server { public ExecutorService getExecutorService(); /** - * Check table is modifiable; i.e. exists and is offline. - * @param tableName Name of table to check. - * @throws TableNotDisabledException - * @throws TableNotFoundException + * Check table modifiable. i.e not ROOT or META and offlined for all commands except + * alter commands + * @param tableName + * @param eventType + * @throws IOException */ - public void checkTableModifiable(final byte [] tableName) throws IOException; + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) + throws IOException; /** * Create a table using the given table definition. 
@@ -73,4 +79,17 @@ public interface MasterServices extends Server { * @return Return table descriptors implementation. */ public TableDescriptors getTableDescriptors(); + + /** + * Get Master Schema change tracker + * @return + */ + public MasterSchemaChangeTracker getSchemaChangeTracker(); + + /** + * Return the Region server tracker. + * @return RegionServerTracker + */ + public RegionServerTracker getRegionServerTracker(); + } diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index b4fad6c..c38327c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -320,11 +320,21 @@ public class ServerManager { } } + /** + * Exclude a RS from any pending schema change process. + * @param serverName + */ + private void excludeRegionServerFromSchemaChanges(final ServerName serverName) { + this.services.getSchemaChangeTracker() + .excludeRegionServerForSchemaChanges(serverName.getServerName()); + } + /* * Expire the passed server. Add it to list of deadservers and queue a * shutdown processing. */ public synchronized void expireServer(final ServerName serverName) { + excludeRegionServerFromSchemaChanges(serverName); if (!this.onlineServers.containsKey(serverName)) { LOG.warn("Received expiration of " + serverName + " but server is not currently online"); diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index caa1fe0..15abf86 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -37,9 +38,11 @@ public class DeleteTableHandler extends TableEventHandler { private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); public DeleteTableHandler(byte [] tableName, Server server, - final MasterServices masterServices) + final MasterServices masterServices, HMasterInterface masterInterface, + boolean instantChange) throws IOException { - super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); + super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices, + masterInterface, instantChange); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java index 3d72463..b27f66f 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -33,11 +34,19 @@ public class ModifyTableHandler extends TableEventHandler { 
public ModifyTableHandler(final byte [] tableName, final HTableDescriptor htd, final Server server, - final MasterServices masterServices) throws IOException { - super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); + final MasterServices masterServices, + final HMasterInterface masterInterface, + boolean instantModify) throws IOException { + super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices, + masterInterface, instantModify); this.htd = htd; + validateTable(tableName, htd); + } + + private void validateTable(final byte[] tableName, HTableDescriptor htd) + throws IOException { if (!Bytes.equals(tableName, htd.getName())) { - throw new IOException("TableDescriptor name & tableName must match: " + throw new IOException("TableDescriptor name & tableName must match: " + htd.getNameAsString() + " vs " + Bytes.toString(tableName)); } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java index 18cff0b..84129ce 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -38,8 +39,10 @@ public class TableAddFamilyHandler extends TableEventHandler { private final HColumnDescriptor familyDesc; public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, - Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + Server server, final MasterServices masterServices, + HMasterInterface masterInterface, boolean instantChange) throws IOException { + super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices, + masterInterface, instantChange); this.familyDesc = familyDesc; } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java index 2f65ff9..0b2b436 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -38,8 +39,11 @@ public class TableDeleteFamilyHandler extends TableEventHandler { private final byte [] familyName; public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, - Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + Server server, final MasterServices masterServices, + HMasterInterface masterInterface, + boolean instantChange) throws IOException { + super(EventType.C_M_DELETE_FAMILY, 
tableName, server, masterServices, + masterInterface, instantChange); this.familyName = familyName; } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index 59fc758..e271f91 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -28,17 +28,21 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -53,26 +57,22 @@ import com.google.common.collect.Maps; public abstract class TableEventHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(TableEventHandler.class); protected final MasterServices masterServices; + protected HMasterInterface master = null; protected final byte [] tableName; protected final String tableNameStr; + protected boolean instantAction = false; public TableEventHandler(EventType eventType, byte [] tableName, Server server, - MasterServices masterServices) + MasterServices masterServices, HMasterInterface masterInterface, + boolean instantSchemaChange) throws IOException { super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - try { - this.masterServices.checkTableModifiable(tableName); - } catch (TableNotDisabledException ex) { - if (eventType.isOnlineSchemaChangeSupported()) { - LOG.debug("Ignoring table not disabled exception " + - "for supporting online schema changes."); - } else { - throw ex; - } - } + this.masterServices.checkTableModifiable(tableName, eventType); this.tableNameStr = Bytes.toString(this.tableName); + this.instantAction = instantSchemaChange; + this.master = masterInterface; } @Override @@ -84,16 +84,7 @@ public abstract class TableEventHandler extends EventHandler { MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); - if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. - getAssignmentManager().getZKTable(). 
- isEnabledTable(Bytes.toString(tableName))) { - if (reOpenAllRegions(hris)) { - LOG.info("Completed table operation " + eventType + " on table " + - Bytes.toString(tableName)); - } else { - LOG.warn("Error on reopening the regions"); - } - } + handleSchemaChanges(hris); } catch (IOException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { @@ -101,13 +92,47 @@ } } + private void handleSchemaChanges(List<HRegionInfo> regions) + throws IOException { + if (instantAction && regions != null && !regions.isEmpty()) { + handleInstantSchemaChanges(regions.size()); + } else { + handleRegularSchemaChanges(regions); + } + } + + + /** + * Perform schema changes only if the table is in enabled state. + * @return true if this is a schema change event and the table is enabled + */ + private boolean canPerformSchemaChange() { + return (eventType.isSchemaChangeEvent() && this.masterServices. + getAssignmentManager().getZKTable(). + isEnabledTable(Bytes.toString(tableName))); + } + + private void handleRegularSchemaChanges(List<HRegionInfo> regions) + throws IOException { + if (canPerformSchemaChange()) { + this.masterServices.getAssignmentManager().setRegionsToReopen(regions); + if (reOpenAllRegions(regions)) { + LOG.info("Completed table operation " + eventType + " on table " + + Bytes.toString(tableName)); + } else { + LOG.warn("Error on reopening the regions"); + } + } + } + public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException { boolean done = false; LOG.info("Bucketing regions by region server..."); HTable table = new HTable(masterServices.getConfiguration(), tableName); TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps .newTreeMap(); - NavigableMap<HRegionInfo, ServerName> hriHserverMapping = table.getRegionLocations(); + NavigableMap<HRegionInfo, ServerName> hriHserverMapping + = table.getRegionLocations(); List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>(); for (HRegionInfo hri : regions) { ServerName rsLocation = hriHserverMapping.get(hri); @@ -148,6 +173,64 @@ } return done; } + + /** + * If the load balancer is currently running, wait for it to complete before + * processing the schema change; otherwise we would be working with a stale + * set of regions. The master already performs this check before starting a + * schema change, but we repeat it here in case the load balancer started in + * the meantime. + * @param status + */ + private void waitForLoadBalancerToComplete(MonitoredTask status) { + if (master.isLoadBalancerRunning()) { + status.setStatus("Load balancer is currently running. Waiting for load " + + "balancer to complete before processing the alter table request for table = " + + tableNameStr); + while (master.isLoadBalancerRunning()) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + protected void handleInstantSchemaChanges(int numberOfRegions) { + MonitoredTask status = TaskMonitor.get().createStatus( + "Handling alter table request for table = " + tableNameStr); + if (canPerformSchemaChange()) { + try { + // Wait for currently running load balancer to finish. 
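The sleep-and-recheck loops used here (and again in MasterSchemaChangeTracker.createSchemaChangeNode) poll with no upper bound; if the balancer never releases its znode the alter request waits forever. A bounded variant of the same pattern, sketched under the assumption that callers can tolerate a timeout (the helper name and signature are hypothetical, not part of the patch):

    import java.util.concurrent.Callable;

    final class WaitUtilSketch {
      /** Polls until condition returns true or timeoutMs elapses; false on timeout. */
      static boolean waitFor(Callable<Boolean> condition, long timeoutMs, long pollMs)
          throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (condition.call()) return true;
          Thread.sleep(pollMs);
        }
        return false;
      }
    }

With such a helper, waitForLoadBalancerToComplete() could fail the alter request with a clear status message instead of spinning indefinitely.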
+ waitForLoadBalancerToComplete(status); + MasterSchemaChangeTracker masterSchemaChangeTracker = + this.masterServices.getSchemaChangeTracker(); + masterSchemaChangeTracker + .createSchemaChangeNode(Bytes.toString(tableName), + numberOfRegions); + while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists( + Bytes.toString(tableName))) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + status.setStatus("Created ZK node for handling the alter table request for table = " + + tableNameStr); + } catch (KeeperException e) { + LOG.warn("Instant schema change failed for table " + tableNameStr, e); + status.setStatus("Instant schema change failed for table " + tableNameStr + + " Cause = " + e.getCause()); + + } catch (IOException ioe) { + LOG.warn("Instant schema change failed for table " + tableNameStr, ioe); + status.setStatus("Instant schema change failed for table " + tableNameStr + + " Cause = " + ioe.getCause()); + } + } + } + protected abstract void handleTableOperation(List regions) throws IOException, KeeperException; } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java index 708ee73..f2e178b 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -40,8 +41,11 @@ public class TableModifyFamilyHandler extends TableEventHandler { public TableModifyFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, Server server, - final MasterServices masterServices) throws IOException { - super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices); + final MasterServices masterServices, + HMasterInterface masterInterface, + boolean instantChange) throws IOException { + super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices, + masterInterface, instantChange); this.familyDesc = familyDesc; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 39c7940..715aaac 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -144,6 +144,7 @@ import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; @@ -280,6 +281,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Schema change Tracker + private SchemaChangeTracker schemaChangeTracker; + // Log Splitting Worker private 
SplitLogWorker splitLogWorker; @@ -571,6 +575,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Schema change tracker + this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, + this, this); + this.schemaChangeTracker.start(); } /** @@ -3352,6 +3361,57 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return wal.rollWriter(true); } + /** + * Reopen the given region so that it picks up the latest schema change. + * @param hRegion HRegion to refresh + * @throws IOException + */ + public void refreshRegion(HRegion hRegion) throws IOException { + + if (hRegion != null) { + HRegionInfo regionInfo = hRegion.getRegionInfo(); + // Close the region + hRegion.close(); + // Remove from online regions + removeFromOnlineRegions(regionInfo.getEncodedName()); + // Get new HTD + HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName()); + LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString() + + " is = " + htd); + HRegion region = + HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf); + // Add new region to the onlineRegions + addToOnlineRegions(region); + } + } + + /** + * Gets the online regions of the specified table. + * This method looks at the in-memory onlineRegions. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will not be included in the returned list. + * So, the returned list may not necessarily be ALL the regions of this table; + * it is all the ONLINE regions of the table. + * @param tableName + * @return Online regions from tableName + */ + public List<HRegion> getOnlineRegions(byte[] tableName) { + List<HRegion> tableRegions = new ArrayList<HRegion>(); + synchronized (this.onlineRegions) { + for (HRegion region : this.onlineRegions.values()) { + HRegionInfo regionInfo = region.getRegionInfo(); + if (Bytes.equals(regionInfo.getTableName(), tableName)) { + tableRegions.add(region); + } + } + } + return tableRegions; + } + + public SchemaChangeTracker getSchemaChangeTracker() { + return this.schemaChangeTracker; + } + // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). public String[] getCoprocessors() { HServerLoad hsl = buildServerLoad(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index c4c6612..03ab47e 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.Server; +import java.io.IOException; +import java.util.List; + /** * Interface to Map of online regions. In the Map, the key is the region's * encoded name and the value is an {@link HRegion} instance. @@ -49,4 +52,18 @@ interface OnlineRegions extends Server { * null if named region is not member of the online regions. */ public HRegion getFromOnlineRegions(String encodedRegionName); + /** + * Get all online regions of a table in this RS. + * @param tableName + * @return List of HRegion + * @throws java.io.IOException + */ + public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException; + + /** + * Refresh a given region, updating it with the latest HTD info. 
+ * @param hRegion + */ + public void refreshRegion(HRegion hRegion) throws IOException; + } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java new file mode 100644 index 0000000..19ac9fe --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java @@ -0,0 +1,830 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.zookeeper.KeeperException; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; + +import org.apache.hadoop.io.Writable; + +public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class); + private final MasterServices masterServices; + // Used by tests only. Do not change this. + private volatile int sleepTimeMillis = 0; + // schema changes pending more than this time will be timed out. + private long schemaChangeTimeoutMillis = 30000; + + /** + * Constructs a new ZK node tracker. + *
* + * <p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param abortable + * @param masterServices + * @param schemaChangeTimeoutMillis + */ + public MasterSchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable, MasterServices masterServices, + long schemaChangeTimeoutMillis) { + super(watcher, watcher.schemaZNode, abortable); + this.masterServices = masterServices; + this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis; + } + + @Override + public void start() { + try { + watcher.registerListener(this); + List<String> tables = + ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); + processCompletedSchemaChanges(tables); + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker startup failed.", e); + abortable.abort("MasterSchemaChangeTracker startup failed", e); + } + } + + private List<String> getCurrentTables() throws KeeperException { + return + ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); + } + + /** + * When a primary master crashes and the secondary master takes over + * mid-flight during an alter process, the secondary should clean up any + * completed schema changes not handled by the previous master. + * @param tables + * @throws KeeperException + */ + private void processCompletedSchemaChanges(List<String> tables) + throws KeeperException { + if (tables == null || tables.isEmpty()) { + LOG.debug("No current schema change in progress. Skipping cleanup"); + return; + } + String msg = "Master sees the following tables undergoing the schema change " + + "process. Tables = " + tables; + MonitoredTask status = TaskMonitor.get().createStatus(msg); + LOG.debug(msg); + for (String table : tables) { + LOG.debug("Processing table = " + table); + status.setStatus("Processing table = " + table); + try { + processTableNode(table); + } catch (IOException e) { + String errmsg = "IOException while processing completed schema changes." + + " Cause = " + e.getCause(); + LOG.error(errmsg, e); + status.setStatus(errmsg); + } + } + } + + /** + * Get current alter status for a table. + * @param tableName + * @return MasterAlterStatus + * @throws KeeperException + * @throws IOException + */ + public MasterAlterStatus getMasterAlterStatus(String tableName) + throws KeeperException, IOException { + String path = getSchemaChangeNodePathForTable(tableName); + byte[] state = ZKUtil.getData(watcher, path); + if (state == null || state.length <= 0) { + return null; + } + MasterAlterStatus mas = new MasterAlterStatus(); + Writables.getWritable(state, mas); + return mas; + } + + /** + * Get the RS-specific alter status for a table and server. + * @param tableName + * @param serverName + * @return Region Server's Schema alter status + * @throws KeeperException + * @throws IOException + */ + private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus( + String tableName, String serverName) + throws KeeperException, IOException { + String childPath = + getSchemaChangeNodePathForTableAndServer(tableName, serverName); + byte[] childData = ZKUtil.getData(this.watcher, childPath); + if (childData == null || childData.length <= 0) { + return null; + } + SchemaChangeTracker.SchemaAlterStatus sas = + new SchemaChangeTracker.SchemaAlterStatus(); + Writables.getWritable(childData, sas); + LOG.debug("Schema Status data for server = " + serverName + " table = " + + tableName + " == " + sas); + return sas; + } + + /** + * Update the master's alter status based on each region server's response. 
+ * @param servers + * @param tableName + * @throws IOException + */ + private void updateMasterAlterStatus(MasterAlterStatus mas, + List servers, String tableName) + throws IOException, KeeperException { + for (String serverName : servers) { + SchemaChangeTracker.SchemaAlterStatus sas = + getRSSchemaAlterStatus(tableName, serverName); + if (sas != null) { + mas.update(sas); + LOG.debug("processTableNodeWithState:Updated Master Alter Status = " + + mas + " for server = " + serverName); + } else { + LOG.debug("SchemaAlterStatus is NULL for table = " + tableName); + } + } + } + + /** + * If schema alter is handled for this table, then delete all the ZK nodes created + * for this table. + * @param tableName + * @throws KeeperException + */ + private void processTableNode(String tableName) throws KeeperException, + IOException { + MonitoredTask status = TaskMonitor.get().createStatus( + "Checking alter schema request status for table = " + tableName); + + LOG.debug("processTableNodeWithState. TableName = " + tableName); + List servers = + ZKUtil.listChildrenAndWatchThem(watcher, + getSchemaChangeNodePathForTable(tableName)); + MasterAlterStatus mas = getMasterAlterStatus(tableName); + if (mas == null) { + LOG.debug("MasterAlterStatus is NULL. Table = " + tableName); + return; + } + updateMasterAlterStatus(mas, servers, tableName); + status.setStatus("Current alter status for table = " + + tableName + " ==> " + mas.toString()); + LOG.debug("Current Alter status = " + mas); + String nodePath = getSchemaChangeNodePathForTable(tableName); + ZKUtil.updateExistingNodeData(this.watcher, nodePath, + Writables.getBytes(mas), getZKNodeVersion(nodePath)); + processAlterStatus(mas, tableName, servers, status); + } + + /** + * Evaluate the master alter status and determine the current status. + * @param alterStatus + * @param tableName + * @param servers + * @param status + */ + private void processAlterStatus(MasterAlterStatus alterStatus, + String tableName, List servers, + MonitoredTask status) throws KeeperException { + if (alterStatus.getNumberOfRegionsToProcess() + == alterStatus.getNumberOfRegionsProcessed()) { + // schema change completed. + String msg = "All region servers have successfully processed the " + + "schema changes for table = " + tableName + + " . Deleting the schema change node for table = " + + tableName + " Region servers processed the schema change" + + " request = " + alterStatus.getProcessedHosts() + + " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess() + + " Processed regions = " + alterStatus.getNumberOfRegionsProcessed(); + status.setStatus(msg); + LOG.debug(msg); + cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName)); + } else { + if (alterStatus.getErrorCause() != null + && alterStatus.getErrorCause().trim().length() > 0) { + String msg = "Alter schema change failed " + + "for table = " + tableName + " Number of online regions = " + + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = " + + alterStatus.getNumberOfRegionsProcessed() + + " Original list = " + alterStatus.hostsToProcess + " Processed servers = " + + servers + + " Error Cause = " + alterStatus.getErrorCause(); + // we have errors. 
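In other words, processAlterStatus() reduces to a three-way decision over the counters aggregated from the per-RS znodes. Distilled into a sketch (the real method additionally logs, updates the MonitoredTask, and deletes the table's znode on completion; the class and method names below are illustrative only):

    final class AlterStatusSketch {
      enum Outcome { COMPLETE, FAILED, IN_PROGRESS }

      static Outcome evaluate(int regionsToProcess, int regionsProcessed,
          String errorCause) {
        if (regionsToProcess == regionsProcessed) {
          return Outcome.COMPLETE;     // safe to delete the schema change znode
        }
        if (errorCause != null && errorCause.trim().length() > 0) {
          return Outcome.FAILED;       // left in place for the schema janitor
        }
        return Outcome.IN_PROGRESS;    // keep watching the per-RS status znodes
      }
    }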
+ LOG.debug(msg); + status.setStatus(msg); + } else { + String msg = "Not all region servers have processed the schema changes " + + "for table = " + tableName + " Number of online regions = " + + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = " + + alterStatus.getNumberOfRegionsProcessed() + + " Original list = " + alterStatus.hostsToProcess + " Processed servers = " + + servers + " Alter State = " + + alterStatus.getCurrentAlterStatus(); + LOG.debug(msg); + status.setStatus(msg); + } + } + } + + /** + * Check whether an in-flight schema change request has expired. + * @param tableName + * @return true if the schema change request has expired. + * @throws IOException + */ + private boolean hasSchemaChangeExpiredFor(String tableName) + throws IOException, KeeperException { + MasterAlterStatus mas = getMasterAlterStatus(tableName); + if (mas == null) { + // No readable status; treat the node as expired so it gets cleaned up. + return true; + } + long createdTimeStamp = mas.getStamp(); + long duration = System.currentTimeMillis() - createdTimeStamp; + LOG.debug("Created TimeStamp = " + createdTimeStamp + + " duration = " + duration + " Table = " + tableName + + " Master Alter Status = " + mas); + return (duration > schemaChangeTimeoutMillis); + } + + /** + * Handle failed and expired schema changes by simply deleting all the + * expired/failed schema change attempts. Why do this? + * 1) Keeping failed/expired schema change nodes around prohibits any + * future schema changes for the table. + * 2) Any lingering expired/failed schema change requests will prohibit the + * load balancer from running. + */ + public void handleFailedExpiredSchemaChanges() { + try { + List<String> tables = getCurrentTables(); + for (String table : tables) { + String statmsg = "Cleaning failed or expired schema change requests. " + + "Current tables undergoing " + + "schema change process = " + tables; + MonitoredTask status = TaskMonitor.get().createStatus(statmsg); + LOG.debug(statmsg); + if (hasSchemaChangeExpiredFor(table)) { + // Currently we simply abandon an in-flight schema change on time out. + // There are a couple of alternatives to consider here: one could be to + // retry the schema change and see if it succeeds, and another could be + // to simply roll back the schema change. + String msg = "Schema change for table = " + table + " has expired. It" + + " has been in progress for over " + schemaChangeTimeoutMillis + + " ms. Deleting the node now."; + LOG.debug(msg); + ZKUtil.deleteNodeRecursively(this.watcher, + getSchemaChangeNodePathForTable(table)); + } else { + String msg = "Schema change request is in progress for" + + " table = " + table; + LOG.debug(msg); + status.setStatus(msg); + } + } + } catch (IOException e) { + String msg = "IOException during handleFailedExpiredSchemaChanges. Cause = " + + e.getCause(); + LOG.error(msg, e); + TaskMonitor.get().createStatus(msg); + } catch (KeeperException ke) { + String msg = "KeeperException during handleFailedExpiredSchemaChanges. Cause = " + + ke.getCause(); + LOG.error(msg, ke); + TaskMonitor.get().createStatus(msg); + } + } + + /** + * Clean up the ZK nodes of a table whose schema change has completed. 
+   * @param path
+   * @throws KeeperException
+   */
+  private void cleanProcessedTableNode(String path) throws KeeperException {
+    if (sleepTimeMillis > 0) {
+      try {
+        LOG.debug("Master schema change tracker sleeping for "
+          + sleepTimeMillis);
+        Thread.sleep(sleepTimeMillis);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    ZKUtil.deleteNodeRecursively(this.watcher, path);
+    LOG.debug("Deleted all nodes for path " + path);
+  }
+
+  /**
+   * Exclude a RS from schema change request processing (if applicable).
+   * We will exclude a RS if 1) the RS has online regions for the table
+   * AND 2) the RS went down mid-flight during the schema change process.
+   * We don't otherwise have to deal with a RS going down mid-flight, as
+   * the online regions from the dead RS will get reassigned to some other
+   * RS and the reassignment process inherently takes care of the schema
+   * change as well.
+   * @param serverName
+   */
+  public void excludeRegionServerForSchemaChanges(String serverName) {
+    try {
+      MonitoredTask status = TaskMonitor.get().createStatus(
+        "Processing schema change exclusion for region server = " + serverName);
+      List tables =
+        ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
+      if (tables == null || tables.isEmpty()) {
+        String msg = "No schema change in progress. Skipping exclusion for "
+          + "server = " + serverName;
+        LOG.debug(msg);
+        status.setStatus(msg);
+        return;
+      }
+      for (String tableName : tables) {
+        excludeRegionServer(tableName, serverName, status);
+      }
+    } catch (KeeperException ke) {
+      LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
+    } catch (IOException ioe) {
+      LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe);
+    }
+  }
+
+  /**
+   * Check whether a schema change is in progress for a given table on a
+   * given RS.
+   * @param tableName
+   * @param serverName
+   * @return TRUE if this RS is currently processing a schema change request
+   * for the table.
+   * @throws KeeperException
+   */
+  private boolean isSchemaChangeApplicableFor(String tableName,
+      String serverName)
+      throws KeeperException {
+    List servers = ZKUtil.listChildrenAndWatchThem(watcher,
+      getSchemaChangeNodePathForTable(tableName));
+    return (servers.contains(serverName));
+  }
+
+  /**
+   * Exclude a region server for a table (if applicable) from schema change processing.
+   * @param tableName
+   * @param serverName
+   * @param status
+   * @throws KeeperException
+   * @throws IOException
+   */
+  private void excludeRegionServer(String tableName, String serverName,
+      MonitoredTask status)
+      throws KeeperException, IOException {
+    if (isSchemaChangeApplicableFor(tableName, serverName)) {
+      String msg = "Excluding RS " + serverName + " from schema change process"
+        + " for table = " + tableName;
+      LOG.debug(msg);
+      status.setStatus(msg);
+      SchemaChangeTracker.SchemaAlterStatus sas =
+        getRSSchemaAlterStatus(tableName, serverName);
+      if (sas == null) {
+        // Without the RS alter status we cannot (and need not) flip it to
+        // IGNORED; bail out instead of dereferencing a null status below.
+        LOG.debug("SchemaAlterStatus is NULL for table = " + tableName
+          + " server = " + serverName + ". Skipping the exclusion.");
+        return;
+      }
+      // Set the status to IGNORED so we can process it accordingly.
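+      // (On the master side, MasterAlterStatus.update() reacts to an IGNORED
+      // status by deducting this server's online regions from the
+      // regions-to-process count.)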
+ sas.setCurrentAlterStatus( + SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED); + LOG.debug("Updating the current schema status to " + sas); + String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, + serverName); + ZKUtil.updateExistingNodeData(this.watcher, + nodePath, Writables.getBytes(sas), getZKNodeVersion(nodePath)); + } else { + LOG.debug("Skipping exclusion of RS " + serverName + + " from schema change process" + + " for table = " + tableName + + " as it did not possess any online regions for the table"); + } + processTableNode(tableName); + } + + private int getZKNodeVersion(String nodePath) throws KeeperException { + return ZKUtil.checkExists(this.watcher, nodePath); + } + + /** + * Create a new schema change ZK node. + * @param tableName Table name that is getting altered + * @throws KeeperException + */ + public void createSchemaChangeNode(String tableName, + int numberOfRegions) + throws KeeperException, IOException { + MonitoredTask status = TaskMonitor.get().createStatus( + "Creating schema change node for table = " + tableName); + LOG.debug("Creating schema change node for table = " + + tableName + " Path = " + + getSchemaChangeNodePathForTable(tableName)); + if (doesSchemaChangeNodeExists(tableName)) { + LOG.debug("Schema change node already exists for table = " + tableName + + " Deleting the schema change node."); + // If we already see a schema change node for this table we wait till the previous + // alter process is complete. Ideally, we need not wait and we could simply delete + // existing schema change node for this table and create new one. But then the + // RS cloud will not be able to process concurrent schema updates for the same table + // as they will be working with same set of online regions for this table. Meaning the + // second alter change will not see any online regions (as they were being closed and + // re opened by the first change) and will miss the second one. + // We either handle this at the RS level using explicit locks while processing a table + // or do it here. I prefer doing it here as it seems much simpler and cleaner. + while(doesSchemaChangeNodeExists(tableName)) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode); + // if number of online RS = 0, we should not do anything! + if (rsCount <= 0) { + String msg = "Master is not seeing any online region servers. Aborting the " + + "schema change processing by region servers."; + LOG.debug(msg); + status.setStatus(msg); + } else { + LOG.debug("Master is seeing " + rsCount + " region servers online before " + + "the schema change process."); + MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions, + getActiveRegionServersAsString()); + LOG.debug("Master creating the master alter status = " + mas); + ZKUtil.createSetData(this.watcher, + getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas)); + status.setStatus("Created the ZK node for schema change. 
Current Alter Status = " + mas.toString());
+      ZKUtil.listChildrenAndWatchThem(this.watcher,
+        getSchemaChangeNodePathForTable(tableName));
+    }
+  }
+
+  private String getActiveRegionServersAsString() {
+    StringBuffer sbuf = new StringBuffer();
+    List currentRS =
+      masterServices.getRegionServerTracker().getOnlineServers();
+    for (ServerName serverName : currentRS) {
+      sbuf.append(serverName.getServerName());
+      sbuf.append(" ");
+    }
+    LOG.debug("Current list of RS to process the schema change = "
+      + sbuf.toString());
+    return sbuf.toString();
+  }
+
+  /**
+   * Check whether a schema change ZK node exists for the given table.
+   * @param tableName
+   * @return true if a schema change node exists for the table.
+   * @throws KeeperException
+   */
+  public boolean doesSchemaChangeNodeExists(String tableName)
+      throws KeeperException {
+    return ZKUtil.checkExists(watcher,
+      getSchemaChangeNodePathForTable(tableName)) != -1;
+  }
+
+  /**
+   * Check whether there are any schema change requests that are in progress now.
+   * We simply assume that a schema change is in progress if we see a ZK schema node
+   * for any table. We may revisit this for fine-grained checks, such as checking the
+   * current alter status et al, but it is not required now.
+   * @return true if a schema change appears to be in progress.
+   */
+  public boolean isSchemaChangeInProgress() {
+    try {
+      int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode);
+      return schemaChangeCount > 0;
+    } catch (KeeperException ke) {
+      LOG.debug("KeeperException while getting current schema change progress.");
+      // We cannot tell from ZK; fall through and report false.
+    }
+    return false;
+  }
+
+  /**
+   * We get notified when a RS processes or completes a schema change request.
+   * The path will be of the format /hbase/schema/&lt;table name&gt;
+   * @param path full path of the node whose children have changed
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    String tableName = null;
+    if (path.startsWith(watcher.schemaZNode) &&
+        !path.equals(watcher.schemaZNode)) {
+      try {
+        LOG.debug("NodeChildrenChanged Path = " + path);
+        tableName = path.substring(path.lastIndexOf("/") + 1, path.length());
+        processTableNode(tableName);
+      } catch (KeeperException e) {
+        TaskMonitor.get().createStatus(
+          "MasterSchemaChangeTracker: ZK exception while processing "
+          + " nodeChildrenChanged() event for table = " + tableName
+          + " Cause = " + e.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+          + " schema change nodes", e);
+      } catch (IOException ioe) {
+        TaskMonitor.get().createStatus(
+          "MasterSchemaChangeTracker: IO exception while processing "
+          + " nodeChildrenChanged() event for table = " + tableName
+          + " Cause = " + ioe.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+          + " schema change nodes", ioe);
+      }
+    }
+  }
+
+  /**
+   * We get notified as and when the RS cloud updates their ZK nodes with
+   * progress information. The path will be of the format
+   * /hbase/schema/&lt;table name&gt;/&lt;rs name&gt;
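+   * e.g. /hbase/schema/t1/rs1.example.com,60020,1316000000000 (table and
+   * server names here are hypothetical) when that RS updates its alter
+   * status for table t1.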
+   * @param path
+   */
+  @Override
+  public void nodeDataChanged(String path) {
+    String tableName = null;
+    if (path.startsWith(watcher.schemaZNode) &&
+        !path.equals(watcher.schemaZNode)) {
+      try {
+        LOG.debug("NodeDataChanged Path = " + path);
+        // path = /<base znode>/schema/<table name>/<rs name>; with the
+        // default base znode of /hbase, index 3 is the table name.
+        String[] paths = path.split("/");
+        tableName = paths[3];
+        processTableNode(tableName);
+      } catch (KeeperException e) {
+        TaskMonitor.get().createStatus(
+          "MasterSchemaChangeTracker: ZK exception while processing "
+          + " nodeDataChanged() event for table = " + tableName
+          + " Cause = " + e.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+          + " schema change nodes", e);
+      } catch (IOException ioe) {
+        TaskMonitor.get().createStatus(
+          "MasterSchemaChangeTracker: IO exception while processing "
+          + " nodeDataChanged() event for table = " + tableName
+          + " Cause = " + ioe.getCause());
+        LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+          + " schema change nodes", ioe);
+      }
+    }
+  }
+
+  public String getSchemaChangeNodePathForTable(String tableName) {
+    return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
+  }
+
+  /**
+   * Used only for tests. Do not use this. See TestInstantSchemaChange for more
+   * details on how it is used. This is primarily used to delay the schema change
+   * completion processing by the master so that we can test some complex
+   * scenarios such as master failover.
+   * @param sleepTimeMillis
+   */
+  public void setSleepTimeMillis(int sleepTimeMillis) {
+    this.sleepTimeMillis = sleepTimeMillis;
+  }
+
+  private String getSchemaChangeNodePathForTableAndServer(
+      String tableName, String regionServerName) {
+    return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
+      regionServerName);
+  }
+
+  /**
+   * Holds the current alter state for a table. Alter state includes the
+   * current alter status (INPROCESS, FAILURE or SUCCESS; SUCCESS is currently
+   * unused), the timestamp of the alter request, the number of hosts online at
+   * the time of the alter request, the number of online regions to process for
+   * the schema change request, the number of processed regions, and a list of
+   * region servers that actually processed the schema change request.
+   *
+   * The master keeps track of schema change requests using this alter status
+   * and periodically updates it based on progress reports from the region
+   * servers.
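+   *
+   * For illustration (table and server names are hypothetical), the ZK layout
+   * while table "t1" is being altered would look like:
+   *   /hbase/schema/t1        -- serialized MasterAlterStatus (this class)
+   *   /hbase/schema/t1/rs1.example.com,60020,1316000000000
+   *                           -- serialized SchemaChangeTracker.SchemaAlterStatus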
+ */ + public static class MasterAlterStatus implements Writable { + + public enum AlterState { + INPROCESS, // Inprocess alter + SUCCESS, // completed alter + FAILURE // failure alter + } + + private AlterState currentAlterStatus; + // TimeStamp + private long stamp; + private int numberOfRegionsToProcess; + private StringBuffer errorCause = new StringBuffer(" "); + private StringBuffer processedHosts = new StringBuffer(" "); + private String hostsToProcess; + private int numberOfRegionsProcessed = 0; + + public MasterAlterStatus() { + + } + + public MasterAlterStatus(int numberOfRegions, String activeHosts) { + this.numberOfRegionsToProcess = numberOfRegions; + this.stamp = System.currentTimeMillis(); + this.currentAlterStatus = AlterState.INPROCESS; + //this.rsToProcess = activeHosts; + this.hostsToProcess = activeHosts; + } + + public AlterState getCurrentAlterStatus() { + return currentAlterStatus; + } + + public void setCurrentAlterStatus(AlterState currentAlterStatus) { + this.currentAlterStatus = currentAlterStatus; + } + + public long getStamp() { + return stamp; + } + + public void setStamp(long stamp) { + this.stamp = stamp; + } + + public int getNumberOfRegionsToProcess() { + return numberOfRegionsToProcess; + } + + public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) { + this.numberOfRegionsToProcess = numberOfRegionsToProcess; + } + + public int getNumberOfRegionsProcessed() { + return numberOfRegionsProcessed; + } + + public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) { + this.numberOfRegionsProcessed += numberOfRegionsProcessed; + } + + public String getHostsToProcess() { + return hostsToProcess; + } + + public void setHostsToProcess(String hostsToProcess) { + this.hostsToProcess = hostsToProcess; + } + + public String getErrorCause() { + return errorCause == null ? null : errorCause.toString(); + } + + public void setErrorCause(String errorCause) { + if (errorCause == null || errorCause.trim().length() <= 0) { + return; + } + if (this.errorCause == null) { + this.errorCause = new StringBuffer(errorCause); + } else { + this.errorCause.append(errorCause); + } + } + + public String getProcessedHosts() { + return processedHosts.toString(); + } + + public void setProcessedHosts(String processedHosts) { + if (this.processedHosts == null) { + this.processedHosts = new StringBuffer(processedHosts); + } else { + this.processedHosts.append(" ").append(processedHosts); + } + } + + /** + * Ignore or exempt a RS from schema change processing. + * Master will tweak the number of regions to process based on the + * number of online regions on the target RS and also removes the + * RS from list of hosts to process. + * @param schemaAlterStatus + */ + private void ignoreRSForSchemaChange( + SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) { + LOG.debug("Removing RS " + schemaAlterStatus.getHostName() + + " from schema change process."); + hostsToProcess = + new String(hostsToProcess).replaceAll(schemaAlterStatus.getHostName(), ""); + int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions(); + LOG.debug("Current number of regions processed = " + + this.numberOfRegionsProcessed + " deducting ignored = " + + ignoreRegionsCount + + " final = " + (this.numberOfRegionsToProcess-ignoreRegionsCount)); + if (this.numberOfRegionsToProcess > 0) { + this.numberOfRegionsToProcess -= ignoreRegionsCount; + } else { + LOG.debug("Number of regions to process is less than zero. 
This is odd"); + } + } + + /** + * Update the master alter status for this table based on RS alter status. + * @param schemaAlterStatus + */ + public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) { + this.setProcessedHosts(schemaAlterStatus.getHostName()); + SchemaChangeTracker.SchemaAlterStatus.AlterState rsState = + schemaAlterStatus.getCurrentAlterStatus(); + switch(rsState) { + case FAILURE: + LOG.debug("Schema update failure Status = " + + schemaAlterStatus); + this.setCurrentAlterStatus( + MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE); + this.setNumberOfRegionsProcessed( + schemaAlterStatus.getNumberOfRegionsProcessed()); + this.setErrorCause(schemaAlterStatus.getErrorCause()); + break; + case SUCCESS: + LOG.debug("Schema update SUCCESS Status = " + + schemaAlterStatus); + this.setNumberOfRegionsProcessed(schemaAlterStatus.getNumberOfRegionsProcessed()); + this.setCurrentAlterStatus(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.SUCCESS); + break; + case IGNORED: + LOG.debug("Schema update IGNORED Updating regions to " + + "process count. Status = "+ schemaAlterStatus); + ignoreRSForSchemaChange(schemaAlterStatus); + break; + default: + break; + } + } + + @Override + public void readFields(DataInput in) throws IOException { + currentAlterStatus = AlterState.valueOf(in.readUTF()); + stamp = in.readLong(); + numberOfRegionsToProcess = in.readInt(); + hostsToProcess = Bytes.toString(Bytes.readByteArray(in)); + processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in))); + errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in))); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(currentAlterStatus.name()); + out.writeLong(stamp); + out.writeInt(numberOfRegionsToProcess); + Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess)); + Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString())); + Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString())); + } + + @Override + public String toString() { + return + " state= " + currentAlterStatus + + ", ts= " + stamp + + ", number of regions to process = " + numberOfRegionsToProcess + + ", number of regions processed = " + numberOfRegionsProcessed + + ", hosts = " + hostsToProcess + + " , processed hosts = " + processedHosts + + " , errorCause = " + errorCause; + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java new file mode 100644 index 0000000..25b3de0 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java @@ -0,0 +1,452 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Region server schema change tracker. The RS uses this tracker to keep track
+ * of alter schema requests from the master and updates the status once the
+ * schema change is complete.
+ */
+public class SchemaChangeTracker extends ZooKeeperNodeTracker {
+  public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class);
+  private RegionServerServices regionServer = null;
+  private volatile int sleepTimeMillis = 0;
+
+  /**
+   * Constructs a new ZK node tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher
+   * @param abortable
+   * @param regionServer
+   */
+  public SchemaChangeTracker(ZooKeeperWatcher watcher,
+                             Abortable abortable,
+                             RegionServerServices regionServer) {
+    super(watcher, watcher.schemaZNode, abortable);
+    this.regionServer = regionServer;
+  }
+
+  @Override
+  public void start() {
+    try {
+      watcher.registerListener(this);
+      ZKUtil.listChildrenAndWatchThem(watcher, node);
+      // Clean-up old in-process schema changes for this RS now?
+    } catch (KeeperException e) {
+      LOG.error("RegionServer SchemaChangeTracker startup failed with "
+        + "KeeperException.", e);
+    }
+  }
+
+  /**
+   * This event will be triggered whenever a new schema change request is
+   * processed by the master. The path will be of the format
+   * /hbase/schema/&lt;table name&gt;
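+   * For example (hypothetical table name), an alter on table "t1" fires this
+   * callback with path = /hbase/schema and "t1" among the reported children.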
+ * @param path full path of the node whose children have changed + */ + @Override + public void nodeChildrenChanged(String path) { + LOG.debug("NodeChildrenChanged. Path = " + path); + if (path.equals(watcher.schemaZNode)) { + try { + List tables = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + LOG.debug("RS.SchemaChangeTracker: " + + "Current list of tables with schema change = " + tables); + if (tables != null) { + handleSchemaChange(tables); + } else { + LOG.error("No tables found for schema change event." + + " Skipping instant schema refresh"); + } + } catch (KeeperException ke) { + String errmsg = "KeeperException while handling nodeChildrenChanged for path = " + + path + " Cause = " + ke.getCause(); + LOG.error(errmsg, ke); + TaskMonitor.get().createStatus(errmsg); + } + } + } + + private void handleSchemaChange(List tables) { + for (String tableName : tables) { + if (tableName != null) { + LOG.debug("Processing schema change with status for table = " + tableName); + handleSchemaChange(tableName); + } + } + } + + private void handleSchemaChange(String tableName) { + int refreshedRegionsCount = 0, onlineRegionsCount = 0; + MonitoredTask status = null; + try { + List onlineRegions = + regionServer.getOnlineRegions(Bytes.toBytes(tableName)); + if (onlineRegions != null && !onlineRegions.isEmpty()) { + status = TaskMonitor.get().createStatus("Region server " + + regionServer.getServerName().getServerName() + + " handling schema change for table = " + tableName + + " number of online regions = " + onlineRegions.size()); + onlineRegionsCount = onlineRegions.size(); + createStateNode(tableName, onlineRegions.size()); + for (HRegion hRegion : onlineRegions) { + regionServer.refreshRegion(hRegion); + refreshedRegionsCount ++; + } + SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName); + alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount); + updateSchemaChangeStatus(tableName, alterStatus); + String msg = "Refresh schema completed for table name = " + tableName + + " server = " + regionServer.getServerName().getServerName() + + " online Regions = " + onlineRegions.size() + + " refreshed Regions = " + refreshedRegionsCount; + LOG.debug(msg); + status.setStatus(msg); + } else { + LOG.debug("Server " + regionServer.getServerName().getServerName() + + " has no online regions for table = " + tableName + + " Ignoring the schema change request"); + } + } catch (IOException ioe) { + reportAndLogSchemaRefreshError(tableName, onlineRegionsCount, + refreshedRegionsCount, ioe, status); + } catch (KeeperException ke) { + reportAndLogSchemaRefreshError(tableName, onlineRegionsCount, + refreshedRegionsCount, ke, status); + } + } + + private int getZKNodeVersion(String nodePath) throws KeeperException { + return ZKUtil.checkExists(this.watcher, nodePath); + } + + private void reportAndLogSchemaRefreshError(String tableName, + int onlineRegionsCount, + int refreshedRegionsCount, + Throwable exception, + MonitoredTask status) { + try { + String errmsg = + " Region Server " + regionServer.getServerName().getServerName() + + " failed during schema change process. 
Cause = " + + exception.getCause() + + " Number of onlineRegions = " + onlineRegionsCount + + " Processed regions = " + refreshedRegionsCount; + SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName); + alterStatus.update(SchemaAlterStatus.AlterState.FAILURE, + refreshedRegionsCount, errmsg); + String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName()); + ZKUtil.updateExistingNodeData(this.watcher, nodePath, + Writables.getBytes(alterStatus), getZKNodeVersion(nodePath)); + LOG.info("reportAndLogSchemaRefreshError() " + + " Updated child ZKNode with SchemaAlterStatus = " + + alterStatus + " for table = " + tableName); + if (status == null) { + status = TaskMonitor.get().createStatus(errmsg); + } else { + status.setStatus(errmsg); + } + } catch (KeeperException e) { + // Retry ? + String errmsg = "KeeperException while updating the schema change node with " + + "error status for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Cause = " + e.getCause(); + LOG.error(errmsg, e); + TaskMonitor.get().createStatus(errmsg); + } catch(IOException ioe) { + // retry ?? + String errmsg = "IOException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Cause = " + ioe.getCause(); + TaskMonitor.get().createStatus(errmsg); + LOG.error(errmsg, ioe); + } + } + + + private void createStateNode(String tableName, int numberOfOnlineRegions) + throws IOException { + SchemaAlterStatus sas = + new SchemaAlterStatus(regionServer.getServerName().getServerName(), + numberOfOnlineRegions); + LOG.debug("Creating Schema Alter State node = " + sas); + try { + ZKUtil.createSetData(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName()), + Writables.getBytes(sas)); + } catch (KeeperException ke) { + String errmsg = "KeeperException while creating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Message = " + ke.getCause(); + LOG.error(errmsg, ke); + TaskMonitor.get().createStatus(errmsg); + } + + } + + private SchemaAlterStatus getSchemaAlterStatus(String tableName) + throws KeeperException, IOException { + byte[] statusBytes = ZKUtil.getData(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName())); + if (statusBytes == null || statusBytes.length <= 0) { + return null; + } + SchemaAlterStatus sas = new SchemaAlterStatus(); + Writables.getWritable(statusBytes, sas); + return sas; + } + + private void updateSchemaChangeStatus(String tableName, + SchemaAlterStatus schemaAlterStatus) + throws KeeperException, IOException { + try { + if(sleepTimeMillis > 0) { + try { + LOG.debug("SchemaChangeTracker sleeping for " + + sleepTimeMillis); + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + ZKUtil.updateExistingNodeData(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName()), + Writables.getBytes(schemaAlterStatus), -1); + String msg = "Schema change tracker completed for table = " + tableName + + " status = " + schemaAlterStatus; + LOG.debug(msg); + TaskMonitor.get().createStatus(msg); + } catch (KeeperException.NoNodeException e) { + String errmsg = 
"KeeperException.NoNodeException while updating the schema " + + "change node with server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Cause = " + e.getCause(); + TaskMonitor.get().createStatus(errmsg); + LOG.error(errmsg, e); + } catch (KeeperException e) { + // Retry ? + String errmsg = "KeeperException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Cause = " + e.getCause(); + LOG.error(errmsg, e); + TaskMonitor.get().createStatus(errmsg); + } catch(IOException ioe) { + String errmsg = "IOException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName() + + " Cause = " + ioe.getCause(); + LOG.error(errmsg, ioe); + TaskMonitor.get().createStatus(errmsg); + } + } + + private String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } + + private String getSchemaChangeNodePathForTableAndServer( + String tableName, String regionServerName) { + return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), + regionServerName); + } + + public int getSleepTimeMillis() { + return sleepTimeMillis; + } + + /** + * Set a sleep time in millis before this RS can update it's progress status. + * Used only for test cases to test complex test scenarios such as RS failures and + * RS exemption handling. + * @param sleepTimeMillis + */ + public void setSleepTimeMillis(int sleepTimeMillis) { + this.sleepTimeMillis = sleepTimeMillis; + } + + + /** + * Holds the current alter state for a table. Alter state includes the + * current alter status (INPROCESS, FAILURE, SUCCESS, or IGNORED, current RS + * host name, timestamp of alter request, number of online regions this RS has for + * the given table, number of processed regions and an errorCause in case + * if the RS failed during the schema change process. + * + * RS keeps track of schema change requests per table using the alter status and + * periodically updates the alter status based on schema change status. + */ + public static class SchemaAlterStatus implements Writable { + + public enum AlterState { + INPROCESS, // Inprocess alter + SUCCESS, // completed alter + FAILURE, // failure alter + IGNORED // Ignore the alter processing. 
+ } + + private AlterState currentAlterStatus; + // TimeStamp + private long stamp; + private int numberOfOnlineRegions; + private String errorCause = " "; + private String hostName; + private int numberOfRegionsProcessed = 0; + + public SchemaAlterStatus() { + + } + + public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) { + this.numberOfOnlineRegions = numberOfOnlineRegions; + this.stamp = System.currentTimeMillis(); + this.currentAlterStatus = AlterState.INPROCESS; + //this.rsToProcess = activeHosts; + this.hostName = hostName; + } + + public AlterState getCurrentAlterStatus() { + return currentAlterStatus; + } + + public void setCurrentAlterStatus(AlterState currentAlterStatus) { + this.currentAlterStatus = currentAlterStatus; + } + + public int getNumberOfOnlineRegions() { + return numberOfOnlineRegions; + } + + public void setNumberOfOnlineRegions(int numberOfRegions) { + this.numberOfOnlineRegions = numberOfRegions; + } + + public int getNumberOfRegionsProcessed() { + return numberOfRegionsProcessed; + } + + public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) { + this.numberOfRegionsProcessed = numberOfRegionsProcessed; + } + + public String getErrorCause() { + return errorCause; + } + + public void setErrorCause(String errorCause) { + this.errorCause = errorCause; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public void update(AlterState state, int numberOfRegions, String errorCause) { + this.currentAlterStatus = state; + this.numberOfRegionsProcessed = numberOfRegions; + this.errorCause = errorCause; + } + + public void update(AlterState state, int numberOfRegions) { + this.currentAlterStatus = state; + this.numberOfRegionsProcessed = numberOfRegions; + } + + public void update(AlterState state) { + this.currentAlterStatus = state; + } + + public void update(SchemaAlterStatus status) { + this.currentAlterStatus = status.getCurrentAlterStatus(); + this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed(); + this.errorCause = status.getErrorCause(); + } + + @Override + public void readFields(DataInput in) throws IOException { + currentAlterStatus = AlterState.valueOf(in.readUTF()); + stamp = in.readLong(); + numberOfOnlineRegions = in.readInt(); + hostName = Bytes.toString(Bytes.readByteArray(in)); + numberOfRegionsProcessed = in.readInt(); + errorCause = Bytes.toString(Bytes.readByteArray(in)); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(currentAlterStatus.name()); + out.writeLong(stamp); + out.writeInt(numberOfOnlineRegions); + Bytes.writeByteArray(out, Bytes.toBytes(hostName)); + out.writeInt(numberOfRegionsProcessed); + Bytes.writeByteArray(out, Bytes.toBytes(errorCause)); + } + + @Override + public String toString() { + return + " state= " + currentAlterStatus + + ", ts= " + stamp + + ", number of online regions = " + numberOfOnlineRegions + + ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed + + ", errorCause = " + errorCause; + } + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index b20d371..04465e1 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -85,6 +85,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable { public String 
clusterIdZNode;
   // znode used for log splitting work assignment
   public String splitLogZNode;
+  // znode used to record table schema changes
+  public String schemaZNode;
+  // znode used to keep track of Load Balancer run status
+  public String loadBalancerZNode;
 
   private final Configuration conf;
 
 @@ -140,6 +144,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
       ZKUtil.createAndFailSilent(this, rsZNode);
       ZKUtil.createAndFailSilent(this, tableZNode);
       ZKUtil.createAndFailSilent(this, splitLogZNode);
+      ZKUtil.createAndFailSilent(this, schemaZNode);
     } catch (KeeperException e) {
       throw new ZooKeeperConnectionException(
         prefix("Unexpected KeeperException creating base node"), e);
 @@ -187,6 +192,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
       conf.get("zookeeper.znode.clusterId", "hbaseid"));
     splitLogZNode = ZKUtil.joinZNode(baseZNode,
       conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
+    schemaZNode = ZKUtil.joinZNode(baseZNode,
+      conf.get("zookeeper.znode.schema", "schema"));
+    loadBalancerZNode = ZKUtil.joinZNode(baseZNode,
+      conf.get("zookeeper.znode.loadbalancer", "loadbalancer"));
   }
 
   /**
diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml
index 3785533..9407a3d 100644
--- a/src/main/resources/hbase-default.xml
+++ b/src/main/resources/hbase-default.xml
@@ -724,15 +724,39 @@
-  <property>
-    <name>hbase.coprocessor.abortonerror</name>
-    <value>false</value>
-    <description>
-    Set to true to cause the hosting server (master or regionserver) to
-    abort if a coprocessor throws a Throwable object that is not IOException or
-    a subclass of IOException. Setting it to true might be useful in development
-    environments where one wants to terminate the server as soon as possible to
-    simplify coprocessor failure analysis.
-    </description>
-  </property>
+  <property>
+    <name>hbase.coprocessor.abortonerror</name>
+    <value>false</value>
+    <description>
+    Set to true to cause the hosting server (master or regionserver) to
+    abort if a coprocessor throws a Throwable object that is not IOException or
+    a subclass of IOException. Setting it to true might be useful in development
+    environments where one wants to terminate the server as soon as possible to
+    simplify coprocessor failure analysis.
+    </description>
+  </property>
+  <property>
+    <name>hbase.instant.schema.alter.enabled</name>
+    <value>false</value>
+    <description>Whether to handle alter schema changes instantly.
+    If enabled, alter operations will be instant: the master will not
+    explicitly unassign/assign the impacted regions and will instead rely on
+    region servers to refresh their schema changes. With this enabled, schema
+    alter requests will also survive master or RS failures.
+    </description>
+  </property>
+  <property>
+    <name>hbase.instant.schema.janitor.period</name>
+    <value>120000</value>
+    <description>The schema janitor process wakes up at this interval (in
+    milliseconds) and sweeps all expired/failed schema change requests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.instant.schema.alter.timeout</name>
+    <value>60000</value>
+    <description>Timeout in millis after which any pending schema alter
+    request will be considered failed.
+    </description>
+  </property>
   <property>
     <name>dfs.support.append</name>
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java
new file mode 100644
index 0000000..72e0941
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java
@@ -0,0 +1,736 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class TestInstantSchemaChange { + + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private HBaseAdmin admin; + private static MiniHBaseCluster miniHBaseCluster = null; + private Configuration conf; + private ZooKeeperWatcher zkw; + private static MasterSchemaChangeTracker msct = null; + + + private final byte [] row = Bytes.toBytes("row"); + private final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true); + TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000); + TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000); + // + miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5); + msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } + + /** + * Find the RS that is currently holding our 
online region.
+   * @param tableName
+   * @return the region server hosting an online region of the table, or null
+   * if no such RS is found.
+   */
+  private HRegionServer findRSWithOnlineRegionFor(String tableName) {
+    List rsThreads =
+      miniHBaseCluster.getLiveRegionServerThreads();
+    for (JVMClusterUtil.RegionServerThread rsT : rsThreads) {
+      HRegionServer rs = rsT.getRegionServer();
+      List regions = rs.getOnlineRegions(Bytes.toBytes(tableName));
+      if (regions != null && !regions.isEmpty()) {
+        return rs;
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testInstantSchemaChangeForModifyTable() throws IOException,
+      KeeperException, InterruptedException {
+
+    String tableName = "testInstantSchemaChangeForModifyTable";
+    conf = TEST_UTIL.getConfiguration();
+    LOG.info("Start testInstantSchemaChangeForModifyTable()");
+    HTable ht = createTableAndValidate(tableName);
+
+    String newFamily = "newFamily";
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(newFamily));
+
+    admin.modifyTable(Bytes.toBytes(tableName), htd);
+    waitForSchemaChangeProcess(tableName);
+    assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+
+    Put put1 = new Put(row);
+    put1.add(Bytes.toBytes(newFamily), qualifier, value);
+    ht.put(put1);
+
+    Get get1 = new Get(row);
+    get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+    Result r = ht.get(get1);
+    byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+    assertEquals(0, Bytes.compareTo(value, tvalue));
+    LOG.info("END testInstantSchemaChangeForModifyTable()");
+  }
+
+  private void waitForSchemaChangeProcess(final String tableName)
+      throws KeeperException, InterruptedException {
+    waitForSchemaChangeProcess(tableName, 10000);
+  }
+
+  /**
+   * This is a pretty low-cost signalling mechanism. It is quite possible that
+   * we will miss the ZK node creation signal, as in some cases the schema
+   * change process happens rather quickly and our thread waiting for the ZK
+   * node creation might wait forever. The fool-proof strategy would be to
+   * listen for ZK events directly.
+ * @param tableName + * @throws KeeperException + * @throws InterruptedException + */ + private void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills) + throws KeeperException, InterruptedException { + LOG.info("Waiting for ZK node creation for table = " + tableName); + final MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + final Runnable r = new Runnable() { + public void run() { + try { + while(!msct.doesSchemaChangeNodeExists(tableName)) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } catch (KeeperException ke) { + ke.printStackTrace(); + } + LOG.info("Waiting for ZK node deletion for table = " + tableName); + try { + while(msct.doesSchemaChangeNodeExists(tableName)) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } catch (KeeperException ke) { + ke.printStackTrace(); + } + } + }; + Thread t = new Thread(r); + t.start(); + if (waitTimeMills > 0) { + t.join(waitTimeMills); + } else { + t.join(10000); + } + } + + private HTable createTableAndValidate(String tableName) throws IOException { + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start createTableAndValidate()"); + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + HTableDescriptor[] tables = admin.listTables(); + int numTables = 0; + if (tables != null) { + numTables = tables.length; + } + HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), + HConstants.CATALOG_FAMILY); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("created table = " + tableName); + return ht; + } + + @Test + public void testInstantSchemaChangeForAddColumn() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeForAddColumn() "); + String tableName = "testSchemachangeForAddColumn"; + HTable ht = createTableAndValidate(tableName); + String newFamily = "newFamily"; + HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); + + admin.addColumn(Bytes.toBytes(tableName), hcd); + waitForSchemaChangeProcess(tableName); + assertFalse(msct.doesSchemaChangeNodeExists(tableName)); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + LOG.info("******** Put into new column family "); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + LOG.info(" Value put = " + value + " value from table = " + tvalue); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + LOG.info("End testInstantSchemaChangeForAddColumn() "); + + } + + @Test + public void testInstantSchemaChangeForModifyColumn() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeForModifyColumn() "); + String tableName = "testSchemachangeForModifyColumn"; + HTable ht = createTableAndValidate(tableName); + + HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY); + hcd.setMaxVersions(99); + hcd.setBlockCacheEnabled(false); + + admin.modifyColumn(Bytes.toBytes(tableName), hcd); + waitForSchemaChangeProcess(tableName); + assertFalse(msct.doesSchemaChangeNodeExists(tableName)); + + List onlineRegions + = miniHBaseCluster.getRegions(Bytes.toBytes("testSchemachangeForModifyColumn")); + for 
(HRegion onlineRegion : onlineRegions) { + HTableDescriptor htd = onlineRegion.getTableDesc(); + HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY); + assertTrue(tableHcd.isBlockCacheEnabled() == false); + assertEquals(tableHcd.getMaxVersions(), 99); + } + LOG.info("End testInstantSchemaChangeForModifyColumn() "); + + } + + @Test + public void testInstantSchemaChangeForDeleteColumn() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeForDeleteColumn() "); + String tableName = "testSchemachangeForDeleteColumn"; + int numTables = 0; + HTableDescriptor[] tables = admin.listTables(); + if (tables != null) { + numTables = tables.length; + } + + byte[][] FAMILIES = new byte[][] { + Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; + + HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), + FAMILIES); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("Table testSchemachangeForDeleteColumn created"); + + admin.deleteColumn(tableName, "C"); + + waitForSchemaChangeProcess(tableName); + assertFalse(msct.doesSchemaChangeNodeExists(tableName)); + HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName)); + HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C")); + assertTrue(hcd == null); + LOG.info("End testInstantSchemaChangeForDeleteColumn() "); + } + + @Test + public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException, + KeeperException { + final String tableName = "testInstantSchemaChangeWhenTableIsDisabled"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()"); + HTable ht = createTableAndValidate(tableName); + // Disable table + admin.disableTable("testInstantSchemaChangeWhenTableIsDisabled"); + // perform schema changes + HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); + admin.addColumn(Bytes.toBytes(tableName), hcd); + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + assertTrue(msct.doesSchemaChangeNodeExists(tableName) == false); + } + + /** + * Test that when concurrent alter requests are received for a table we don't miss any. + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException, + KeeperException, InterruptedException { + final String tableName = "testConcurrentInstantSchemaChangeForModifyTable"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()"); + HTable ht = createTableAndValidate(tableName); + + Runnable run1 = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor("family1"); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + Runnable run2 = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor("family2"); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + + run1.run(); + // We have to add a sleep here as in concurrent scenarios the HTD update + // in HDFS fails and returns with null HTD. This needs to be investigated, + // but it doesn't impact the instant alter functionality in any way. 
+ Thread.sleep(100); + run2.run(); + + waitForSchemaChangeProcess(tableName); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes("family1"), qualifier, value); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes("family1"), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + Thread.sleep(10000); + + Put put2 = new Put(row); + put2.add(Bytes.toBytes("family2"), qualifier, value); + ht.put(put2); + + Get get2 = new Get(row); + get2.addColumn(Bytes.toBytes("family2"), qualifier); + Result r2 = ht.get(get2); + byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); + int result2 = Bytes.compareTo(value, tvalue2); + assertEquals(result2, 0); + LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()"); + } + + /** + * The schema change request blocks while a LB run is in progress. This + * test validates this behavior. + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException, + InterruptedException, KeeperException { + final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()"); + final String newFamily = "newFamily"; + HTable ht = createTableAndValidate(tableName); + final MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + + Runnable balancer = new Runnable() { + public void run() { + // run the balancer now. + miniHBaseCluster.getMaster().balance(); + } + }; + + Runnable schemaChanger = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor(newFamily); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + + balancer.run(); + schemaChanger.run(); + waitForSchemaChangeProcess(tableName); + assertFalse(msct.doesSchemaChangeNodeExists(tableName)); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + LOG.info("******** Put into new column family "); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + LOG.info(" Value put = " + value + " value from table = " + tvalue); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + + LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() "); + } + + + /** + * This test validates two things. One is that the LoadBalancer does not run when a schema + * change process is in progress. The second thing is that it also checks that failed/expired + * schema changes are expired to unblock the load balancer run. + * + */ + @Test (timeout=70000) + public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException, + IOException, InterruptedException { + LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() "); + final MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + // Test that the load balancer does not run while an in-flight schema + // change operation is in progress. + // Simulate a new schema change request. 
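+    // (createSchemaChangeNode persists a serialized MasterAlterStatus under
+    // /hbase/schema/testLoadBalancerBlocks; no real table is needed for this
+    // load balancer check.)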
+ msct.createSchemaChangeNode("testLoadBalancerBlocks", 0); + // The schema change node is created. + assertTrue(msct.doesSchemaChangeNodeExists("testLoadBalancerBlocks")); + // Now, request an explicit LB run. + + Runnable balancer1 = new Runnable() { + public void run() { + // run the balancer now. + miniHBaseCluster.getMaster().balance(); + } + }; + balancer1.run(); + + // Load balancer should not run now. + assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); + LOG.debug("testConcurrentLoadBalancerSchemaChangeRequests Asserted"); + LOG.info("End testConcurrentLoadBalancerSchemaChangeRequests() "); + } + + /** + * Test that instant schema change blocks while LB is running. + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=10000) + public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException, + IOException, InterruptedException { + LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun() "); + final MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + // Test that the schema change request does not run while an in-flight LB run + // is in progress. + // First, request an explicit LB run. + + Runnable balancer1 = new Runnable() { + public void run() { + // run the balancer now. + miniHBaseCluster.getMaster().balance(); + } + }; + + Runnable schemaChanger = new Runnable() { + public void run() { + try { + msct.createSchemaChangeNode("testSchemaChangeBlocks", 10); + } catch (IOException e) { + e.printStackTrace(); + } catch (KeeperException ke) { + ke.printStackTrace(); + } + } + }; + + Thread t1 = new Thread(balancer1); + Thread t2 = new Thread(schemaChanger); + t1.start(); + t2.start(); + + // check that they both happen concurrently + Runnable balancerCheck = new Runnable() { + public void run() { + // check whether balancer created ZK node. + while(!miniHBaseCluster.getMaster().isLoadBalancerRunning()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + try { + assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); + } catch (KeeperException ke) { + ke.printStackTrace(); + } + LOG.debug("Load Balancer is now running"); + } + }; + + Thread t = new Thread(balancerCheck); + t.start(); + t.join(1000); + // Load balancer should not run now. + assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); + // Schema change request node should now exist. + assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); + LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted"); + LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() "); + } + + /** + * This test validates that we create a ZK node before LB run and delete the same + * after completing the LB run. + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=70000) + public void testLoadBalancerOperationInZK() throws KeeperException, + IOException, InterruptedException { + LOG.info("Start testLoadBalancerOperationInZK() "); + assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); + Runnable balancer1 = new Runnable() { + public void run() { + // run the balancer now. + miniHBaseCluster.getMaster().balance(); + } + }; + balancer1.run(); + + Runnable balancerCheck = new Runnable() { + public void run() { + // check whether balancer created ZK node. 
+        while (!miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
+          try {
+            Thread.sleep(10);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        LOG.debug("Load Balancer is now running");
+      }
+    };
+
+    Thread t = new Thread(balancerCheck);
+    t.start();
+    t.join(1000);
+    // Load balancer node should be deleted now.
+    assertFalse(miniHBaseCluster.getMaster().isLoadBalancerRunning());
+    LOG.debug("testLoadBalancerOperationInZK Asserted");
+    LOG.info("End testLoadBalancerOperationInZK()");
+  }
+
+  /**
+   * To test the schema janitor (that it cleans expired/failed schema alter
+   * attempts) we simply create a fake table (that doesn't exist, with a fake
+   * number of online regions) in ZK. This schema alter request will time out
+   * (after 30 seconds) and our janitor will clean it up.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testInstantSchemaJanitor() throws IOException,
+      KeeperException, InterruptedException {
+    LOG.info("Start testInstantSchemaJanitor()");
+    String fakeTableName = "testInstantSchemaWithFailedExpiredOperations";
+    MasterSchemaChangeTracker msct =
+      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+    msct.createSchemaChangeNode(fakeTableName, 10);
+    LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
+      + " created");
+    Thread.sleep(40000);
+    assertFalse(msct.doesSchemaChangeNodeExists(fakeTableName));
+    LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
+      + " deleted");
+    LOG.info("END testInstantSchemaJanitor()");
+  }
+
+  /**
+   * The objective of the following test is to validate that schema exclusions
+   * happen properly. When a RS dies or crashes mid-flight during a schema
+   * refresh, we exclude that RS, as well as all of its online regions for the
+   * table, from the schema change process.
+   *
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testInstantSchemaChangeExclusions() throws IOException,
+      KeeperException, InterruptedException {
+    MasterSchemaChangeTracker msct =
+      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+    LOG.info("Start testInstantSchemaChangeExclusions()");
+    String tableName = "testInstantSchemaChangeExclusions";
+    HTable ht = createTableAndValidate(tableName);
+
+    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+    hcd.setMaxVersions(99);
+    hcd.setBlockCacheEnabled(false);
+
+    HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
+    //miniHBaseCluster.getRegionServer(0).abort("killed for test");
+    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+    hrs.abort("Aborting for tests");
+    hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
+
+    //admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+    LOG.debug("Waiting for Schema Change process to complete");
+    waitForSchemaChangeProcess(tableName, 15000);
+    assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+    // Sleep for some time so that our region is reassigned to some other RS
+    // by master.
+    Thread.sleep(10000);
+    List<HRegion> onlineRegions
+        = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+    assertFalse(onlineRegions.isEmpty());
+    for (HRegion onlineRegion : onlineRegions) {
+      HTableDescriptor htd = onlineRegion.getTableDesc();
+      HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
+      assertFalse(tableHcd.isBlockCacheEnabled());
+      assertEquals(99, tableHcd.getMaxVersions());
+    }
+    LOG.info("End testInstantSchemaChangeExclusions() ");
+  }
+
+  /**
+   * This test validates that when a schema change request fails on the
+   * RS side, we appropriately register the failure in the master schema change
+   * tracker's node and capture the error cause.
+   *
+   * Currently an alter request fails if a RS hits an IO exception, for example
+   * due to a missing or misconfigured compression codec. With instant schema
+   * change the same failure happens, and we register the failure with the
+   * associated cause and also update the monitor status appropriately.
+   *
+   * The region(s) will be orphaned in both cases.
+   *
+   */
+  @Test
+  public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
+      KeeperException, InterruptedException {
+    MasterSchemaChangeTracker msct =
+        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+    LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
+    String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
+    HTable ht = createTableAndValidate(tableName);
+
+    // Split the table into 10 regions.
+    TEST_UTIL.createMultiRegions(conf, ht,
+        HConstants.CATALOG_FAMILY, 10);
+
+    // Wait for all the regions to be assigned.
+    Thread.sleep(10000);
+    List<HRegion> onlineRegions
+        = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+    // After the failed alter below, none of these regions will be able to
+    // reopen, and they will be orphaned.
+    LOG.info("Size of online regions = " + onlineRegions.size());
+
+    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+    hcd.setMaxVersions(99);
+    hcd.setBlockCacheEnabled(false);
+    // SNAPPY is assumed to be unavailable in the test environment, so
+    // reopening regions with this codec will fail.
+    hcd.setCompressionType(Compression.Algorithm.SNAPPY);
+
+    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
+    Thread.sleep(100);
+
+    assertTrue(msct.doesSchemaChangeNodeExists(tableName));
+    Thread.sleep(10000);
+    // Get the current alter status and validate that it failed with an
+    // appropriate error cause.
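+    // MasterAlterStatus is assumed to be the aggregate, master-side view of
+    // the alter: the overall AlterState plus the error cause reported back by
+    // the failing region server.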
+    MasterSchemaChangeTracker.MasterAlterStatus mas =
+        msct.getMasterAlterStatus(tableName);
+    assertTrue(mas != null);
+    assertEquals(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE,
+        mas.getCurrentAlterStatus());
+    assertTrue(mas.getErrorCause() != null);
+    LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
+  }
+
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
new file mode 100644
index 0000000..0869bb4
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
@@ -0,0 +1,328 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
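+/**
+ * Failover tests for instant schema change: a region server crash, random RS
+ * kill/restart while an alter is in flight, and active-master failover. The
+ * mini cluster runs multiple masters so that aborting the active master
+ * exercises a real failover.
+ */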
+public class TestInstantSchemaChangeFailover {
+
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private HBaseAdmin admin;
+  private static MiniHBaseCluster miniHBaseCluster = null;
+  private Configuration conf;
+  private ZooKeeperWatcher zkw;
+  private static MasterSchemaChangeTracker msct = null;
+
+  private final byte [] row = Bytes.toBytes("row");
+  private final byte [] qualifier = Bytes.toBytes("qualifier");
+  final byte [] value = Bytes.toBytes("value");
+  // Do not change this region server count.
+  private static final int NUMBER_OF_REGION_SERVERS = 5;
+  // Do not change this master count.
+  private static final int NUMBER_OF_MASTERS = 3;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
+    TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 30000);
+    TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
+
+    miniHBaseCluster = TEST_UTIL.startMiniCluster(NUMBER_OF_MASTERS,
+        NUMBER_OF_REGION_SERVERS);
+    msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+  }
+
+  /**
+   * This is a pretty low-cost signalling mechanism. It is quite possible that
+   * we will miss the ZK node creation signal, as in some cases the schema
+   * change process happens rather quickly and our thread waiting for ZK node
+   * creation might wait forever. The fool-proof strategy would be to listen
+   * directly for ZK events.
+   * @param tableName
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void waitForSchemaChangeProcess(final String tableName)
+      throws KeeperException, InterruptedException {
+    LOG.info("Waiting for ZK node creation for table = " + tableName);
+    final MasterSchemaChangeTracker msct =
+        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+    final Runnable r = new Runnable() {
+      public void run() {
+        try {
+          while (!msct.doesSchemaChangeNodeExists(tableName)) {
+            try {
+              Thread.sleep(20);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        } catch (KeeperException ke) {
+          ke.printStackTrace();
+        }
+
+        LOG.info("Waiting for ZK node deletion for table = " + tableName);
+        try {
+          while (msct.doesSchemaChangeNodeExists(tableName)) {
+            try {
+              Thread.sleep(20);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
+        } catch (KeeperException ke) {
+          ke.printStackTrace();
+        }
+      }
+    };
+    Thread t = new Thread(r);
+    t.start();
+    t.join(10000);
+  }
+
+  /**
+   * Kill a random RS and see that the schema change can succeed.
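+   * The alter is issued first and a RS is aborted while the change is in
+   * flight; once the dust settles, the schema change should still converge.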
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test (timeout=50000)
+  public void testInstantSchemaChangeWhileRSCrash() throws IOException,
+      KeeperException, InterruptedException {
+    LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
+    zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
+
+    final String tableName = "TestRSCrashDuringSchemaChange";
+    HTable ht = createTableAndValidate(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("family2");
+    admin.addColumn(Bytes.toBytes(tableName), hcd);
+
+    miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
+    // Let the dust settle.
+    Thread.sleep(10000);
+    waitForSchemaChangeProcess(tableName);
+    Put put2 = new Put(row);
+    put2.add(Bytes.toBytes("family2"), qualifier, value);
+    ht.put(put2);
+
+    Get get2 = new Get(row);
+    get2.addColumn(Bytes.toBytes("family2"), qualifier);
+    Result r2 = ht.get(get2);
+    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+    int result2 = Bytes.compareTo(value, tvalue2);
+    assertEquals(0, result2);
+    String nodePath = msct.getSchemaChangeNodePathForTable(tableName);
+    assertEquals(-1, ZKUtil.checkExists(zkw, nodePath));
+    LOG.info("result2 = " + result2);
+    LOG.info("End testInstantSchemaChangeWhileRSCrash()");
+  }
+
+  /**
+   * Randomly bring down/up RS instances while a schema change is in progress.
+   * This test is the same as the previous one, except that we intend to kill
+   * and start new RS instances while a schema change is in progress.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test (timeout=70000)
+  public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
+      KeeperException, InterruptedException {
+    LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
+    miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
+    // Start a new RS before the schema change.
+    // Commented out because starting the RS fails with a DFS user-permission NPE.
+    //miniHBaseCluster.startRegionServer();
+
+    // Let the dust settle.
+    Thread.sleep(10000);
+    final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
+    HTable ht = createTableAndValidate(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("family2");
+    admin.addColumn(Bytes.toBytes(tableName), hcd);
+    // Kill RS 2 now.
+    miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
+    // Let the dust settle.
+    Thread.sleep(10000);
+    // Two of the five region servers are down now.
+    waitForSchemaChangeProcess(tableName);
+    assertFalse(msct.doesSchemaChangeNodeExists(tableName));
+    Put put2 = new Put(row);
+    put2.add(Bytes.toBytes("family2"), qualifier, value);
+    ht.put(put2);
+
+    Get get2 = new Get(row);
+    get2.addColumn(Bytes.toBytes("family2"), qualifier);
+    Result r2 = ht.get(get2);
+    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+    int result2 = Bytes.compareTo(value, tvalue2);
+    assertEquals(0, result2);
+    LOG.info("result2 = " + result2);
+    LOG.info("End testInstantSchemaChangeWhileRandomRSCrashAndStart()");
+  }
+
+  /**
+   * Test a scenario where the primary master is brought down while processing
+   * an alter request. This is a harder one, as it is very difficult to time
+   * this correctly.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test (timeout=50000)
+  public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
+      KeeperException, InterruptedException {
+    LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
+
+    final String tableName = "testInstantSchemaChangeWhileMasterFailover";
+    HTable ht = createTableAndValidate(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor("family2");
+    admin.addColumn(Bytes.toBytes(tableName), hcd);
+    // Kill the primary master now.
+    Thread.sleep(50);
+    miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
+
+    // We cannot reliably check the schema change status via
+    // waitForSchemaChangeProcess here: the ZK session in our
+    // MasterSchemaChangeTracker is lost when the master dies. So we rely on an
+    // old-fashioned sleep instead.
+    Thread.sleep(25000);
+    Put put2 = new Put(row);
+    put2.add(Bytes.toBytes("family2"), qualifier, value);
+    ht.put(put2);
+
+    Get get2 = new Get(row);
+    get2.addColumn(Bytes.toBytes("family2"), qualifier);
+    Result r2 = ht.get(get2);
+    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
+    int result2 = Bytes.compareTo(value, tvalue2);
+    assertEquals(0, result2);
+    LOG.info("result2 = " + result2);
+    LOG.info("End testInstantSchemaChangeWhileMasterFailover()");
+  }
+
+  /**
+   * Test master failover during a schema change request in ZK.
+   * We create a fake schema change request in ZK and abort the primary master
+   * mid-flight to simulate a master failover during an in-flight schema change.
+   * The new master's schema janitor will eventually clean up this fake request
+   * after it times out.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
+      KeeperException, InterruptedException {
+    LOG.info("Start testInstantSchemaOperationsInZKForMasterFailover() ");
+    String tableName = "testInstantSchemaOperationsInZKForMasterFailover";
+
+    conf = TEST_UTIL.getConfiguration();
+    MasterSchemaChangeTracker activesct =
+        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+    activesct.createSchemaChangeNode(tableName, 10);
+    LOG.debug(activesct.getSchemaChangeNodePathForTable(tableName)
+        + " created");
+    assertTrue(activesct.doesSchemaChangeNodeExists(tableName));
+    // Kill the primary master now.
+    miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
+    // Wait 50 seconds so that the schema janitor on the failed-over master
+    // kicks in and cleans up this failed/expired schema change request.
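+    // Timing assumption: "hbase.instant.schema.janitor.period" and
+    // "hbase.instant.schema.alter.timeout" are both 30 seconds (see
+    // setUpBeforeClass), so the stale request should expire and be swept
+    // within this 50-second window.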
+    Thread.sleep(50000);
+    MasterSchemaChangeTracker newmsct = miniHBaseCluster.getMaster().getSchemaChangeTracker();
+    assertFalse(newmsct.doesSchemaChangeNodeExists(tableName));
+    LOG.debug(newmsct.getSchemaChangeNodePathForTable(tableName)
+        + " deleted");
+    LOG.info("End testInstantSchemaOperationsInZKForMasterFailover() ");
+  }
+
+  private HTable createTableAndValidate(String tableName) throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    LOG.info("Start createTableAndValidate()");
+    HTableDescriptor[] tables = admin.listTables();
+    int numTables = 0;
+    if (tables != null) {
+      numTables = tables.length;
+    }
+    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+        HConstants.CATALOG_FAMILY);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+    LOG.info("Created table = " + tableName);
+    return ht;
+  }
+
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 2f20596..8e06e54 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -51,12 +51,15 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
+import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -147,11 +150,6 @@ public class TestCatalogJanitor {
   }
 
   @Override
-  public void checkTableModifiable(byte[] tableName) throws IOException {
-    //no-op
-  }
-
-  @Override
   public void createTable(HTableDescriptor desc, byte[][] splitKeys)
   throws IOException {
     // no-op
@@ -167,6 +165,11 @@ public class TestCatalogJanitor {
     return null;
   }
 
+  @Override
+  public void checkTableModifiable(byte[] tableName,
+      EventHandler.EventType eventType) throws IOException {
+    // no-op
+  }
+
   @Override
   public MasterFileSystem getMasterFileSystem() {
     return this.mfs;
@@ -251,6 +254,14 @@ public class TestCatalogJanitor {
       }
     };
   }
+
+  @Override
+  public MasterSchemaChangeTracker getSchemaChangeTracker() {
+    return null;
+  }
+
+  @Override
+  public RegionServerTracker getRegionServerTracker() {
+    return null;
+  }
 }
 
 @Test
diff --git a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index 64016f9..0b45ac1 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 
@@ -55,6 +56,13 @@ public class MockRegionServerServices implements RegionServerServices {
     return this.regions.get(encodedRegionName);
   }
 
+  @Override
+  public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void refreshRegion(HRegion hRegion) throws IOException {
+    // no-op
+  }
+
   @Override
   public void addToOnlineRegions(HRegion r) {
     this.regions.put(r.getRegionInfo().getEncodedName(), r);