Index: src/main/ruby/hbase/admin.rb =================================================================== --- src/main/ruby/hbase/admin.rb (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/ruby/hbase/admin.rb (revision ) @@ -273,9 +273,6 @@ # Table should exist raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name) - # Table should be disabled - raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name) - # There should be at least one argument raise(ArgumentError, "There should be at least one argument but the table name") if args.empty? Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision ) @@ -85,6 +85,8 @@ public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; + // znode used to record table schema changes + public String schemaZNode; private final Configuration conf; @@ -140,6 +142,7 @@ ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); + ZKUtil.createAndFailSilent(this, schemaZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -187,6 +190,9 @@ conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); + schemaZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.schema", "schema")); + } /** Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision ) @@ -51,7 +51,7 @@ super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - this.masterServices.checkTableModifiable(tableName); + this.masterServices.checkTableModifiable(tableName, eventType); this.tableNameStr = Bytes.toString(this.tableName); } @@ -64,6 +64,7 @@ MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); + handleInstantSchemaChanges(); } catch (IOException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { @@ -71,6 +72,17 @@ } } + protected void handleInstantSchemaChanges() { + if (eventType.isSchemaChangeEvent()) { + try { + this.masterServices.getSchemaChangeTracker() + .createSchemaChangeNode(Bytes.toString(tableName)); + } catch (KeeperException e) { + LOG.warn("Instant schema change failed for table " + tableName, e); + } + } + } + protected abstract void handleTableOperation(List regions) throws IOException, KeeperException; } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/master/MasterServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision ) @@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; /** * Services Master supplies @@ -55,12 +57,19 @@ * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. * @throws TableNotDisabledException - * @throws TableNotFoundException + * @throws TableNotFoundException */ - public void checkTableModifiable(final byte [] tableName) throws IOException; + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) throws IOException; /** * @return Return table descriptors implementation. */ public TableDescriptors getTableDescriptors(); + + /** + * Get Master Schema change tracker + * @return + */ + public MasterSchemaChangeTracker getSchemaChangeTracker(); } Index: src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision ) @@ -46,12 +46,14 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; import org.mockito.Mockito; @@ -128,11 +130,6 @@ } @Override - public void checkTableModifiable(byte[] tableName) throws IOException { - //no-op - } - - @Override public AssignmentManager getAssignmentManager() { return this.asm; } @@ -143,6 +140,12 @@ } @Override + public void checkTableModifiable(byte[] tableName, + EventHandler.EventType eventType) + throws IOException { + } + + @Override public MasterFileSystem getMasterFileSystem() { return this.mfs; } @@ -223,7 +226,11 @@ } }; } + + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return null; - } + } + } @Test public void testGetHRegionInfo() throws IOException { Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision ) @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; @@ -156,6 +158,9 @@ // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; + // Schema change tracker + private MasterSchemaChangeTracker schemaChangeTracker; + // This flag is for stopping this Master instance. Its set when we are // stopping or aborting private volatile boolean stopped = false; @@ -368,6 +373,11 @@ boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); + // initialize schema change tracker + this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), + this); + this.schemaChangeTracker.start(); + LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -595,6 +605,12 @@ return this.tableDescriptors; } + @Override + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return this.schemaChangeTracker; + } + + /** @return InfoServer object. Maybe null.*/ public InfoServer getInfoServer() { return this.infoServer; @@ -1160,8 +1176,20 @@ } @Override - public void checkTableModifiable(final byte [] tableName) + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) throws IOException { + preCheckTableModifiable(tableName); + if (!eventType.isSchemaChangeEvent()) { + if (!getAssignmentManager().getZKTable(). + isDisabledTable(Bytes.toString(tableName))) { + throw new TableNotDisabledException(tableName); + } + } + } + + private void preCheckTableModifiable(final byte[] tableName) + throws IOException { String tableNameStr = Bytes.toString(tableName); if (isCatalogTable(tableName)) { throw new IOException("Can't modify catalog tables"); @@ -1169,11 +1197,7 @@ if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { throw new TableNotFoundException(tableNameStr); } - if (!getAssignmentManager().getZKTable(). - isDisabledTable(Bytes.toString(tableName))) { - throw new TableNotDisabledException(tableName); - } + } - } public void clearFromTransition(HRegionInfo hri) { if (this.assignmentManager.isRegionInTransition(hri) != null) { Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision ) @@ -135,7 +135,10 @@ public HRegion getFromOnlineRegions(String encodedRegionName) { return this.regions.get(encodedRegionName); } - + + public void refreshSchema(byte[] tableName) { + } + @Override public void addToOnlineRegions(HRegion r) { this.regions.put(r.getRegionInfo().getEncodedName(), r); Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision ) @@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -267,6 +268,9 @@ // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Schema change Tracker + private SchemaChangeTracker schemaChangeTracker; + // Log Splitting Worker private SplitLogWorker splitLogWorker; @@ -545,6 +549,12 @@ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Schema change tracker + this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, + this, this); + this.schemaChangeTracker.start(); + } /** @@ -3033,6 +3043,84 @@ } /** + * Refresh schema changes for given regions. + * @param onlineRegionsOfTable + * @throws IOException + */ + private void reopenRegions(List onlineRegionsOfTable) throws IOException { + + if (onlineRegionsOfTable != null && onlineRegionsOfTable.size() > 0) { + for (HRegion hRegion : onlineRegionsOfTable) { + HRegionInfo regionInfo = hRegion.getRegionInfo(); + // Close the region + hRegion.close(); + // Remove from online regions + removeFromOnlineRegions(regionInfo.getEncodedName()); + // Get new HTD + HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName()); + LOG.info("HTD for region = " + regionInfo.getRegionNameAsString() + + " Is = " + htd ); + HRegion region = + HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf); + // Add new region to the onlineRegions + addToOnlineRegions(region); + } + } + } + + /** + * Gets the online regions of the specified table. + * This method looks at the in-memory onlineRegions. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will not be included in the returned list. + * So, the returned list may not necessarily be ALL regions in this table, its + * all the ONLINE regions in the table. + * @param tableName + * @return Online regions from tableName + */ + private List getOnlineRegionsForTable(byte[] tableName) { + List tableRegions = new ArrayList(); + synchronized (this.onlineRegions) { + for (HRegion region: this.onlineRegions.values()) { + HRegionInfo regionInfo = region.getRegionInfo(); + if(Bytes.equals(regionInfo.getTableName(), tableName)) { + tableRegions.add(region); + } + } + } + return tableRegions; + + } + + /** + * Refresh schema changes to all online regions of given table. + * @param tableName + * @return true on successful schema refresh. + */ + public void refreshSchema(byte[] tableName) throws IOException { + List onlineRegionsForTable = null; + try { + onlineRegionsForTable = getOnlineRegionsForTable(tableName); + if (onlineRegionsForTable != null && onlineRegionsForTable.size() > 0) { + LOG.info("refreshSchema found " + onlineRegionsForTable + + " online regions for table = " + Bytes.toString(tableName) + + " Refreshing them now.."); + reopenRegions(onlineRegionsForTable); + } else { + LOG.info("refreshSchema found no onlineRegions for table = " + + Bytes.toString(tableName) + ". Skipping refreshSchema..."); + } + } catch (IOException ioe) { + LOG.info("refreshSchema failed with exception " + + " for table = " + Bytes.toString(tableName) + + " Number of regions online = " + + onlineRegionsForTable == null ? 0 : onlineRegionsForTable.size()); + throw ioe; + } + } + + + /** * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine */ public static void main(String[] args) throws Exception { Index: src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision ) @@ -21,6 +21,8 @@ import org.apache.hadoop.hbase.Server; +import java.io.IOException; + /** * Interface to Map of online regions. In the Map, the key is the region's * encoded name and the value is an {@link HRegion} instance. @@ -49,4 +51,11 @@ * null if named region is not member of the online regions. */ public HRegion getFromOnlineRegions(String encodedRegionName); + + /** + * Refresh schema for online regions of a table. + * @param tableName + * @return + */ + public void refreshSchema(byte[] tableName) throws IOException; } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision 37c811a249792b1b450245f595251e531df54b2d) +++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision ) @@ -139,8 +139,16 @@ * Constructor */ EventType(int value) {} + + public boolean isSchemaChangeEvent() { + return ( + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_MODIFY_TABLE)); - } + } + } + /** * Default base class constructor. */ Index: src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision ) +++ src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision ) @@ -0,0 +1,238 @@ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestInstantSchemaChange { + + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private HBaseAdmin admin; + private static MiniHBaseCluster miniHBaseCluster = null; + private Configuration conf; + private ZooKeeperWatcher zkw; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + miniHBaseCluster = TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } + + @Test + public void testInstantSchemaChangeForModifyTable() throws IOException, KeeperException { + + conf = TEST_UTIL.getConfiguration(); + LOG.info("testInstantSchemaChangeForModifyTable()"); + + HTableDescriptor[] tables = admin.listTables(); + int numTables = tables.length; + HTable ht = TEST_UTIL.createTable(Bytes.toBytes("testSchemachange"), + HConstants.CATALOG_FAMILY); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("Table testSchemachange created"); + + final byte [] row = Bytes.toBytes("row"); + final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + Put put = new Put(row); + put.add(HConstants.CATALOG_FAMILY, qualifier, value); + ht.put(put); + Get get = new Get(row); + get.addColumn(HConstants.CATALOG_FAMILY, qualifier); + Result r1 = ht.get(get); + byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier); + int result1 = Bytes.compareTo(value, tvalue1); + assertEquals(result1, 0); + + String newFamily = "newFamily"; + HTableDescriptor htd = new HTableDescriptor("testSchemachange"); + htd.addFamily(new HColumnDescriptor(newFamily)); + + admin.modifyTable(Bytes.toBytes("testSchemachange"), htd); + // Take a mini nap for changes to take effect. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + } + + @Test + public void testInstantSchemaChangeForAddColumn() throws IOException, KeeperException { + LOG.info("testInstantSchemaChangeForAddColumn() "); + + HTableDescriptor[] tables = admin.listTables(); + int numTables = tables.length; + HTable ht = TEST_UTIL.createTable(Bytes.toBytes("testSchemachangeForAddColumn"), + HConstants.CATALOG_FAMILY); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("Table testSchemachangeForAddColumn created"); + + final byte [] row = Bytes.toBytes("row"); + final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + Put put = new Put(row); + put.add(HConstants.CATALOG_FAMILY, qualifier, value); + ht.put(put); + Get get = new Get(row); + get.addColumn(HConstants.CATALOG_FAMILY, qualifier); + Result r1 = ht.get(get); + byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier); + int result1 = Bytes.compareTo(value, tvalue1); + assertEquals(result1, 0); + + String newFamily = "newFamily"; + HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); + + admin.addColumn(Bytes.toBytes("testSchemachangeForAddColumn"), hcd); + + // Take a mini nap for changes to take effect. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + LOG.info("******** Put into new column family "); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + LOG.info(" Value put = " + value + " value from table = " + tvalue); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + } + + @Test + public void testInstantSchemaChangeForModifyColumn() throws IOException, KeeperException { + LOG.info("testInstantSchemaChangeForModifyColumn() "); + + HTableDescriptor[] tables = admin.listTables(); + int numTables = tables.length; + HTable ht = TEST_UTIL.createTable(Bytes.toBytes("testSchemachangeForModifyColumn"), + HConstants.CATALOG_FAMILY); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("Table testSchemachangeForModifyColumn created"); + + final byte [] row = Bytes.toBytes("row"); + final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + Put put = new Put(row); + put.add(HConstants.CATALOG_FAMILY, qualifier, value); + ht.put(put); + Get get = new Get(row); + get.addColumn(HConstants.CATALOG_FAMILY, qualifier); + Result r1 = ht.get(get); + byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier); + int result1 = Bytes.compareTo(value, tvalue1); + assertEquals(result1, 0); + + HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY); + hcd.setMaxVersions(99); + hcd.setBlockCacheEnabled(false); + + admin.modifyColumn(Bytes.toBytes("testSchemachangeForModifyColumn"), hcd); + + // Take a mini nap for changes to take effect. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + List onlineRegions + = miniHBaseCluster.getRegions(Bytes.toBytes("testSchemachangeForModifyColumn")); + for (HRegion onlineRegion : onlineRegions) { + HTableDescriptor htd = onlineRegion.getTableDesc(); + HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY); + assertTrue(tableHcd.isBlockCacheEnabled() == false); + assertEquals(tableHcd.getMaxVersions(), 99); + } + } + + @Test + public void testInstantSchemaOperationsInZK() throws IOException, KeeperException { + LOG.info("testInstantSchemaOperationsInZK() "); + + conf = TEST_UTIL.getConfiguration(); + zkw = new ZooKeeperWatcher(conf, "instant-schema-change-tests", null); + ZKUtil.createAndFailSilent(zkw, zkw.schemaZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.schemaZNode) != -1); + LOG.debug(zkw.schemaZNode + " created"); + + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + msct.createSchemaChangeNode("testSchemachangeNode"); + LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode") + + " created"); + + String nodePath = msct.getSchemaChangeNodePathForTable("testSchemachangeNode"); + assertTrue(ZKUtil.checkExists(zkw, nodePath) != -1); + + // Take a mini nap for changes to take effect. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1); + LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode") + + " deleted"); + + } + +} + Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision ) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision ) @@ -0,0 +1,101 @@ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.List; + +public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class); + + /** + * Constructs a new ZK node tracker. + *

+ *

After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public MasterSchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, watcher.schemaZNode, abortable); + } + + + @Override + public void start() { + try { + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + watcher.registerListener(this); + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker startup failed.", e); + } + } + + /** + * Create a new schema change ZK node. + * @param tableName + * @throws KeeperException + */ + public void createSchemaChangeNode(String tableName) throws KeeperException { + LOG.info("Creating schema change node for table = " + + tableName + " Path = " + + getSchemaChangeNodePathForTable(tableName)); + ZKUtil.createAndFailSilent(this.watcher, + getSchemaChangeNodePathForTable(tableName)); + int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode); + ZKUtil.setData(this.watcher, getSchemaChangeNodePathForTable(tableName), + Bytes.toBytes(rsCount)); + ZKUtil.listChildrenAndWatchThem(this.watcher, + getSchemaChangeNodePathForTable(tableName)); + } + + @Override + public void nodeChildrenChanged(String path) { + if (path.startsWith(watcher.schemaZNode) + && !path.equals(watcher.schemaZNode)) { + try { + List servers = + ZKUtil.listChildrenAndWatchThem(watcher, path); + String tableName = path.substring(path.lastIndexOf("/")+1, path.length()); + LOG.debug("Master.SchemaChangeTracker.nodeChildrenChanged. " + + " Current table == " + tableName + + " List of servers which processed schema change = " + servers); + byte[] rsCountBytes = ZKUtil.getData(watcher, + getSchemaChangeNodePathForTable(tableName)); + int rsCount = 0; + if (rsCountBytes != null) { + rsCount = Bytes.toInt(rsCountBytes); + } + //int rsCount = Bytes.toInt(rsCountBytes); + if (rsCount != 0 && (servers != null && servers.size() >= rsCount)) { + LOG.debug("All region servers have successfully processed the " + + "schema changes for table = " + tableName + + " . Deleting the schema change node = " + + path); + ZKUtil.deleteNodeRecursively(this.watcher, path); + } else { + LOG.debug("Not all region servers have processed the schema changes" + + "for table = " + tableName + " rs count = " + + rsCount + " processed count = " + servers); + } + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting" + + " schema change nodes", e); + } + } else { + LOG.debug("Not processing Master.nodeChildrenChanged for path = " + + path); + } + } + + public String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision ) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision ) @@ -0,0 +1,137 @@ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.logging.Logger; + +public class SchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class); + private RegionServerServices regionServer = null; + + + /** + * Constructs a new ZK node tracker. + *

+ *

After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public SchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable, + RegionServerServices regionServer) { + super(watcher, watcher.schemaZNode, abortable); + this.regionServer = regionServer; + } + + @Override + public void start() { + try { + ZKUtil.listChildrenAndWatchThem(watcher, node); + watcher.registerListener(this); + } catch (KeeperException e) { + LOG.error("RegionServer SchemaChangeTracker startup failed with " + + "KeeperException.", e); + } + } + + // whenever new schema change occur this event will get triggered + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.schemaZNode)) { + try { + List tables = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + LOG.debug("RS.SchemaChangeTracker: " + + "Current list of tables with schema change = " + tables); + if (tables != null) { + handleSchemaChange(tables); + } else { + LOG.error("No tables found for schema change event." + + " Skipping instant schema refresh"); + } + } catch (KeeperException ke) { + LOG.error("KeeperException handling schema change event.", ke); + + } + } + } + + private void handleSchemaChange(String tableName) throws IOException { + if (tableName != null) { + regionServer.refreshSchema(Bytes.toBytes(tableName)); + updateZKNode(tableName); + LOG.info("Refresh schema completed for table name = " + tableName + + " server = " + regionServer.getServerName().getServerName()); + } + } + + private boolean hasHandledSchemaChange(List servers) { + return (servers != null && + servers.contains(regionServer.getServerName().getServerName())); + } + + private void handleSchemaChange(List tables) { + for (String tableName : tables) { + try { + List servers = ZKUtil.listChildrenNoWatch(watcher, + getSchemaChangeNodePathForTable(tableName)); + if (!hasHandledSchemaChange(servers)) { + handleSchemaChange(tableName); + } else { + LOG.info("Schema change for table " + tableName + + " already addressed by server = " + + regionServer.getServerName().getServerName() + + " skipping refresh schema"); + } + } catch (KeeperException ke) { + LOG.error("KeeperException handling schema change event for " + + " table = " + tableName, ke); + } catch (IOException ioe) { + LOG.error("IOException handling schema change event for table = " + + tableName, ioe); + } + } + } + + private void updateZKNode(String tableName) { + try { + ZKUtil.createAndFailSilent(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName())); + LOG.info("updateZKNode() Created child ZKNode with server name = " + + regionServer.getServerName().getServerName() + " for table = " + + tableName); + } catch (KeeperException.NoNodeException e) { + LOG.error("KeeperException.NoNodeException while updating the schema " + + "change node with server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } catch (KeeperException e) { + LOG.error("KeeperException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } + } + + private String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } + + private String getSchemaChangeNodePathForTableAndServer( + String tableName, String regionServerName) { + return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), + regionServerName); + } + +}