Index: src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java (revision 1171786) +++ src/test/java/org/apache/hadoop/hbase/regionserver/handler/MockRegionServerServices.java (working copy) @@ -54,6 +54,10 @@ return this.regions.get(encodedRegionName); } + public void refreshSchema(byte[] tableName) throws IOException { + // do nothing + } + @Override public void addToOnlineRegions(HRegion r) { this.regions.put(r.getRegionInfo().getEncodedName(), r); Index: src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision 1171786) +++ src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (working copy) @@ -48,12 +48,14 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; import org.mockito.Mockito; @@ -128,11 +130,6 @@ } @Override - public void checkTableModifiable(byte[] tableName) throws IOException { - //no-op - } - - @Override public AssignmentManager getAssignmentManager() { return this.asm; } @@ -143,6 +140,12 @@ } @Override + public void checkTableModifiable(byte[] tableName, + EventHandler.EventType 
eventType) + throws IOException { + } + + @Override public MasterFileSystem getMasterFileSystem() { return this.mfs; } @@ -221,6 +224,10 @@ } }; } + + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return null; + } } @Test Index: src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision 0) @@ -0,0 +1,467 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestInstantSchemaChange { + + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private HBaseAdmin admin; + private static MiniHBaseCluster miniHBaseCluster = null; + private Configuration conf; + private ZooKeeperWatcher zkw; + + private final byte [] row = Bytes.toBytes("row"); + private final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true); + miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5); + } + + 
@AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } + + + @Test + public void testInstantSchemaChangeForModifyTable() throws IOException, + KeeperException, InterruptedException { + + String tableName = "testSchemachange"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testInstantSchemaChangeForModifyTable()"); + HTable ht = createTableAndValidate(tableName); + + String newFamily = "newFamily"; + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(newFamily)); + + admin.modifyTable(Bytes.toBytes(tableName), htd); + waitForSchemaChangeProcess(tableName); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + LOG.info("END testInstantSchemaChangeForModifyTable()"); + + } + + private void waitForSchemaChangeProcess(String tableName) + throws KeeperException, InterruptedException { + LOG.info("Waiting for ZK node creation for table = " + tableName); + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + while(!msct.doesSchemaChangeNodeExists(tableName)) { + Thread.sleep(20); + } + LOG.info("Waiting for ZK node deletion for table = " + tableName); + while(msct.doesSchemaChangeNodeExists(tableName)) { + Thread.sleep(20); + } + } + + @Test + public void testInstantSchemaChangeForAddColumn() throws IOException, KeeperException { + LOG.info("Start testInstantSchemaChangeForAddColumn() "); + String tableName = "testSchemachangeForAddColumn"; + HTable ht = createTableAndValidate(tableName); + String 
newFamily = "newFamily"; + HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); + + admin.addColumn(Bytes.toBytes(tableName), hcd); + //waitForSchemaChangeProcess(tableName); + + // Take a mini nap for changes to take effect. + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + LOG.info("******** Put into new column family "); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + LOG.info(" Value put = " + value + " value from table = " + tvalue); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + LOG.info("End testInstantSchemaChangeForAddColumn() "); + + } + + @Test + public void testInstantSchemaChangeForModifyColumn() throws IOException, + KeeperException { + LOG.info("Start testInstantSchemaChangeForModifyColumn() "); + String tableName = "testSchemachangeForModifyColumn"; + HTable ht = createTableAndValidate(tableName); + + HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY); + hcd.setMaxVersions(99); + hcd.setBlockCacheEnabled(false); + + admin.modifyColumn(Bytes.toBytes(tableName), hcd); + //waitForSchemaChangeProcess(tableName); + + // Sleep a while for changes to take effect. 
+ try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + List onlineRegions + = miniHBaseCluster.getRegions(Bytes.toBytes("testSchemachangeForModifyColumn")); + for (HRegion onlineRegion : onlineRegions) { + HTableDescriptor htd = onlineRegion.getTableDesc(); + HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY); + assertTrue(tableHcd.isBlockCacheEnabled() == false); + assertEquals(tableHcd.getMaxVersions(), 99); + } + LOG.info("End testInstantSchemaChangeForModifyColumn() "); + + } + + @Test + public void testInstantSchemaChangeForDeleteColumn() throws IOException, + KeeperException { + LOG.info("Start testInstantSchemaChangeForDeleteColumn() "); + String tableName = "testSchemachangeForDeleteColumn"; + int numTables = 0; + HTableDescriptor[] tables = admin.listTables(); + if (tables != null) { + numTables = tables.length; + } + + byte[][] FAMILIES = new byte[][] { + Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; + + HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), + FAMILIES); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("Table testSchemachangeForDeleteColumn created"); + + admin.deleteColumn(tableName, "C"); + + //waitForSchemaChangeProcess(tableName); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + + HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName)); + HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C")); + assertTrue(hcd == null); + LOG.info("End testInstantSchemaChangeForDeleteColumn() "); + } + + @Test + public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException, + KeeperException { + final String tableName = "testInstantSchemaChangeWhenTableIsDisabled"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()"); + HTable ht = createTableAndValidate(tableName); + // Disable table + 
admin.disableTable("testInstantSchemaChangeWhenTableIsDisabled"); + // perform schema changes + HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); + admin.addColumn(Bytes.toBytes(tableName), hcd); + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + assertTrue(msct.doesSchemaChangeNodeExists(tableName) == false); + } + + /** + * Test that when concurrent alter requests are received for a table we don't miss any. + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException, + KeeperException, InterruptedException { + final String tableName = "testConcurrentInstantSchemaChangeForModifyTable"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()"); + HTable ht = createTableAndValidate(tableName); + + Runnable run1 = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor("family1"); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + Runnable run2 = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor("family2"); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + + run1.run(); + run2.run(); + + waitForSchemaChangeProcess(tableName); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes("family1"), qualifier, value); + ht.put(put1); + + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes("family1"), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + + Put put2 = new Put(row); + put2.add(Bytes.toBytes("family2"), qualifier, value); + ht.put(put2); + + Get get2 = new Get(row); + 
get2.addColumn(Bytes.toBytes("family2"), qualifier); + Result r2 = ht.get(get2); + byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); + int result2 = Bytes.compareTo(value, tvalue2); + assertEquals(result2, 0); + + LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()"); + } + + @Test + public void testInstantSchemaOperationsInZK() throws IOException, KeeperException, InterruptedException { + LOG.info("testInstantSchemaOperationsInZK() "); + + conf = TEST_UTIL.getConfiguration(); + zkw = new ZooKeeperWatcher(conf, "instant-schema-change-tests", null); + ZKUtil.createAndFailSilent(zkw, zkw.schemaZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.schemaZNode) != -1); + LOG.debug(zkw.schemaZNode + " created"); + + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + msct.createSchemaChangeNode("testSchemachangeNode"); + LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode") + + " created"); + + String nodePath = msct.getSchemaChangeNodePathForTable("testSchemachangeNode"); + assertTrue(ZKUtil.checkExists(zkw, nodePath) != -1); + waitForSchemaChangeProcess("testSchemachangeNode"); + + assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1); + LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode") + + " deleted"); + + } + + + /** + * Kill on the RS and see that the schema change can succeed. 
+ * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testInstantSchemaChangeWhileRSCrash() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeWhileRSCrash()"); + + final byte [] row = Bytes.toBytes("row"); + final byte [] qualifier = Bytes.toBytes("qualifier"); + final byte [] value = Bytes.toBytes("value"); + + final String tableName = "TestRSCrashDuringSchemaChange"; + HTable ht = createTableAndValidate(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("family2"); + admin.addColumn(Bytes.toBytes(tableName), hcd); + + miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change"); + waitForSchemaChangeProcess(tableName); + Put put2 = new Put(row); + put2.add(Bytes.toBytes("family2"), qualifier, value); + ht.put(put2); + + Get get2 = new Get(row); + get2.addColumn(Bytes.toBytes("family2"), qualifier); + Result r2 = ht.get(get2); + byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); + int result2 = Bytes.compareTo(value, tvalue2); + assertEquals(result2, 0); + LOG.info("result2 = " + result2); + LOG.info("end testInstantSchemaChangeWhileRSCrash()"); + } + + /** + * Randomly bring down/up RS servers while schema change is in progress. 
+ * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + @Test + public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()"); + miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change 000"); + // Start a new RS before schema change + miniHBaseCluster.startRegionServer(); + + final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart"; + HTable ht = createTableAndValidate(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("family2"); + admin.addColumn(Bytes.toBytes(tableName), hcd); + // Kill 2 RS now. + miniHBaseCluster.getRegionServer(1).abort("Killing while instant schema change 111"); + miniHBaseCluster.abortRegionServer(2); + // We will be left with only one RS. + waitForSchemaChangeProcess(tableName); + Put put2 = new Put(row); + put2.add(Bytes.toBytes("family2"), qualifier, value); + ht.put(put2); + + Get get2 = new Get(row); + get2.addColumn(Bytes.toBytes("family2"), qualifier); + Result r2 = ht.get(get2); + byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); + int result2 = Bytes.compareTo(value, tvalue2); + assertEquals(result2, 0); + LOG.info("result2 = " + result2); + LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()"); + } + + /** + * Test scenario where primary master is brought down while processing the alter. + * This is harder one as it is very difficult the time this. Need to add more fine grained tests. 
+ * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + + @Test + public void testInstantSchemaChangeWhileMasterFailover() throws IOException, + KeeperException, InterruptedException { + LOG.info("Start testInstantSchemaChangeWhileMasterFailover()"); + + final String tableName = "testInstantSchemaChangeWhileMasterFailover"; + HTable ht = createTableAndValidate(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("family2"); + admin.addColumn(Bytes.toBytes(tableName), hcd); + // Kill primary master now. + miniHBaseCluster.abortMaster(0); + // We will be left with only one master. + waitForSchemaChangeProcess(tableName); + Put put2 = new Put(row); + put2.add(Bytes.toBytes("family2"), qualifier, value); + ht.put(put2); + + Get get2 = new Get(row); + get2.addColumn(Bytes.toBytes("family2"), qualifier); + Result r2 = ht.get(get2); + byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); + int result2 = Bytes.compareTo(value, tvalue2); + assertEquals(result2, 0); + LOG.info("result2 = " + result2); + LOG.info("end testInstantSchemaChangeWhileMasterFailover()"); + } + + private HTable createTableAndValidate(String tableName) throws IOException { + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start createTableAndValidate()"); + MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + HTableDescriptor[] tables = admin.listTables(); + int numTables = 0; + if (tables != null) { + numTables = tables.length; + } + HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), + HConstants.CATALOG_FAMILY); + tables = this.admin.listTables(); + assertEquals(numTables + 1, tables.length); + LOG.info("created table = " + tableName); + return ht; + } + +} + Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 
1171786) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -85,6 +85,8 @@ public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; + // znode used to record table schema changes + public String schemaZNode; private final Configuration conf; @@ -140,6 +142,7 @@ ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); + ZKUtil.createAndFailSilent(this, schemaZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -187,6 +190,9 @@ conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); + schemaZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.schema", "schema")); + } /** Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision 0) @@ -0,0 +1,133 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.logging.Logger; + +public class SchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class); + private RegionServerServices regionServer = null; + + + /** + * Constructs a new ZK node tracker. + *

+ *

After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public SchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable, + RegionServerServices regionServer) { + super(watcher, watcher.schemaZNode, abortable); + this.regionServer = regionServer; + } + + @Override + public void start() { + try { + watcher.registerListener(this); + ZKUtil.listChildrenAndWatchThem(watcher, node); + } catch (KeeperException e) { + LOG.error("RegionServer SchemaChangeTracker startup failed with " + + "KeeperException.", e); + } + } + + // whenever new schema change occur this event will get triggered + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.schemaZNode)) { + try { + List tables = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + LOG.debug("RS.SchemaChangeTracker: " + + "Current list of tables with schema change = " + tables); + if (tables != null) { + handleSchemaChange(tables); + } else { + LOG.error("No tables found for schema change event." 
+ + " Skipping instant schema refresh"); + } + } catch (KeeperException ke) { + LOG.error("KeeperException handling schema change event.", ke); + + } catch (IOException e) { + LOG.error("IOException handling schema change event.", e); + } + } + } + + private void handleSchemaChange(List tables) throws IOException { + for (String tableName : tables) { + if (tableName != null) { + regionServer.refreshSchema(Bytes.toBytes(tableName)); + updateZKNode(tableName); + LOG.info("Refresh schema completed for table name = " + tableName + + " server = " + regionServer.getServerName().getServerName()); + } + } + } + + private void updateZKNode(String tableName) { + try { + ZKUtil.createAndFailSilent(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName())); + LOG.info("updateZKNode() Created child ZKNode with server name = " + + regionServer.getServerName().getServerName() + " for table = " + + tableName); + } catch (KeeperException.NoNodeException e) { + LOG.error("KeeperException.NoNodeException while updating the schema " + + "change node with server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } catch (KeeperException e) { + LOG.error("KeeperException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } + } + + private String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } + + private String getSchemaChangeNodePathForTableAndServer( + String tableName, String regionServerName) { + return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), + regionServerName); + } + +} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision 0) @@ -0,0 +1,182 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.util.List; + +public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class); + + /** + * Constructs a new ZK node tracker. + *

+ *

After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param abortable + */ + public MasterSchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, watcher.schemaZNode, abortable); + } + + + @Override + public void start() { + try { + watcher.registerListener(this); + List tables = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + processCompletedSchemaChanges(tables); + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker startup failed.", e); + abortable.abort("MasterSchemaChangeTracker startup failed", e); + } + } + + /** + * When a primary master crashes and the secondary master takes over + * mid-flight during an alter process, the secondary should cleanup any completed + * schema changes not handled by the master. + * @param tables + * @throws KeeperException + */ + private void processCompletedSchemaChanges(List tables) throws KeeperException { + if (tables.isEmpty()) { + LOG.debug("No current schema change is progress. Skipping cleanup"); + } + LOG.debug("Master seeing following tables undergoing schema change " + + "process. Tables = " + tables); + + for (String table : tables) { + LOG.debug("Processing table = "+ table); + processTableNode(table); + } + } + + /** + * If schema alter is handled for this table, then delete all the ZK nodes created + * for this table. + * @param tableName + * @throws KeeperException + */ + private void processTableNode(String tableName) throws KeeperException { + List servers = + ZKUtil.listChildrenAndWatchThem(watcher, + getSchemaChangeNodePathForTable(tableName)); + String path = getSchemaChangeNodePathForTable(tableName); + LOG.debug("Master.SchemaChangeTracker.processTableNode. 
" + + " Current table == " + tableName + + " List of servers which processed schema change = " + servers); + byte[] rsCountBytes = ZKUtil.getData(watcher, path); + int rsCount = 0; + if (rsCountBytes != null) { + rsCount = Bytes.toInt(rsCountBytes); + } + if (rsCount != 0 && (servers != null && servers.size() >= rsCount)) { + LOG.debug("All region servers have successfully processed the " + + "schema changes for table = " + tableName + + " . Deleting the schema change node = " + + path + " Number of region servers processed the schema change" + + " request = " + rsCount); + ZKUtil.deleteNodeRecursively(this.watcher, path); + } else { + LOG.debug("Not all region servers have processed the schema changes" + + "for table = " + tableName + " rs count = " + + rsCount + " processed count = " + servers); + } + } + + /** + * Create a new schema change ZK node. + * @param tableName Table name that is getting altered + * @throws KeeperException + */ + public void createSchemaChangeNode(String tableName) throws KeeperException { + LOG.debug("Creating schema change node for table = " + + tableName + " Path = " + + getSchemaChangeNodePathForTable(tableName)); + if (doesSchemaChangeNodeExists(tableName)) { + LOG.debug("Schema change node already exists for table = " + tableName + + " Deleting the schema change node."); + // If we already see a schema change node for this table we wait till the previous + // alter process is complete. Ideally, we need not wait and we could simply delete + // existing schema change node for this table and create new one. But then the + // RS cloud will not be able to process concurrent schema updates for the same table + // as they will be working with same set of online regions for this table. Meaning the + // second alter change will not see any online regions (as they were being closed and + // re opened by the first change) and will miss the second one. 
+ // We either handle this at the RS level by explicit locks while processing a table + // or do it here. I prefer doing it here as it seems much cleaner. + while(doesSchemaChangeNodeExists(tableName)) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + + } + } + } + + int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode); + LOG.debug("Master is seeing " + rsCount + " region servers online before " + + "the schema change process. Master will wait for " + rsCount + + " region servers to acknowledge the schema change."); + ZKUtil.createSetData(this.watcher, + getSchemaChangeNodePathForTable(tableName), Bytes.toBytes(rsCount)); + ZKUtil.listChildrenAndWatchThem(this.watcher, + getSchemaChangeNodePathForTable(tableName)); + } + + /** + * Create a new schema change ZK node. + * @param tableName + * @throws KeeperException + */ + public boolean doesSchemaChangeNodeExists(String tableName) + throws KeeperException { + return ZKUtil.checkExists(watcher, + getSchemaChangeNodePathForTable(tableName)) != -1; + } + + @Override + public void nodeChildrenChanged(String path) { + if (path.startsWith(watcher.schemaZNode) && + !path.equals(watcher.schemaZNode)) { + try { + String tableName = path.substring(path.lastIndexOf("/")+1, path.length()); + processTableNode(tableName); + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting" + + " schema change nodes", e); + } + } + } + + public String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } +} Index: src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (working copy) @@ -141,14 +141,14 @@ * Constructor */ EventType(int value) {} - public boolean 
isOnlineSchemaChangeSupported() { + public boolean isSchemaChangeEvent() { return ( - this.equals(EventType.C_M_ADD_FAMILY) || - this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || - this.equals(EventType.C_M_MODIFY_TABLE) - ); + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_DELETE_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_MODIFY_TABLE)); } + } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (working copy) @@ -21,6 +21,8 @@ import org.apache.hadoop.hbase.Server; +import java.io.IOException; + /** * Interface to Map of online regions. In the Map, the key is the region's * encoded name and the value is an {@link HRegion} instance. @@ -49,4 +51,10 @@ * null if named region is not member of the online regions. */ public HRegion getFromOnlineRegions(String encodedRegionName); + + /** + * Refresh schema for online regions of a table. 
+ * @param tableName + */ + public void refreshSchema(byte[] tableName) throws IOException; } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -269,6 +270,9 @@ // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Schema change Tracker + private SchemaChangeTracker schemaChangeTracker; + // Log Splitting Worker private SplitLogWorker splitLogWorker; @@ -543,6 +547,11 @@ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Schema change tracker + this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, + this, this); + this.schemaChangeTracker.start(); } /** @@ -3114,4 +3123,81 @@ HLog wal = this.getWAL(); return wal.rollWriter(true); } + + /** + * Refresh schema changes for given regions. 
+ * @param onlineRegionsOfTable + * @throws IOException + */ + private void reopenRegions(List onlineRegionsOfTable) throws IOException { + + if (!onlineRegionsOfTable.isEmpty()) { + for (HRegion hRegion : onlineRegionsOfTable) { + HRegionInfo regionInfo = hRegion.getRegionInfo(); + // Close the region + hRegion.close(); + // Remove from online regions + removeFromOnlineRegions(regionInfo.getEncodedName()); + // Get new HTD + HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName()); + LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString() + + " Is = " + htd ); + HRegion region = + HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf); + // Add new region to the onlineRegions + addToOnlineRegions(region); + } + } + } + + /** + * Gets the online regions of the specified table. + * This method looks at the in-memory onlineRegions. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will not be included in the returned list. + * So, the returned list may not necessarily be ALL regions in this table, its + * all the ONLINE regions in the table. + * @param tableName + * @return Online regions from tableName + */ + private List getOnlineRegionsForTable(byte[] tableName) { + List tableRegions = new ArrayList(); + synchronized (this.onlineRegions) { + for (HRegion region: this.onlineRegions.values()) { + HRegionInfo regionInfo = region.getRegionInfo(); + if(Bytes.equals(regionInfo.getTableName(), tableName)) { + tableRegions.add(region); + } + } + } + return tableRegions; + + } + + /** + * Refresh schema changes to all online regions of given table. 
+ * @param tableName + */ + public void refreshSchema(byte[] tableName) throws IOException { + List onlineRegionsForTable = null; + try { + onlineRegionsForTable = getOnlineRegionsForTable(tableName); + if (!onlineRegionsForTable.isEmpty()) { + LOG.debug("refreshSchema found " + onlineRegionsForTable + + " online regions for table = " + Bytes.toString(tableName) + + " Refreshing them now.."); + reopenRegions(onlineRegionsForTable); + } else { + LOG.debug("refreshSchema found no onlineRegions for table = " + + Bytes.toString(tableName) + ". Skipping refreshSchema..."); + } + } catch (IOException ioe) { + LOG.warn("refreshSchema failed with exception " + + " for table = " + Bytes.toString(tableName) + + " Number of regions online = " + + onlineRegionsForTable == null ? 0 : onlineRegionsForTable.size(), ioe); + throw ioe; + } + } + } Index: src/main/java/org/apache/hadoop/hbase/master/MasterServices.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (working copy) @@ -26,7 +26,10 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; + /** * Services Master supplies */ @@ -52,15 +55,26 @@ public ExecutorService getExecutorService(); /** - * Check table is modifiable; i.e. exists and is offline. - * @param tableName Name of table to check. - * @throws TableNotDisabledException - * @throws TableNotFoundException + * Check table modifiable. 
i.e. not ROOT or .META., and disabled for all commands except + alter commands + * @param tableName + * @param eventType + * @throws IOException */ - public void checkTableModifiable(final byte [] tableName) throws IOException; + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) + throws IOException; + /** * @return Return table descriptors implementation. */ public TableDescriptors getTableDescriptors(); + + /** + * Get the master schema change tracker + * @return the master's schema change tracker + */ + public MasterSchemaChangeTracker getSchemaChangeTracker(); + } Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -321,11 +321,17 @@ } } + // private void excludeRegionServerFromSchemaChanges(final ServerName serverName) { + // this.services.getSchemaChangeTracker() + // .excludeRegionServerForSchemaChanges(serverName.getServerName()); + //} + /* * Expire the passed server. Add it to list of deadservers and queue a * shutdown processing.
*/ public synchronized void expireServer(final ServerName serverName) { + //excludeRegionServerFromSchemaChanges(serverName); if (!this.onlineServers.containsKey(serverName)) { LOG.warn("Received expiration of " + serverName + " but server is not currently online"); Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; @@ -157,7 +159,10 @@ private CatalogTracker catalogTracker; // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - + + // Schema change tracker + private MasterSchemaChangeTracker schemaChangeTracker; + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. @@ -188,6 +193,8 @@ private final ServerName serverName; private TableDescriptors tableDescriptors; + + private boolean supportInstantSchemaChanges = false; /** * Initializes the HMaster. 
The steps are as follows: @@ -249,6 +256,17 @@ } this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true); this.metrics = new MasterMetrics(getServerName().toString()); + // initialize instant schema change settings + this.supportInstantSchemaChanges = conf.getBoolean( + "hbase.instant.schema.alter.enabled", false); + if (supportInstantSchemaChanges) { + LOG.info("Instant schema change enabled. All schema alter operations will " + + "happen through ZK."); + } + else { + LOG.info("Instant schema change disabled. All schema alter operations will " + + "happen normally."); + } } /** @@ -377,6 +395,11 @@ boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); + // initialize schema change tracker + this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), + this); + this.schemaChangeTracker.start(); + LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -605,6 +628,11 @@ return this.tableDescriptors; } + @Override + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return this.schemaChangeTracker; + } + /** @return InfoServer object. 
Maybe null.*/ public InfoServer getInfoServer() { return this.infoServer; @@ -994,7 +1022,7 @@ if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + this.executorService.submit(new DeleteTableHandler(tableName, this, this, this)); if (cpHost != null) { cpHost.postDeleteTable(tableName); @@ -1020,7 +1048,8 @@ return; } } - new TableAddFamilyHandler(tableName, column, this, this).process(); + new TableAddFamilyHandler(tableName, column, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postAddColumn(tableName, column); } @@ -1033,7 +1062,8 @@ return; } } - new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } @@ -1046,7 +1076,8 @@ return; } } - new TableDeleteFamilyHandler(tableName, c, this, this).process(); + new TableDeleteFamilyHandler(tableName, c, this, this, + this, supportInstantSchemaChanges).process(); if (cpHost != null) { cpHost.postDeleteColumn(tableName, c); } @@ -1113,22 +1144,32 @@ @Override public void modifyTable(final byte[] tableName, HTableDescriptor htd) - throws IOException { + throws IOException { if (cpHost != null) { cpHost.preModifyTable(tableName, htd); } - this.executorService.submit(new ModifyTableHandler(tableName, htd, this, - this)); - + this, this, supportInstantSchemaChanges)); if (cpHost != null) { cpHost.postModifyTable(tableName, htd); } } @Override - public void checkTableModifiable(final byte [] tableName) + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) throws IOException { + preCheckTableModifiable(tableName); + if (!eventType.isSchemaChangeEvent()) { + if (!getAssignmentManager().getZKTable(). 
+ isDisabledTable(Bytes.toString(tableName))) { + throw new TableNotDisabledException(tableName); + } + } + } + + private void preCheckTableModifiable(final byte[] tableName) + throws IOException { String tableNameStr = Bytes.toString(tableName); if (isCatalogTable(tableName)) { throw new IOException("Can't modify catalog tables"); @@ -1136,12 +1177,9 @@ if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { throw new TableNotFoundException(tableNameStr); } - if (!getAssignmentManager().getZKTable(). - isDisabledTable(Bytes.toString(tableName))) { - throw new TableNotDisabledException(tableName); - } } + public void clearFromTransition(HRegionInfo hri) { if (this.assignmentManager.isRegionInTransition(hri) != null) { this.assignmentManager.clearRegionFromTransition(hri); Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -40,8 +41,11 @@ public TableModifyFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, Server server, - final MasterServices masterServices) throws IOException { - super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices); + final MasterServices masterServices, + HMasterInterface masterInterface, + boolean instantChange) throws IOException { + super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices, + masterInterface, 
instantChange); this.familyDesc = familyDesc; } Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -38,8 +39,10 @@ private final HColumnDescriptor familyDesc; public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, - Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + Server server, final MasterServices masterServices, + HMasterInterface masterInterface, boolean instantChange) throws IOException { + super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices, + masterInterface, instantChange); this.familyDesc = familyDesc; } Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import 
org.apache.hadoop.hbase.util.Bytes; @@ -38,11 +39,15 @@ private final byte [] familyName; public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, - Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + Server server, final MasterServices masterServices, + HMasterInterface masterInterface, + boolean instantChange) throws IOException { + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices, + masterInterface, instantChange); this.familyName = familyName; } + @Override protected void handleTableOperation(List hris) throws IOException { AssignmentManager am = this.masterServices.getAssignmentManager(); Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (working copy) @@ -30,14 +30,16 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKTable; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,26 +54,22 @@ public abstract class TableEventHandler extends EventHandler { private static final Log LOG = 
LogFactory.getLog(TableEventHandler.class); protected final MasterServices masterServices; + protected HMasterInterface master = null; protected final byte [] tableName; protected final String tableNameStr; + protected boolean instantAction = false; public TableEventHandler(EventType eventType, byte [] tableName, Server server, - MasterServices masterServices) + MasterServices masterServices, HMasterInterface masterInterface, + boolean instantSchemaChange) throws IOException { super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - try { - this.masterServices.checkTableModifiable(tableName); - } catch (TableNotDisabledException ex) { - if (eventType.isOnlineSchemaChangeSupported()) { - LOG.debug("Ignoring table not disabled exception " + - "for supporting online schema changes."); - } else { - throw ex; - } - } + this.masterServices.checkTableModifiable(tableName, eventType); this.tableNameStr = Bytes.toString(this.tableName); + this.instantAction = instantSchemaChange; + this.master = masterInterface; } @Override @@ -83,17 +81,7 @@ MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); - if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. - getAssignmentManager().getZKTable(). 
- isEnabledTable(Bytes.toString(tableName))) { - this.masterServices.getAssignmentManager().setRegionsToReopen(hris); - if (reOpenAllRegions(hris)) { - LOG.info("Completed table operation " + eventType + " on table " + - Bytes.toString(tableName)); - } else { - LOG.warn("Error on reopening the regions"); - } - } + handleSchemaChanges(hris); } catch (IOException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { @@ -101,13 +89,46 @@ } } + private void handleSchemaChanges(List regions) + throws IOException { + if (instantAction) { + handleInstantSchemaChanges(); + } else { + handleRegularSchemaChanges(regions); + } + } + + /** + * Perform schema changes only if the table is in enabled state. + * @return + */ + private boolean canPerformSchemaChange() { + return (eventType.isSchemaChangeEvent() && this.masterServices. + getAssignmentManager().getZKTable(). + isEnabledTable(Bytes.toString(tableName))); + } + + private void handleRegularSchemaChanges(List regions) + throws IOException { + if (canPerformSchemaChange()) { + this.masterServices.getAssignmentManager().setRegionsToReopen(regions); + if (reOpenAllRegions(regions)) { + LOG.info("Completed table operation " + eventType + " on table " + + Bytes.toString(tableName)); + } else { + LOG.warn("Error on reopening the regions"); + } + } + } + public boolean reOpenAllRegions(List regions) throws IOException { boolean done = false; LOG.info("Bucketing regions by region server..."); HTable table = new HTable(masterServices.getConfiguration(), tableName); TreeMap> serverToRegions = Maps .newTreeMap(); - NavigableMap hriHserverMapping = table.getRegionLocations(); + NavigableMap hriHserverMapping + = table.getRegionLocations(); for (HRegionInfo hri : regions) { ServerName rsLocation = hriHserverMapping.get(hri); @@ -138,6 +159,34 @@ } return done; } + + protected void handleInstantSchemaChanges() { + if (canPerformSchemaChange()) { + try { + MasterSchemaChangeTracker 
masterSchemaChangeTracker = + this.masterServices.getSchemaChangeTracker(); + // Turn the load balancer off if necessary. + boolean currentBalanceSwitch = master.balanceSwitch(false); + masterSchemaChangeTracker + .createSchemaChangeNode(Bytes.toString(tableName)); + while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists( + Bytes.toString(tableName))) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + + } + } + if (currentBalanceSwitch) { + LOG.info("Schema change operation completed. Enabling load balancer now."); + this.master.balanceSwitch(true); + } + } catch (KeeperException e) { + LOG.warn("Instant schema change failed for table " + tableName, e); + } + } + } + protected abstract void handleTableOperation(List regions) throws IOException, KeeperException; } Index: src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -33,11 +34,19 @@ public ModifyTableHandler(final byte [] tableName, final HTableDescriptor htd, final Server server, - final MasterServices masterServices) throws IOException { - super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); + final MasterServices masterServices, + final HMasterInterface masterInterface, + boolean instantModify) throws IOException { + super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices, + masterInterface, instantModify); this.htd = htd; + validateTable(tableName, htd); + } + + private 
void validateTable(final byte[] tableName, HTableDescriptor htd) + throws IOException { if (!Bytes.equals(tableName, htd.getName())) { - throw new IOException("TableDescriptor name & tableName must match: " + throw new IOException("TableDescriptor name & tableName must match: " + htd.getNameAsString() + " vs " + Bytes.toString(tableName)); } } Index: src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision 1171786) +++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (working copy) @@ -237,13 +237,6 @@ public HTableDescriptor[] getHTableDescriptors(); /** - * Get current HTD for a given tablename - * @param tableName - * @return HTableDescriptor for the table - */ - //public HTableDescriptor getHTableDescriptor(final byte[] tableName); - - /** * Get array of HTDs for requested tables. * @param tableNames * @return array of HTableDescriptor Index: src/main/ruby/hbase/admin.rb =================================================================== --- src/main/ruby/hbase/admin.rb (revision 1171786) +++ src/main/ruby/hbase/admin.rb (working copy) @@ -372,6 +372,9 @@ end end + #---------------------------------------------------------------------------------------------- + # Change table structure or table options + def status(format) status = @admin.getClusterStatus() if format == "detailed" Index: src/main/resources/hbase-default.xml =================================================================== --- src/main/resources/hbase-default.xml (revision 1171786) +++ src/main/resources/hbase-default.xml (working copy) @@ -513,6 +513,15 @@ used for client / server RPC call marshalling. + + hbase.instant.schema.alter.enabled + false + Whether or not to handle alter schema changes instantly or not. 
+ If enabled, all schema change alter operations will be instant, as the master will not + explicitly unassign/assign the impacted regions and instead will rely on Region servers to + refresh their schema changes. + +