Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (revision ) @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -36,8 +37,23 @@ final MasterServices masterServices) throws IOException { super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); this.htd = htd; + validateTable(tableName, htd); + } + + public ModifyTableHandler(final byte [] tableName, + final HTableDescriptor htd, final Server server, + final MasterServices masterServices, + final HMasterInterface masterInterface) throws IOException { + super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices, + masterInterface); + this.htd = htd; + validateTable(tableName, htd); + } + + private void validateTable(final byte[] tableName, HTableDescriptor htd) + throws IOException { if (!Bytes.equals(tableName, htd.getName())) { - throw new IOException("TableDescriptor name & tableName must match: " + throw new IOException("TableDescriptor name & tableName must match: " + htd.getNameAsString() + " vs " + Bytes.toString(tableName)); } } Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision ) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -37,9 +38,10 @@ private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); public DeleteTableHandler(byte [] tableName, Server server, - final MasterServices masterServices) + final MasterServices masterServices, HMasterInterface masterInterface) throws IOException { - super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); + super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices, + masterInterface); } @Override Index: hbase/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision ) @@ -892,9 +892,9 @@ * @param column column descriptor of column to be added * @throws IOException if a remote or network exception occurs */ - public void addColumn(final String tableName, HColumnDescriptor column) + public void addColumn(final String tableName, HColumnDescriptor column, boolean instantAdd) throws IOException { - addColumn(Bytes.toBytes(tableName), column); + 
addColumn(Bytes.toBytes(tableName), column, instantAdd); } /** @@ -905,11 +905,11 @@ * @param column column descriptor of column to be added * @throws IOException if a remote or network exception occurs */ - public void addColumn(final byte [] tableName, HColumnDescriptor column) + public void addColumn(final byte [] tableName, HColumnDescriptor column, boolean instantAdd) throws IOException { HTableDescriptor.isLegalTableName(tableName); try { - getMaster().addColumn(tableName, column); + getMaster().addColumn(tableName, column, instantAdd); } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } @@ -923,9 +923,9 @@ * @param columnName name of column to be deleted * @throws IOException if a remote or network exception occurs */ - public void deleteColumn(final String tableName, final String columnName) + public void deleteColumn(final String tableName, final String columnName, boolean instantDelete) throws IOException { - deleteColumn(Bytes.toBytes(tableName), Bytes.toBytes(columnName)); + deleteColumn(Bytes.toBytes(tableName), Bytes.toBytes(columnName), instantDelete); } /** @@ -936,10 +936,10 @@ * @param columnName name of column to be deleted * @throws IOException if a remote or network exception occurs */ - public void deleteColumn(final byte [] tableName, final byte [] columnName) + public void deleteColumn(final byte [] tableName, final byte [] columnName, boolean instantDelete) throws IOException { try { - getMaster().deleteColumn(tableName, columnName); + getMaster().deleteColumn(tableName, columnName, instantDelete); } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } @@ -956,9 +956,9 @@ * @deprecated The columnName is redundant. Use {@link #addColumn(String, HColumnDescriptor)} */ public void modifyColumn(final String tableName, final String columnName, - HColumnDescriptor descriptor) + HColumnDescriptor descriptor, boolean instantModify) throws IOException { - modifyColumn(tableName, descriptor); + modifyColumn(tableName, descriptor, instantModify); } /** @@ -969,9 +969,10 @@ * @param descriptor new column descriptor to use * @throws IOException if a remote or network exception occurs */ - public void modifyColumn(final String tableName, HColumnDescriptor descriptor) + public void modifyColumn(final String tableName, HColumnDescriptor descriptor, + boolean instantModify) throws IOException { - modifyColumn(Bytes.toBytes(tableName), descriptor); + modifyColumn(Bytes.toBytes(tableName), descriptor, instantModify); } /** @@ -985,9 +986,9 @@ * @deprecated The columnName is redundant. 
Use {@link #modifyColumn(byte[], HColumnDescriptor)} */ public void modifyColumn(final byte [] tableName, final byte [] columnName, - HColumnDescriptor descriptor) + HColumnDescriptor descriptor, boolean instantModify) throws IOException { - modifyColumn(tableName, descriptor); + modifyColumn(tableName, descriptor, instantModify); } /** @@ -998,10 +999,11 @@ * @param descriptor new column descriptor to use * @throws IOException if a remote or network exception occurs */ - public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor) + public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor, + boolean instantModify) throws IOException { try { - getMaster().modifyColumn(tableName, descriptor); + getMaster().modifyColumn(tableName, descriptor, instantModify); } catch (RemoteException re) { // Convert RE exceptions in here; client shouldn't have to deal with them, // at least w/ the type of exceptions that come out of this method: @@ -1444,10 +1446,10 @@ * @param htd modified description of the table * @throws IOException if a remote or network exception occurs */ - public void modifyTable(final byte [] tableName, HTableDescriptor htd) + public void modifyTable(final byte [] tableName, HTableDescriptor htd, boolean instantModify) throws IOException { try { - getMaster().modifyTable(tableName, htd); + getMaster().modifyTable(tableName, htd, instantModify); } catch (RemoteException re) { // Convert RE exceptions in here; client shouldn't have to deal with them, // at least w/ the type of exceptions that come out of this method: Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (revision ) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -45,6 +46,15 @@ this.familyDesc = familyDesc; } + public TableModifyFamilyHandler(byte[] tableName, + HColumnDescriptor familyDesc, Server server, + final MasterServices masterServices, + HMasterInterface masterInterface) throws IOException { + super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices, + masterInterface); + this.familyDesc = familyDesc; + } + @Override protected void handleTableOperation(List regions) throws IOException { AssignmentManager am = this.masterServices.getAssignmentManager(); Index: hbase/src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision ) @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import 
org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; @@ -157,7 +159,10 @@ private CatalogTracker catalogTracker; // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - + + // Schema change tracker + private MasterSchemaChangeTracker schemaChangeTracker; + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. @@ -377,6 +382,11 @@ boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); + // initialize schema change tracker + this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), + this); + this.schemaChangeTracker.start(); + LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -605,6 +615,11 @@ return this.tableDescriptors; } + @Override + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return this.schemaChangeTracker; + } + /** @return InfoServer object. Maybe null.*/ public InfoServer getInfoServer() { return this.infoServer; } @@ -987,7 +1002,7 @@ if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + this.executorService.submit(new DeleteTableHandler(tableName, this, this, this)); if (cpHost != null) { cpHost.postDeleteTable(tableName); @@ -1006,40 +1021,54 @@ return this.assignmentManager.getReopenStatus(tableName); } - public void addColumn(byte [] tableName, HColumnDescriptor column) + public void addColumn(byte [] tableName, HColumnDescriptor column, boolean instantAdd) throws IOException { if (cpHost != null) { if (cpHost.preAddColumn(tableName, column)) { return; } } + if (instantAdd) { + new TableAddFamilyHandler(tableName, column, this, this, this).process(); + } else { - new TableAddFamilyHandler(tableName, column, this, this).process(); + new TableAddFamilyHandler(tableName, column, this, this).process(); + } if (cpHost != null) { cpHost.postAddColumn(tableName, column); } } - public void modifyColumn(byte [] tableName, HColumnDescriptor descriptor) + public void modifyColumn(byte [] tableName, HColumnDescriptor descriptor, boolean instantModify) throws IOException { if (cpHost != null) { if (cpHost.preModifyColumn(tableName, descriptor)) { return; } } + if (instantModify) { + new TableModifyFamilyHandler(tableName, descriptor, this, this, this).process(); + + } else { - new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + } + if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } } - public void deleteColumn(final byte [] tableName, final byte [] c) + public void deleteColumn(final byte [] tableName, final byte [] c, boolean instantDelete) throws IOException { if (cpHost != null) { if (cpHost.preDeleteColumn(tableName, c)) { return; } } + if (instantDelete) { + new TableDeleteFamilyHandler(tableName, c, this, this, 
this).process(); + } else { - new TableDeleteFamilyHandler(tableName, c, this, this).process(); + new TableDeleteFamilyHandler(tableName, c, this, this).process(); + } if (cpHost != null) { cpHost.postDeleteColumn(tableName, c); } @@ -1105,20 +1134,48 @@ } @Override - public void modifyTable(final byte[] tableName, HTableDescriptor htd) + public void modifyTable(final byte[] tableName, HTableDescriptor htd, boolean instantModify) throws IOException { if (cpHost != null) { cpHost.preModifyTable(tableName, htd); } - + if (instantModify) { - this.executorService.submit(new ModifyTableHandler(tableName, htd, this, + this.executorService.submit(new ModifyTableHandler(tableName, htd, this, + this, this)); + } else { + this.executorService.submit(new ModifyTableHandler(tableName, htd, this, - this)); + this)); + } if (cpHost != null) { cpHost.postModifyTable(tableName, htd); } } + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) + throws IOException { + preCheckTableModifiable(tableName); + if (!eventType.isSchemaChangeEvent()) { + if (!getAssignmentManager().getZKTable(). + isDisabledTable(Bytes.toString(tableName))) { + throw new TableNotDisabledException(tableName); + } + } + } + + private void preCheckTableModifiable(final byte[] tableName) + throws IOException { + String tableNameStr = Bytes.toString(tableName); + if (isCatalogTable(tableName)) { + throw new IOException("Can't modify catalog tables"); + } + if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { + throw new TableNotFoundException(tableNameStr); + } + } + + @Override public void checkTableModifiable(final byte [] tableName) throws IOException { Index: hbase/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (revision ) @@ -91,7 +91,7 @@ * @param column column descriptor * @throws IOException e */ - public void addColumn(final byte [] tableName, HColumnDescriptor column) + public void addColumn(final byte [] tableName, HColumnDescriptor column, boolean instantAdd) throws IOException; /** @@ -100,7 +100,7 @@ * @param descriptor new column descriptor * @throws IOException e */ - public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor) + public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor, boolean instantModify) throws IOException; @@ -110,7 +110,7 @@ * @param columnName column family to remove * @throws IOException e */ - public void deleteColumn(final byte [] tableName, final byte [] columnName) + public void deleteColumn(final byte [] tableName, final byte [] columnName, boolean instantDelete) throws IOException; /** @@ -135,7 +135,7 @@ * @param htd new descriptor for table * @throws IOException e */ - public void modifyTable(byte[] tableName, HTableDescriptor htd) + public void modifyTable(byte[] tableName, HTableDescriptor htd, boolean instantModify) throws IOException; /** Index: hbase/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision ) @@ -140,14 +140,14 
@@ * Constructor */ EventType(int value) {} - public boolean isOnlineSchemaChangeSupported() { + public boolean isSchemaChangeEvent() { return ( - this.equals(EventType.C_M_ADD_FAMILY) || - this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_DELETE_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || - this.equals(EventType.C_M_MODIFY_TABLE) - ); + this.equals(EventType.C_M_MODIFY_TABLE)); } + } /** Index: hbase/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java =================================================================== --- hbase/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (revision ) @@ -271,7 +271,7 @@ final byte [] hcdName = hcd.getName(); expectedException = false; try { - this.admin.modifyColumn(tableName, hcd); + this.admin.modifyColumn(tableName, hcd, false); } catch (TableNotDisabledException re) { expectedException = true; } @@ -288,7 +288,7 @@ xtracol.setValue(xtracolName, xtracolName); expectedException = false; try { - this.admin.addColumn(tableName, xtracol); + this.admin.addColumn(tableName, xtracol, false); } catch (TableNotDisabledException re) { expectedException = true; } @@ -300,7 +300,7 @@ assertTrue(hcd.getValue(xtracolName).equals(xtracolName)); // Delete the just-added column. - this.admin.deleteColumn(tableName, xtracol.getName()); + this.admin.deleteColumn(tableName, xtracol.getName(), false); modifiedHtd = this.admin.getTableDescriptor(tableName); hcd = modifiedHtd.getFamily(xtracol.getName()); assertTrue(hcd == null); @@ -324,7 +324,7 @@ ExecutorService executor = services.getExecutorService(); AtomicBoolean done = new AtomicBoolean(false); executor.registerListener(EventType.C_M_MODIFY_TABLE, new DoneListener(done)); - this.admin.modifyTable(tableName, htd); + this.admin.modifyTable(tableName, htd, false); while (!done.get()) { synchronized (done) { try { @@ -753,15 +753,15 @@ } catch (org.apache.hadoop.hbase.client.RegionOfflineException e) { // Expected } - this.admin.addColumn(tableName, new HColumnDescriptor("col2")); + this.admin.addColumn(tableName, new HColumnDescriptor("col2"), false); this.admin.enableTable(tableName); try { - this.admin.deleteColumn(tableName, Bytes.toBytes("col2")); + this.admin.deleteColumn(tableName, Bytes.toBytes("col2"), false); } catch(TableNotDisabledException e) { // Expected } this.admin.disableTable(tableName); - this.admin.deleteColumn(tableName, Bytes.toBytes("col2")); + this.admin.deleteColumn(tableName, Bytes.toBytes("col2"), false); this.admin.deleteTable(tableName); } Index: hbase/src/main/ruby/hbase/admin.rb =================================================================== --- hbase/src/main/ruby/hbase/admin.rb (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/ruby/hbase/admin.rb (revision ) @@ -327,13 +327,13 @@ # If column already exist, then try to alter it. Create otherwise. if htd.hasFamily(column_name.to_java_bytes) - @admin.modifyColumn(table_name, column_name, descriptor) + @admin.modifyColumn(table_name, column_name, descriptor, false) if wait == true puts "Updating all regions with the new schema..." alter_status(table_name) end else - @admin.addColumn(table_name, descriptor) + @admin.addColumn(table_name, descriptor, false) if wait == true puts "Updating all regions with the new schema..." 
alter_status(table_name) @@ -345,7 +345,7 @@ # Delete column family if method == "delete" raise(ArgumentError, "NAME parameter missing for delete method") unless arg[NAME] - @admin.deleteColumn(table_name, arg[NAME]) + @admin.deleteColumn(table_name, arg[NAME], false) if wait == true puts "Updating all regions with the new schema..." alter_status(table_name) @@ -359,7 +359,7 @@ htd.setReadOnly(JBoolean.valueOf(arg[READONLY])) if arg[READONLY] htd.setMemStoreFlushSize(JLong.valueOf(arg[MEMSTORE_FLUSHSIZE])) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg[DEFERRED_LOG_FLUSH])) if arg[DEFERRED_LOG_FLUSH] - @admin.modifyTable(table_name.to_java_bytes, htd) + @admin.modifyTable(table_name.to_java_bytes, htd, false) if wait == true puts "Updating all regions with the new schema..." alter_status(table_name) @@ -372,6 +372,68 @@ end end + #---------------------------------------------------------------------------------------------- + # Change table structure or table options + def alter_instant(table_name, wait = true, *args) + # Table name should be a string + raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String) + + # Table should exist + raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name) + + # There should be at least one argument + raise(ArgumentError, "There should be at least one argument but the table name") if args.empty? + + # Get table descriptor + htd = @admin.getTableDescriptor(table_name.to_java_bytes) + + # Process all args + args.each do |arg| + # Normalize args to support column name only alter specs + arg = { NAME => arg } if arg.kind_of?(String) + + # Normalize args to support shortcut delete syntax + arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete'] + + # No method parameter, try to use the args as a column definition + unless method = arg.delete(METHOD) + descriptor = hcd(arg, htd) + if arg[COMPRESSION_COMPACT] + descriptor.setValue(COMPRESSION_COMPACT, arg[COMPRESSION_COMPACT]) + end + column_name = descriptor.getNameAsString + + # If column already exist, then try to alter it. Create otherwise. 
+ if htd.hasFamily(column_name.to_java_bytes) + @admin.modifyColumn(table_name, column_name, descriptor, true) + else + @admin.addColumn(table_name, descriptor, true) + end + next + end + + # Delete column family + if method == "delete" + raise(ArgumentError, "NAME parameter missing for delete method") unless arg[NAME] + @admin.deleteColumn(table_name, arg[NAME], true) + next + end + + # Change table attributes + if method == "table_att" + htd.setMaxFileSize(JLong.valueOf(arg[MAX_FILESIZE])) if arg[MAX_FILESIZE] + htd.setReadOnly(JBoolean.valueOf(arg[READONLY])) if arg[READONLY] + htd.setMemStoreFlushSize(JLong.valueOf(arg[MEMSTORE_FLUSHSIZE])) if arg[MEMSTORE_FLUSHSIZE] + htd.setDeferredLogFlush(JBoolean.valueOf(arg[DEFERRED_LOG_FLUSH])) if arg[DEFERRED_LOG_FLUSH] + @admin.modifyTable(table_name.to_java_bytes, htd, true) + next + end + + # Unknown method + raise ArgumentError, "Unknown method: #{method}" + end + end + def status(format) status = @admin.getClusterStatus() if format == "detailed" Index: hbase/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java =================================================================== --- hbase/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision ) @@ -135,7 +135,11 @@ public HRegion getFromOnlineRegions(String encodedRegionName) { return this.regions.get(encodedRegionName); } - + + public void refreshSchema(byte[] tableName) throws IOException { + + } + @Override public void addToOnlineRegions(HRegion r) { this.regions.put(r.getRegionInfo().getEncodedName(), r); Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (revision ) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -43,6 +44,13 @@ this.familyDesc = familyDesc; } + public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, + Server server, final MasterServices masterServices, + HMasterInterface masterInterface) throws IOException { + super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices, masterInterface); + this.familyDesc = familyDesc; + } + @Override protected void handleTableOperation(List hris) throws IOException { Index: hbase/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java =================================================================== --- hbase/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java (revision ) @@ -547,7 +547,7 @@ cp.wasModifyTableCalled()); // add a column family - admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2)); + admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2), false); assertTrue("New column 
family shouldn't have been added to test table", cp.preAddColumnCalledOnly()); @@ -589,7 +589,7 @@ cp.wasModifyTableCalled()); // add a column family - admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2)); + admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2), false); assertTrue("New column family should have been added to test table", cp.wasAddColumnCalled()); @@ -613,7 +613,7 @@ // delete column assertFalse("No column family deleted yet", cp.wasDeleteColumnCalled()); - admin.deleteColumn(TEST_TABLE, TEST_FAMILY2); + admin.deleteColumn(TEST_TABLE, TEST_FAMILY2, false); HTableDescriptor tableDesc = admin.getTableDescriptor(TEST_TABLE); assertNull("'"+Bytes.toString(TEST_FAMILY2)+"' should have been removed", tableDesc.getFamily(TEST_FAMILY2)); Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (revision ) @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -38,11 +39,21 @@ private final byte [] familyName; public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, - Server server, final MasterServices masterServices) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); + Server server, final MasterServices masterServices + ) throws IOException { + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices); this.familyName = familyName; } + public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, + Server server, final MasterServices masterServices, + HMasterInterface masterInterface) throws IOException { + super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices, + masterInterface); + this.familyName = familyName; + } + + @Override protected void handleTableOperation(List hris) throws IOException { AssignmentManager am = this.masterServices.getAssignmentManager(); Index: hbase/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (revision ) @@ -21,6 +21,8 @@ import org.apache.hadoop.hbase.Server; +import java.io.IOException; + /** * Interface to Map of online regions. In the Map, the key is the region's * encoded name and the value is an {@link HRegion} instance. @@ -49,4 +51,11 @@ * null if named region is not member of the online regions. */ public HRegion getFromOnlineRegions(String encodedRegionName); + + /** + * Refresh schema for online regions of a table. 
+ * @param tableName + * @return + */ + public void refreshSchema(byte[] tableName) throws IOException; } \ No newline at end of file Index: hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision ) @@ -85,6 +85,8 @@ public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; + // znode used to record table schema changes + public String schemaZNode; private final Configuration conf; @@ -140,6 +142,7 @@ ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); + ZKUtil.createAndFailSilent(this, schemaZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -187,6 +190,9 @@ conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); + schemaZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.schema", "schema")); + } /** Index: hbase/src/main/ruby/shell.rb =================================================================== --- hbase/src/main/ruby/shell.rb (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/ruby/shell.rb (revision ) @@ -230,6 +230,7 @@ show_filters alter_status alter_async + alter_instant ] ) Index: hbase/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java =================================================================== --- hbase/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (revision ) @@ -46,12 +46,14 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; import org.mockito.Mockito; @@ -128,11 +130,6 @@ } @Override - public void checkTableModifiable(byte[] tableName) throws IOException { - //no-op - } - - @Override public AssignmentManager getAssignmentManager() { return this.asm; } @@ -142,7 +139,17 @@ return null; } + public void checkTableModifiable(byte[] tableName) throws IOException { + + } + @Override + public void checkTableModifiable(byte[] tableName, + EventHandler.EventType eventType) + throws IOException { + } + + @Override public MasterFileSystem getMasterFileSystem() { return this.mfs; } @@ -223,7 +230,11 @@ } }; } + + public MasterSchemaChangeTracker getSchemaChangeTracker() { + return null; - } + } + } @Test public void testGetHRegionInfo() throws IOException { Index: hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java 
=================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision ) @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,8 +54,10 @@ public abstract class TableEventHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(TableEventHandler.class); protected final MasterServices masterServices; + protected HMasterInterface master = null; protected final byte [] tableName; protected final String tableNameStr; + protected boolean instantAction = false; public TableEventHandler(EventType eventType, byte [] tableName, Server server, MasterServices masterServices) @@ -61,19 +65,18 @@ super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - try { - this.masterServices.checkTableModifiable(tableName); - } catch (TableNotDisabledException ex) { - if (eventType.isOnlineSchemaChangeSupported()) { - LOG.debug("Ignoring table not disabled exception " + - "for supporting online schema changes."); - } else { - throw ex; - } - } + this.masterServices.checkTableModifiable(tableName, eventType); this.tableNameStr = Bytes.toString(this.tableName); } + public TableEventHandler(EventType eventType, byte [] tableName, Server server, + MasterServices masterServices, HMasterInterface masterInterface) + throws IOException { + this(eventType, tableName, server, masterServices); + this.instantAction = true; + this.master = masterInterface; + } + @Override public void process() { try { @@ -83,23 +86,37 @@ MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); - if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. + handleSchemaChanges(hris); + } catch (IOException e) { + LOG.error("Error manipulating table " + Bytes.toString(tableName), e); + } catch (KeeperException e) { + LOG.error("Error manipulating table " + Bytes.toString(tableName), e); + } + } + + private void handleSchemaChanges(List regions) + throws IOException { + if (instantAction) { + handleInstantSchemaChanges(); + } else { + handleRegularSchemaChanges(regions); + } + } + + private void handleRegularSchemaChanges(List regions) + throws IOException { + if (eventType.isSchemaChangeEvent() && this.masterServices. - getAssignmentManager().getZKTable(). - isEnabledTable(Bytes.toString(tableName))) { + getAssignmentManager().getZKTable(). 
+ isEnabledTable(Bytes.toString(tableName))) { - this.masterServices.getAssignmentManager().setRegionsToReopen(hris); - if (reOpenAllRegions(hris)) { + this.masterServices.getAssignmentManager().setRegionsToReopen(regions); + if (reOpenAllRegions(regions)) { - LOG.info("Completed table operation " + eventType + " on table " + - Bytes.toString(tableName)); - } else { - LOG.warn("Error on reopening the regions"); - } - } + LOG.info("Completed table operation " + eventType + " on table " + + Bytes.toString(tableName)); + } else { + LOG.warn("Error on reopening the regions"); + } + } - } catch (IOException e) { - LOG.error("Error manipulating table " + Bytes.toString(tableName), e); - } catch (KeeperException e) { - LOG.error("Error manipulating table " + Bytes.toString(tableName), e); - } + } - } public boolean reOpenAllRegions(List regions) throws IOException { boolean done = false; @@ -107,7 +124,8 @@ HTable table = new HTable(masterServices.getConfiguration(), tableName); TreeMap> serverToRegions = Maps .newTreeMap(); - NavigableMap hriHserverMapping = table.getRegionLocations(); + NavigableMap hriHserverMapping + = table.getRegionLocations(); for (HRegionInfo hri : regions) { ServerName rsLocation = hriHserverMapping.get(hri); @@ -138,6 +156,32 @@ } return done; } + + protected void handleInstantSchemaChanges() { + if (eventType.isSchemaChangeEvent()) { + try { + MasterSchemaChangeTracker masterSchemaChangeTracker = + this.masterServices.getSchemaChangeTracker(); + // Turn the load balancer off if necessary. + boolean currentBalanceSwitch = master.balanceSwitch(false); + masterSchemaChangeTracker + .createSchemaChangeNode(Bytes.toString(tableName)); + while(true) { + if (!masterSchemaChangeTracker.isSchemaChangeNodeExists( + Bytes.toString(tableName))) { + break; + } + } + if (currentBalanceSwitch) { + LOG.info("Schema change operation completed. Enabling load balancer now."); + this.master.balanceSwitch(true); + } + } catch (KeeperException e) { + LOG.warn("Instant schema change failed for table " + tableName, e); + } + } + } + protected abstract void handleTableOperation(List regions) throws IOException, KeeperException; } Index: hbase/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (revision ) @@ -26,7 +26,10 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; + /** * Services Master supplies */ @@ -52,15 +55,26 @@ public ExecutorService getExecutorService(); /** - * Check table is modifiable; i.e. exists and is offline. - * @param tableName Name of table to check. - * @throws TableNotDisabledException - * @throws TableNotFoundException + * Check table modifiable. i.e not ROOT or META and offlined for all commands except + * alter commands + * @param tableName + * @param eventType + * @throws IOException */ - public void checkTableModifiable(final byte [] tableName) throws IOException; + public void checkTableModifiable(final byte [] tableName, + EventHandler.EventType eventType) + throws IOException; + /** * @return Return table descriptors implementation. 
*/ public TableDescriptors getTableDescriptors(); + + /** + * Get Master Schema change tracker + * @return + */ + public MasterSchemaChangeTracker getSchemaChangeTracker(); + } Index: hbase/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision fe2c2b8d16b4668d9770a237cb57042504a8274e) +++ hbase/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision ) @@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -269,6 +270,9 @@ // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Schema change Tracker + private SchemaChangeTracker schemaChangeTracker; + // Log Splitting Worker private SplitLogWorker splitLogWorker; @@ -543,6 +547,11 @@ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Schema change tracker + this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, + this, this); + this.schemaChangeTracker.start(); } /** @@ -3114,4 +3123,82 @@ HLog wal = this.getWAL(); return wal.rollWriter(true); } + + /** + * Refresh schema changes for given regions. + * @param onlineRegionsOfTable + * @throws IOException + */ + private void reopenRegions(List onlineRegionsOfTable) throws IOException { + + if (onlineRegionsOfTable != null && onlineRegionsOfTable.size() > 0) { + for (HRegion hRegion : onlineRegionsOfTable) { + HRegionInfo regionInfo = hRegion.getRegionInfo(); + // Close the region + hRegion.close(); + // Remove from online regions + removeFromOnlineRegions(regionInfo.getEncodedName()); + // Get new HTD + HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName()); + LOG.info("HTD for region = " + regionInfo.getRegionNameAsString() + + " Is = " + htd ); + HRegion region = + HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf); + // Add new region to the onlineRegions + addToOnlineRegions(region); -} + } + } + } + + /** + * Gets the online regions of the specified table. + * This method looks at the in-memory onlineRegions. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will not be included in the returned list. + * So, the returned list may not necessarily be ALL regions in this table, its + * all the ONLINE regions in the table. + * @param tableName + * @return Online regions from tableName + */ + private List getOnlineRegionsForTable(byte[] tableName) { + List tableRegions = new ArrayList(); + synchronized (this.onlineRegions) { + for (HRegion region: this.onlineRegions.values()) { + HRegionInfo regionInfo = region.getRegionInfo(); + if(Bytes.equals(regionInfo.getTableName(), tableName)) { + tableRegions.add(region); + } + } + } + return tableRegions; + + } + + /** + * Refresh schema changes to all online regions of given table. + * @param tableName + * @return true on successful schema refresh. 
+ */ + public void refreshSchema(byte[] tableName) throws IOException { + List onlineRegionsForTable = null; + try { + onlineRegionsForTable = getOnlineRegionsForTable(tableName); + if (onlineRegionsForTable != null && onlineRegionsForTable.size() > 0) { + LOG.info("refreshSchema found " + onlineRegionsForTable + + " online regions for table = " + Bytes.toString(tableName) + + " Refreshing them now.."); + reopenRegions(onlineRegionsForTable); + } else { + LOG.info("refreshSchema found no onlineRegions for table = " + + Bytes.toString(tableName) + ". Skipping refreshSchema..."); + } + } catch (IOException ioe) { + LOG.info("refreshSchema failed with exception " + + " for table = " + Bytes.toString(tableName) + + " Number of regions online = " + + (onlineRegionsForTable == null ? 0 : onlineRegionsForTable.size())); + throw ioe; + } + } + +} Index: hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision ) +++ hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java (revision ) @@ -0,0 +1,143 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import java.io.IOException; +import java.util.List; + +public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class); + + /** + * Constructs a new ZK node tracker. + *

+ * <p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public MasterSchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, watcher.schemaZNode, abortable); + } + + + @Override + public void start() { + try { + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + watcher.registerListener(this); + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker startup failed.", e); + } + } + + /** + * Create a new schema change ZK node. + * @param tableName + * @throws KeeperException + */ + public void createSchemaChangeNode(String tableName) throws KeeperException { + LOG.info("Creating schema change node for table = " + + tableName + " Path = " + + getSchemaChangeNodePathForTable(tableName)); + if (isSchemaChangeNodeExists(tableName)) { + LOG.info("Schema change node already exists for table = " + tableName + + " Deleting the schema change node."); + deleteSchemaChangeNode(tableName); + } + ZKUtil.createAndFailSilent(this.watcher, + getSchemaChangeNodePathForTable(tableName)); + int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode); + ZKUtil.setData(this.watcher, getSchemaChangeNodePathForTable(tableName), + Bytes.toBytes(rsCount)); + ZKUtil.listChildrenAndWatchThem(this.watcher, + getSchemaChangeNodePathForTable(tableName)); + } + + private void deleteSchemaChangeNode(String tableName) throws KeeperException { + ZKUtil.deleteNodeRecursively(watcher, + getSchemaChangeNodePathForTable(tableName)); + } + + /** + * Create a new schema change ZK node. + * @param tableName + * @throws KeeperException + */ + public boolean isSchemaChangeNodeExists(String tableName) + throws KeeperException { + return ZKUtil.checkExists(watcher, + getSchemaChangeNodePathForTable(tableName)) != -1; + } + + + @Override + public void nodeChildrenChanged(String path) { + if (path.startsWith(watcher.schemaZNode) + && !path.equals(watcher.schemaZNode)) { + try { + List servers = + ZKUtil.listChildrenAndWatchThem(watcher, path); + String tableName = path.substring(path.lastIndexOf("/")+1, path.length()); + LOG.debug("Master.SchemaChangeTracker.nodeChildrenChanged. " + + " Current table == " + tableName + + " List of servers which processed schema change = " + servers); + byte[] rsCountBytes = ZKUtil.getData(watcher, + getSchemaChangeNodePathForTable(tableName)); + int rsCount = 0; + if (rsCountBytes != null) { + rsCount = Bytes.toInt(rsCountBytes); + } + //int rsCount = Bytes.toInt(rsCountBytes); + if (rsCount != 0 && (servers != null && servers.size() >= rsCount)) { + LOG.debug("All region servers have successfully processed the " + + "schema changes for table = " + tableName + + " . 
Deleting the schema change node = " + + path); + ZKUtil.deleteNodeRecursively(this.watcher, path); + } else { + LOG.debug("Not all region servers have processed the schema changes" + + "for table = " + tableName + " rs count = " + + rsCount + " processed count = " + servers); + } + } catch (KeeperException e) { + LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting" + + " schema change nodes", e); + } + } else { + LOG.debug("Not processing Master.nodeChildrenChanged for path = " + + path); + } + } + + public String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } +} Index: hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java =================================================================== --- hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision ) +++ hbase/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision ) @@ -0,0 +1,157 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.logging.Logger; + +public class SchemaChangeTracker extends ZooKeeperNodeTracker { + public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class); + private RegionServerServices regionServer = null; + + + /** + * Constructs a new ZK node tracker. + *

+ * <p>
After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public SchemaChangeTracker(ZooKeeperWatcher watcher, + Abortable abortable, + RegionServerServices regionServer) { + super(watcher, watcher.schemaZNode, abortable); + this.regionServer = regionServer; + } + + @Override + public void start() { + try { + ZKUtil.listChildrenAndWatchThem(watcher, node); + watcher.registerListener(this); + } catch (KeeperException e) { + LOG.error("RegionServer SchemaChangeTracker startup failed with " + + "KeeperException.", e); + } + } + + // whenever new schema change occur this event will get triggered + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.schemaZNode)) { + try { + List tables = + ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); + LOG.debug("RS.SchemaChangeTracker: " + + "Current list of tables with schema change = " + tables); + if (tables != null) { + handleSchemaChange(tables); + } else { + LOG.error("No tables found for schema change event." + + " Skipping instant schema refresh"); + } + } catch (KeeperException ke) { + LOG.error("KeeperException handling schema change event.", ke); + + } + } + } + + private void handleSchemaChange(String tableName) throws IOException { + if (tableName != null) { + regionServer.refreshSchema(Bytes.toBytes(tableName)); + updateZKNode(tableName); + LOG.info("Refresh schema completed for table name = " + tableName + + " server = " + regionServer.getServerName().getServerName()); + } + } + + private boolean hasHandledSchemaChange(List servers) { + return (servers != null && + servers.contains(regionServer.getServerName().getServerName())); + } + + private void handleSchemaChange(List tables) { + for (String tableName : tables) { + try { + List servers = ZKUtil.listChildrenNoWatch(watcher, + getSchemaChangeNodePathForTable(tableName)); + if (!hasHandledSchemaChange(servers)) { + handleSchemaChange(tableName); + } else { + LOG.info("Schema change for table " + tableName + + " already addressed by server = " + + regionServer.getServerName().getServerName() + + " skipping refresh schema"); + } + } catch (KeeperException ke) { + LOG.error("KeeperException handling schema change event for " + + " table = " + tableName, ke); + } catch (IOException ioe) { + LOG.error("IOException handling schema change event for table = " + + tableName, ioe); + } + } + } + + private void updateZKNode(String tableName) { + try { + ZKUtil.createAndFailSilent(this.watcher, + getSchemaChangeNodePathForTableAndServer(tableName, + regionServer.getServerName().getServerName())); + LOG.info("updateZKNode() Created child ZKNode with server name = " + + regionServer.getServerName().getServerName() + " for table = " + + tableName); + } catch (KeeperException.NoNodeException e) { + LOG.error("KeeperException.NoNodeException while updating the schema " + + "change node with server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } catch (KeeperException e) { + LOG.error("KeeperException while updating the schema change node with " + + "server name for table = " + + tableName + " server = " + + regionServer.getServerName().getServerName(), e); + } + } + + private String getSchemaChangeNodePathForTable(String tableName) { + return ZKUtil.joinZNode(watcher.schemaZNode, tableName); + } + + private String getSchemaChangeNodePathForTableAndServer( + String tableName, String regionServerName) { + return 
ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), + regionServerName); + } + +} Index: hbase/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java =================================================================== --- hbase/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision ) +++ hbase/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision ) @@ -0,0 +1,323 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestInstantSchemaChange { + + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private HBaseAdmin admin; + private static MiniHBaseCluster miniHBaseCluster = null; + private Configuration conf; + private ZooKeeperWatcher zkw; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + miniHBaseCluster = TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } + + @Test + public void testInstantSchemaChangeForModifyTable() throws IOException, KeeperException { + + String tableName = "testSchemachange"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testInstantSchemaChangeForModifyTable()"); + + HTableDescriptor[] tables = admin.listTables(); + int numTables 
+    if (tables != null) {
+      numTables = tables.length;
+    }
+    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+      HConstants.CATALOG_FAMILY);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+    LOG.info("Table testSchemachange created");
+
+    final byte [] row = Bytes.toBytes("row");
+    final byte [] qualifier = Bytes.toBytes("qualifier");
+    final byte [] value = Bytes.toBytes("value");
+
+    Put put = new Put(row);
+    put.add(HConstants.CATALOG_FAMILY, qualifier, value);
+    ht.put(put);
+    Get get = new Get(row);
+    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+    Result r1 = ht.get(get);
+    byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier);
+    assertEquals(0, Bytes.compareTo(value, tvalue1));
+
+    String newFamily = "newFamily";
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(newFamily));
+
+    admin.modifyTable(Bytes.toBytes(tableName), htd, true);
+    waitForSchemaChangeProcess(tableName);
+
+    Put put1 = new Put(row);
+    put1.add(Bytes.toBytes(newFamily), qualifier, value);
+    ht.put(put1);
+
+    Get get1 = new Get(row);
+    get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+    Result r = ht.get(get1);
+    byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+    assertEquals(0, Bytes.compareTo(value, tvalue));
+    LOG.info("END testInstantSchemaChangeForModifyTable()");
+  }
+
+  private void waitForSchemaChangeProcess(String tableName)
+    throws KeeperException {
+    LOG.info("Waiting for ZK node creation for table = " + tableName);
+    MasterSchemaChangeTracker msct =
+      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+    // Poll (rather than spin) until the master has created the schema
+    // change node for this table.
+    while (!msct.isSchemaChangeNodeExists(tableName)) {
+      sleep(50);
+    }
+
+    LOG.info("Waiting for ZK node deletion for table = " + tableName);
+
+    // The node is deleted once all region servers have processed the change.
+    while (msct.isSchemaChangeNodeExists(tableName)) {
+      sleep(50);
+    }
+  }
+
+  private static void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Test
+  public void testInstantSchemaChangeForAddColumn() throws IOException, KeeperException {
+    LOG.info("Start testInstantSchemaChangeForAddColumn()");
+    String tableName = "testSchemachangeForAddColumn";
+    HTableDescriptor[] tables = admin.listTables();
+    int numTables = 0;
+    if (tables != null) {
+      numTables = tables.length;
+    }
+    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+      HConstants.CATALOG_FAMILY);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+    LOG.info("Table testSchemachangeForAddColumn created");
+
+    final byte [] row = Bytes.toBytes("row");
+    final byte [] qualifier = Bytes.toBytes("qualifier");
+    final byte [] value = Bytes.toBytes("value");
+
+    Put put = new Put(row);
+    put.add(HConstants.CATALOG_FAMILY, qualifier, value);
+    ht.put(put);
+    Get get = new Get(row);
+    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+    Result r1 = ht.get(get);
+    byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier);
+    assertEquals(0, Bytes.compareTo(value, tvalue1));
+
+    String newFamily = "newFamily";
+    HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
+
+    admin.addColumn(Bytes.toBytes(tableName), hcd, true);
+    //waitForSchemaChangeProcess(tableName);
+
+    // Take a mini nap for changes to take effect.
+    sleep(500);
+
+    Put put1 = new Put(row);
+    put1.add(Bytes.toBytes(newFamily), qualifier, value);
+    LOG.info("Put into new column family");
+    ht.put(put1);
+
+    Get get1 = new Get(row);
+    get1.addColumn(Bytes.toBytes(newFamily), qualifier);
+    Result r = ht.get(get1);
+    byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
+    LOG.info("Value put = " + Bytes.toString(value)
+      + " value from table = " + Bytes.toString(tvalue));
+    assertEquals(0, Bytes.compareTo(value, tvalue));
+    LOG.info("End testInstantSchemaChangeForAddColumn()");
+  }
+
+  @Test
+  public void testInstantSchemaChangeForModifyColumn() throws IOException,
+    KeeperException {
+    LOG.info("Start testInstantSchemaChangeForModifyColumn()");
+    String tableName = "testSchemachangeForModifyColumn";
+    int numTables = 0;
+    HTableDescriptor[] tables = admin.listTables();
+    if (tables != null) {
+      numTables = tables.length;
+    }
+    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+      HConstants.CATALOG_FAMILY);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+    LOG.info("Table testSchemachangeForModifyColumn created");
+
+    final byte [] row = Bytes.toBytes("row");
+    final byte [] qualifier = Bytes.toBytes("qualifier");
+    final byte [] value = Bytes.toBytes("value");
+
+    Put put = new Put(row);
+    put.add(HConstants.CATALOG_FAMILY, qualifier, value);
+    ht.put(put);
+    Get get = new Get(row);
+    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+    Result r1 = ht.get(get);
+    byte[] tvalue1 = r1.getValue(HConstants.CATALOG_FAMILY, qualifier);
+    assertEquals(0, Bytes.compareTo(value, tvalue1));
+
+    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
+    hcd.setMaxVersions(99);
+    hcd.setBlockCacheEnabled(false);
+
+    admin.modifyColumn(Bytes.toBytes(tableName), hcd, true);
+    //waitForSchemaChangeProcess(tableName);
+
+    // Take a mini nap for changes to take effect.
+    sleep(500);
+
+    List<HRegion> onlineRegions
+      = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
+    for (HRegion onlineRegion : onlineRegions) {
+      HTableDescriptor htd = onlineRegion.getTableDesc();
+      HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
+      assertFalse(tableHcd.isBlockCacheEnabled());
+      assertEquals(99, tableHcd.getMaxVersions());
+    }
+    LOG.info("End testInstantSchemaChangeForModifyColumn()");
+  }
+
+  @Test
+  public void testInstantSchemaChangeForDeleteColumn() throws IOException,
+    KeeperException {
+    LOG.info("Start testInstantSchemaChangeForDeleteColumn()");
+    String tableName = "testSchemachangeForDeleteColumn";
+    int numTables = 0;
+    HTableDescriptor[] tables = admin.listTables();
+    if (tables != null) {
+      numTables = tables.length;
+    }
+
+    byte[][] FAMILIES = new byte[][] {
+      Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
+
+    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
+      FAMILIES);
+    tables = this.admin.listTables();
+    assertEquals(numTables + 1, tables.length);
+    LOG.info("Table testSchemachangeForDeleteColumn created");
+
+    admin.deleteColumn(tableName, "C", true);
+
+    //waitForSchemaChangeProcess(tableName);
+    // Take a mini nap for changes to take effect.
+    sleep(500);
+
+    HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName));
+    HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C"));
+    assertNull(hcd);
+    LOG.info("End testInstantSchemaChangeForDeleteColumn()");
+  }
+
+  @Test
+  public void testInstantSchemaOperationsInZK() throws IOException, KeeperException {
+    LOG.info("testInstantSchemaOperationsInZK()");
+
+    conf = TEST_UTIL.getConfiguration();
+    zkw = new ZooKeeperWatcher(conf, "instant-schema-change-tests", null);
+    ZKUtil.createAndFailSilent(zkw, zkw.schemaZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.schemaZNode) != -1);
+    LOG.debug(zkw.schemaZNode + " created");
+
+    MasterSchemaChangeTracker msct =
+      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
+
+    msct.createSchemaChangeNode("testSchemachangeNode");
+    LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode")
+      + " created");
+
+    String nodePath = msct.getSchemaChangeNodePathForTable("testSchemachangeNode");
+    assertTrue(ZKUtil.checkExists(zkw, nodePath) != -1);
+    waitForSchemaChangeProcess("testSchemachangeNode");
+
+    assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1);
+    LOG.debug(msct.getSchemaChangeNodePathForTable("testSchemachangeNode")
+      + " deleted");
+  }
+
+}
+
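Review note on test synchronization: three of the tests above fall back to a fixed 500 ms sleep with waitForSchemaChangeProcess(tableName) commented out, which can be flaky on a slow or heavily loaded host, while the uncommented wait has no upper bound. A bounded polling variant along the following lines could replace both. This is only a sketch: it reuses the MasterSchemaChangeTracker.isSchemaChangeNodeExists(String) call already exercised by the patch, and the timeout parameter and 50 ms poll interval are illustrative, not part of the patch.

  // Sketch only: wait, with a deadline, for the schema change ZK node for
  // this table to be created by the master and then removed once every
  // region server has processed the change. Timeout value is illustrative.
  private void waitForSchemaChangeProcess(String tableName, long timeoutMillis)
      throws KeeperException, InterruptedException {
    MasterSchemaChangeTracker msct =
      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!msct.isSchemaChangeNodeExists(tableName)
        && System.currentTimeMillis() < deadline) {
      Thread.sleep(50);
    }
    while (msct.isSchemaChangeNodeExists(tableName)
        && System.currentTimeMillis() < deadline) {
      Thread.sleep(50);
    }
    assertFalse("Schema change for " + tableName + " did not finish within "
      + timeoutMillis + " ms", msct.isSchemaChangeNodeExists(tableName));
  }

With such a helper, the fixed sleeps could become calls like waitForSchemaChangeProcess(tableName, 30000), failing loudly on timeout instead of passing or failing by luck.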
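For reviewers approaching the patch from the client side, the surface these tests exercise is simply the extra boolean on the existing HBaseAdmin schema calls, where true requests the in-place ("instant") schema change path instead of a disable/alter/enable cycle. A minimal usage sketch, with illustrative table and family names; only the overloads actually invoked by the tests above are assumed to exist:

  // Sketch only: table and family names are illustrative.
  Configuration conf = HBaseConfiguration.create();
  HBaseAdmin admin = new HBaseAdmin(conf);
  // Add a family online.
  admin.addColumn(Bytes.toBytes("myTable"), new HColumnDescriptor("cf_new"), true);
  // Modify a family online.
  HColumnDescriptor cf = new HColumnDescriptor("cf_new");
  cf.setMaxVersions(5);
  admin.modifyColumn(Bytes.toBytes("myTable"), cf, true);
  // Delete a family online.
  admin.deleteColumn("myTable", "cf_new", true);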