Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1324864)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision )
@@ -354,6 +354,8 @@
    */
   private ObjectName mxBean = null;

+  private long regionCloseOpenThrottleTime;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -429,6 +431,7 @@
       "hbase.regionserver.kerberos.principal", this.isa.getHostName());
     regionServerAccounting = new RegionServerAccounting();
     cacheConfig = new CacheConfig(conf);
+    this.regionCloseOpenThrottleTime = conf.getLong("hbase.instant.schema.throttle.time", 500);
   }

   /**
@@ -3670,7 +3673,15 @@
     HLog wal = this.getWAL();
     return wal.rollWriter(true);
   }
-
+
+  private void throttleRefreshRegion() {
+    try {
+      Thread.sleep(this.regionCloseOpenThrottleTime);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while waiting for a region refresh", e);
+    }
+  }
+
   /**
    * Refresh schema changes for given region.
    * @param hRegion HRegion to refresh
@@ -3689,6 +3700,7 @@
     HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName());
     LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString()
       + " Is = " + htd );
+    throttleRefreshRegion();
     HRegion region = HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog,
       conf, this, null);
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision 1324864)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision )
@@ -29,8 +29,6 @@
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
@@ -95,18 +93,15 @@
       }
       String msg = "Master seeing following tables undergoing schema change "
         + "process. Tables = " + tables;
-      MonitoredTask status = TaskMonitor.get().createStatus(msg);
       LOG.debug(msg);
       for (String table : tables) {
         LOG.debug("Processing table = "+ table);
-        status.setStatus("Processing table = "+ table);
         try {
           processTableNode(table);
         } catch (IOException e) {
           String errmsg = "IOException while processing completed schema changes."
             + " Cause = " + e.getCause();
           LOG.error(errmsg, e);
-          status.setStatus(errmsg);
         }
       }
     }
@@ -222,36 +217,25 @@
         " request = " + alterStatus.getProcessedHosts()
         + " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess()
         + " Processed regions = " + alterStatus.getNumberOfRegionsProcessed();
-      MonitoredTask status = TaskMonitor.get().createStatus(
-        "Checking alter schema request status for table = " + tableName);
-      status.markComplete(msg);
       LOG.debug(msg);
       cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName));
     } else {
       if (alterStatus.getErrorCause() != null
         && alterStatus.getErrorCause().trim().length() > 0) {
-        String msg = "Alter schema change failed "
+        LOG.debug("Alter schema change failed "
           + "for table = " + tableName
           + " Number of online regions = " + alterStatus.getNumberOfRegionsToProcess()
           + " processed regions count = " + alterStatus.getNumberOfRegionsProcessed()
           + " Original list = " + alterStatus.hostsToProcess
           + " Processed servers = "
-          + servers
-          + " Error Cause = " + alterStatus.getErrorCause();
-        MonitoredTask status = TaskMonitor.get().createStatus(
-          "Checking alter schema request status for table = " + tableName);
-        // we have errors.
-        LOG.debug(msg);
-        status.abort(msg);
+          + servers + " Error Cause = " + alterStatus.getErrorCause());
       } else {
-        String msg = "Not all region servers have processed the schema changes"
+        LOG.debug("Not all region servers have processed the schema changes"
          + "for table = " + tableName
          + " Number of online regions = " + alterStatus.getNumberOfRegionsToProcess()
          + " processed regions count = " + alterStatus.getNumberOfRegionsProcessed()
          + " Original list = " + alterStatus.hostsToProcess
          + " Processed servers = " + servers
          + " Alter STate = "
-         + alterStatus.getCurrentAlterStatus();
-        LOG.debug(msg);
-        // status.setStatus(msg);
+         + alterStatus.getCurrentAlterStatus());
       }
     }
   }
@@ -288,7 +272,6 @@
         String statmsg = "Cleaning failed or expired schema change requests. "
           + "current tables undergoing "
          + "schema change process = " + tables;
-        MonitoredTask status = TaskMonitor.get().createStatus(statmsg);
         LOG.debug(statmsg);
         if (hasSchemaChangeExpiredFor(table)) {
           // time out.. currently, we abandon the in-flight schema change due to
@@ -297,30 +280,26 @@
           // timeout. Ideally, we need to check whether the schema change has
           // completed or not. If completed, we need not attempt it again. One way is to
           // attempt a retry of the schema change and see if it succeeds, and
           // another could be to simply rollback the schema change effort and
           // see if it succeeds.
-          String msg = "Schema change for table = " + table + " has expired."
+          LOG.debug("Schema change for table = " + table + " has expired."
            + " Schema change for this table has been in progress for "
            + schemaChangeTimeoutMillis +
-            "Deleting the node now.";
-          LOG.debug(msg);
+            "Deleting the node now.");
           ZKUtil.deleteNodeRecursively(this.watcher,
             getSchemaChangeNodePathForTable(table));
         } else {
           String msg = "Schema change request is in progress for "
             + " table = " + table;
           LOG.debug(msg);
-          status.setStatus(msg);
         }
       }
     } catch (IOException e) {
       String msg = "IOException during handleFailedExpiredSchemaChanges."
        + e.getCause();
       LOG.error(msg, e);
-      TaskMonitor.get().createStatus(msg);
     } catch (KeeperException ke) {
       String msg = "KeeperException during handleFailedExpiredSchemaChanges."
        + ke.getCause();
       LOG.error(msg, ke);
-      TaskMonitor.get().createStatus(msg);
     }
   }
@@ -356,19 +335,16 @@
    */
   public void excludeRegionServerForSchemaChanges(String serverName) {
     try {
-      MonitoredTask status = TaskMonitor.get().createStatus(
-        "Processing schema change exclusion for region server = " + serverName);
       List<String> tables = ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
       if (tables == null || tables.isEmpty()) {
         String msg = "No schema change in progress. Skipping exclusion for "
          + "server = "+ serverName;
         LOG.debug(msg);
-        status.setStatus(msg);
         return ;
       }
       for (String tableName : tables) {
-        excludeRegionServer(tableName, serverName, status);
+        excludeRegionServer(tableName, serverName);
       }
     } catch(KeeperException ke) {
       LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
@@ -403,14 +379,12 @@
    * @throws KeeperException
    * @throws IOException
    */
-  private void excludeRegionServer(String tableName, String serverName,
-                                   MonitoredTask status)
+  private void excludeRegionServer(String tableName, String serverName)
     throws KeeperException, IOException {
     if (isSchemaChangeApplicableFor(tableName, serverName)) {
       String msg = "Excluding RS " + serverName + " from schema change process"
        + " for table = " + tableName;
       LOG.debug(msg);
-      status.setStatus(msg);
       SchemaChangeTracker.SchemaAlterStatus sas =
         getRSSchemaAlterStatus(tableName, serverName);
       if (sas == null) {
@@ -447,8 +421,6 @@
   public void createSchemaChangeNode(String tableName, int numberOfRegions)
     throws KeeperException, IOException {
-    MonitoredTask status = TaskMonitor.get().createStatus(
-      "Creating schema change node for table = " + tableName);
     LOG.debug("Creating schema change node for table = " + tableName
       + " Path = " + getSchemaChangeNodePathForTable(tableName));
@@ -479,7 +451,6 @@
       String msg = "Master is not seeing any online region servers. Aborting the "
        + "schema change processing by region servers.";
       LOG.debug(msg);
-      status.abort(msg);
     } else {
       LOG.debug("Master is seeing " + rsCount + " region servers online before "
        + "the schema change process.");
@@ -488,8 +459,6 @@
       LOG.debug("Master creating the master alter status = " + mas);
       ZKUtil.createSetData(this.watcher, getSchemaChangeNodePathForTable(tableName),
         Writables.getBytes(mas));
-      status.markComplete("Created the ZK node for schema change. Current Alter Status = "
-        + mas.toString());
       ZKUtil.listChildrenAndWatchThem(this.watcher,
         getSchemaChangeNodePathForTable(tableName));
     }
@@ -552,19 +521,11 @@
         tableName = path.substring(path.lastIndexOf("/")+1, path.length());
         processTableNode(tableName);
       } catch (KeeperException e) {
-        TaskMonitor.get().createStatus(
-          "MasterSchemaChangeTracker: ZK exception while processing " +
-          " nodeChildrenChanged() event for table = " + tableName
-          + " Cause = " + e.getCause());
-        LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
-          + " schema change nodes", e);
+        LOG.error("MasterSchemaChangeTracker: ZK exception while processing " +
+          " nodeChildrenChanged() event for table = " + tableName, e);
       } catch(IOException ioe) {
-        TaskMonitor.get().createStatus(
-          "MasterSchemaChangeTracker: ZK exception while processing " +
-          " nodeChildrenChanged() event for table = " + tableName
-          + " Cause = " + ioe.getCause());
-        LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
-          + " schema change nodes", ioe);
+        LOG.error("MasterSchemaChangeTracker: ZK exception while processing " +
+          " nodeChildrenChanged() event for table = " + tableName, ioe);
       }
     }
   }
@@ -586,20 +547,11 @@
         tableName = paths[3];
         processTableNode(tableName);
       } catch (KeeperException e) {
-        TaskMonitor.get().createStatus(
-          "MasterSchemaChangeTracker: ZK exception while processing " +
-          " nodeDataChanged() event for table = " + tableName
-          + " Cause = " + e.getCause());
         LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
           + " schema change nodes", e);
       } catch(IOException ioe) {
-        TaskMonitor.get().createStatus(
-          "MasterSchemaChangeTracker: IO exception while processing " +
-          " nodeDataChanged() event for table = " + tableName
-          + " Cause = " + ioe.getCause());
         LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
           + " schema change nodes", ioe);
-      }
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision 1324864)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision )
@@ -22,18 +22,17 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.zookeeper.KeeperException;
-import org.apache.hadoop.hbase.util.Writables;
-
 import org.apache.hadoop.io.Writable;
-import java.io.*;
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
 import java.util.List;

 /**
@@ -101,7 +100,6 @@
           String errmsg = "KeeperException while handling nodeChildrenChanged for path = "
             + path + " Cause = " + ke.getCause();
           LOG.error(errmsg, ke);
-          TaskMonitor.get().createStatus(errmsg);
         }
       }
     }
@@ -117,15 +115,14 @@
   private void handleSchemaChange(String tableName) {
     int refreshedRegionsCount = 0, onlineRegionsCount = 0;
-    MonitoredTask status = null;
     try {
       List<HRegion> onlineRegions = regionServer.getOnlineRegions(Bytes.toBytes(tableName));
       if (onlineRegions != null && !onlineRegions.isEmpty()) {
-        status = TaskMonitor.get().createStatus("Region server "
+        LOG.debug("Region server "
-          + regionServer.getServerName().getServerName()
-          + " handling schema change for table = " + tableName
-          + " number of online regions = " + onlineRegions.size());
+          + regionServer.getServerName().getServerName()
+          + " handling schema change for table = " + tableName
+          + " number of online regions = " + onlineRegions.size());
         onlineRegionsCount = onlineRegions.size();
         createStateNode(tableName, onlineRegions.size());
         for (HRegion hRegion : onlineRegions) {
@@ -140,7 +137,6 @@
           + " online Regions = " + onlineRegions.size()
           + " refreshed Regions = " + refreshedRegionsCount;
         LOG.debug(msg);
-        status.setStatus(msg);
       } else {
         LOG.debug("Server " + regionServer.getServerName().getServerName()
           + " has no online regions for table = " + tableName
@@ -148,10 +144,10 @@
       }
     } catch (IOException ioe) {
       reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
-        refreshedRegionsCount, ioe, status);
+        refreshedRegionsCount, ioe);
     } catch (KeeperException ke) {
       reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
-        refreshedRegionsCount, ke, status);
+        refreshedRegionsCount, ke);
     }
   }
@@ -162,8 +158,7 @@
   private void reportAndLogSchemaRefreshError(String tableName,
                                               int onlineRegionsCount,
                                               int refreshedRegionsCount,
-                                              Throwable exception,
-                                              MonitoredTask status) {
+                                              Throwable exception) {
     try {
       String errmsg = " Region Server "
         + regionServer.getServerName().getServerName()
@@ -178,14 +173,8 @@
         regionServer.getServerName().getServerName());
       ZKUtil.updateExistingNodeData(this.watcher, nodePath,
         Writables.getBytes(alterStatus), getZKNodeVersion(nodePath));
-      LOG.info("reportAndLogSchemaRefreshError() " +
-        " Updated child ZKNode with SchemaAlterStatus = "
+      LOG.info(" Updated child ZKNode with SchemaAlterStatus = "
         + alterStatus + " for table = " + tableName);
-      if (status == null) {
-        status = TaskMonitor.get().createStatus(errmsg);
-      } else {
-        status.setStatus(errmsg);
-      }
     } catch (KeeperException e) {
       // Retry ?
       String errmsg = "KeeperException while updating the schema change node with "
@@ -194,7 +183,6 @@
        + regionServer.getServerName().getServerName()
        + " Cause = " + e.getCause();
       LOG.error(errmsg, e);
-      TaskMonitor.get().createStatus(errmsg);
     } catch(IOException ioe) {
       // retry ??
       String errmsg = "IOException while updating the schema change node with "
@@ -202,7 +190,6 @@
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + ioe.getCause();
-      TaskMonitor.get().createStatus(errmsg);
       LOG.error(errmsg, ioe);
     }
   }
@@ -226,7 +213,6 @@
        + regionServer.getServerName().getServerName()
        + " Message = " + ke.getCause();
       LOG.error(errmsg, ke);
-      TaskMonitor.get().createStatus(errmsg);
     }
   }
@@ -264,14 +250,12 @@
       String msg = "Schema change tracker completed for table = " + tableName
        + " status = " + schemaAlterStatus;
       LOG.debug(msg);
-      TaskMonitor.get().createStatus(msg);
     } catch (KeeperException.NoNodeException e) {
       String errmsg = "KeeperException.NoNodeException while updating the schema "
        + "change node with server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + e.getCause();
-      TaskMonitor.get().createStatus(errmsg);
       LOG.error(errmsg, e);
     } catch (KeeperException e) {
       // Retry ?
@@ -281,7 +265,6 @@
        + regionServer.getServerName().getServerName()
        + " Cause = " + e.getCause();
       LOG.error(errmsg, e);
-      TaskMonitor.get().createStatus(errmsg);
     } catch(IOException ioe) {
       String errmsg = "IOException while updating the schema change node with "
        + "server name for table = "
@@ -289,7 +272,6 @@
        + regionServer.getServerName().getServerName()
        + " Cause = " + ioe.getCause();
       LOG.error(errmsg, ioe);
-      TaskMonitor.get().createStatus(errmsg);
     }
   }
Index: src/main/resources/hbase-default.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/resources/hbase-default.xml (revision 1324864)
+++ src/main/resources/hbase-default.xml (revision )
@@ -809,10 +809,16 @@
   <property>
     <name>hbase.instant.schema.alter.timeout</name>
-    <value>60000</value>
+    <value>120000</value>
    <description>Timeout in millis after which any pending schema alter request will be
     considered as failed.
    </description>
   </property>
+  <property>
+    <name>hbase.instant.schema.throttle.time</name>
+    <value>100</value>
+    <description>Throttle time in millis while closing/re opening impacted regions
+    </description>
+  </property>
-
+
   <property>
     <name>hbase.online.schema.update.enable</name>