diff --git hbase-protocol/src/main/protobuf/MasterProcedure.proto hbase-protocol/src/main/protobuf/MasterProcedure.proto
index 4e9b05e..93af886 100644
--- hbase-protocol/src/main/protobuf/MasterProcedure.proto
+++ hbase-protocol/src/main/protobuf/MasterProcedure.proto
@@ -58,6 +58,24 @@ message CreateTableStateData {
   repeated RegionInfo region_info = 3;
 }
 
+enum ModifyTableState {
+  MODIFY_TABLE_PREPARE = 1;
+  MODIFY_TABLE_PRE_OPERATION = 2;
+  MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR = 3;
+  MODIFY_TABLE_REMOVE_REPLICA_COLUMN = 4;
+  MODIFY_TABLE_DELETE_FS_LAYOUT = 5;
+  MODIFY_TABLE_POST_OPERATION = 6;
+  MODIFY_TABLE_REOPEN_ALL_REGIONS = 7;
+}
+
+message ModifyTableMessage {
+  required UserInformation user_info = 1;
+  optional TableSchema unmodified_table_schema = 2;
+  required TableSchema modified_table_schema = 3;
+  required bool delete_column_family_in_modify = 4;
+  repeated RegionInfo region_info = 5;
+}
+
 enum DeleteTableState {
   DELETE_TABLE_PRE_OPERATION = 1;
   DELETE_TABLE_REMOVE_FROM_META = 2;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index eb564a6..8672e54 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
@@ -43,10 +39,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Service;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -94,7 +91,6 @@ import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
-import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
@@ -103,15 +99,17 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
+import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureResult;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
@@ -147,6 +145,11 @@ import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.Context;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Service;
+
 /**
  * HMaster is the "master server" for HBase. An HBase cluster has one active
  * master.  If many masters are started, all compete.  Whichever wins goes on to
@@ -1096,6 +1099,24 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     }
   }
 
+  /**
+   * Wait for the procedure to complete, so the operation is synchronous from the caller's
+   * point of view.
+   * @param procId the procedure to wait for
+   * @throws IOException
+   */
+  private void waitForProcedureToComplete(final long procId) throws IOException {
+    while (!this.procedureExecutor.isFinished(procId) && this.procedureExecutor.isRunning()) {
+      // TODO: add a config to make it tunable
+      // Dev consideration: do we wait forever, or should we set a timeout?
+      Threads.sleepWithoutInterrupt(1000);
+    }
+    ProcedureResult result = this.procedureExecutor.getResult(procId);
+    if (result.isFailed()) {
+      // If the procedure fails, we should always have an exception captured. Throw it.
+      throw result.getException().unwrapRemoteException();
+    }
+  }
+
   private void stopChores() {
     if (this.balancerChore != null) {
       this.balancerChore.cancel(true);
@@ -1726,8 +1747,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (cpHost != null) {
       cpHost.preModifyTable(tableName, descriptor);
     }
+
     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
-    new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
+
+    // Execute the operation synchronously - wait for the operation to complete before continuing.
+    long procId = this.procedureExecutor.submitProcedure(
+      new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor));
+
+    waitForProcedureToComplete(procId);
+
     if (cpHost != null) {
       cpHost.postModifyTable(tableName, descriptor);
     }
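A note on the HMaster change above: the client-facing semantics stay synchronous; only the master-side execution moves from ModifyTableHandler onto the procedure framework. A minimal client-side sketch, assuming a running cluster (standard public API of this era, not code from this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ModifyTableClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("t1"); // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(admin.getTableDescriptor(tableName));
      htd.setMaxFileSize(1L << 30); // 1GB
      // The master now runs this as a ModifyTableProcedure and blocks until it completes.
      admin.modifyTable(tableName, htd);
    }
  }
}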
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
new file mode 100644
index 0000000..d07d3c8
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.BulkReOpen;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper class for schema change procedures
+ */
+@InterfaceAudience.Private
+public class MasterDDLOperationHelper {
+  private static final Log LOG = LogFactory.getLog(MasterDDLOperationHelper.class);
+
+  /**
+   * Check whether online schema change is allowed from config
+   **/
+  public static boolean isOnlineSchemaChangeAllowed(final MasterProcedureEnv env) {
+    return env.getMasterServices().getConfiguration()
+      .getBoolean("hbase.online.schema.update.enable", false);
+  }
+
+  /**
+   * Check whether a table is modifiable - exists and either offline or online with config set
+   * @param env MasterProcedureEnv
+   * @param tableName name of the table
+   * @throws IOException
+   */
+  public static void checkTableModifiable(final MasterProcedureEnv env, final TableName tableName)
+      throws IOException {
+    // Checks whether the table exists
+    if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
+      throw new TableNotFoundException(tableName);
+    }
+
+    // We only execute this procedure with table online if online schema change config is set.
+    if (env.getMasterServices().getAssignmentManager().getTableStateManager()
+        .isTableState(tableName, TableState.State.ENABLED)
+        && !MasterDDLOperationHelper.isOnlineSchemaChangeAllowed(env)) {
+      throw new TableNotDisabledException(tableName);
+    }
+  }
+
+  /**
+   * Get region info list from the META system table.
+   **/
+  public static List<HRegionInfo> getRegionsFromMeta(
+      final MasterProcedureEnv env,
+      final TableName tableName) throws IOException {
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
+      return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
+    } else {
+      return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(), tableName);
+    }
+  }
+
+  /**
+   * Remove the column family from the file system
+   **/
+  public static void deleteColumnFamilyFromFileSystem(
+      final MasterProcedureEnv env,
+      final TableName tableName,
+      List<HRegionInfo> regionInfoList,
+      final byte[] familyName) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName);
+    }
+    if (regionInfoList == null) {
+      regionInfoList = getRegionsFromMeta(env, tableName);
+    }
+    for (HRegionInfo hri : regionInfoList) {
+      // Delete the family directory in FS for all the regions one by one
+      mfs.deleteFamilyFromFS(hri, familyName);
+    }
+  }
+
+  /**
+   * Reopen all regions from a table after a schema change operation.
+   **/
+  public static boolean reOpenAllRegions(
+      final MasterProcedureEnv env,
+      final TableName tableName,
+      final List<HRegionInfo> regionInfoList) throws IOException {
+    boolean done = false;
+    LOG.info("Bucketing regions by region server...");
+    List<HRegionLocation> regionLocations = null;
+    Connection connection = env.getMasterServices().getConnection();
+    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
+      regionLocations = locator.getAllRegionLocations();
+    }
+    // Convert List<HRegionLocation> to Map<HRegionInfo, ServerName>.
+    NavigableMap<HRegionInfo, ServerName> hri2Sn = new TreeMap<HRegionInfo, ServerName>();
+    for (HRegionLocation location : regionLocations) {
+      hri2Sn.put(location.getRegionInfo(), location.getServerName());
+    }
+    TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps.newTreeMap();
+    List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>();
+    for (HRegionInfo hri : regionInfoList) {
+      ServerName sn = hri2Sn.get(hri);
+      // Skip the offlined split parent region
+      // See HBASE-4578 for more information.
+      if (null == sn) {
+        LOG.info("Skip " + hri);
+        continue;
+      }
+      if (!serverToRegions.containsKey(sn)) {
+        LinkedList<HRegionInfo> hriList = Lists.newLinkedList();
+        serverToRegions.put(sn, hriList);
+      }
+      reRegions.add(hri);
+      serverToRegions.get(sn).add(hri);
+    }
+
+    LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size()
+        + " region servers.");
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    am.setRegionsToReopen(reRegions);
+    BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am);
+    while (true) {
+      try {
+        if (bulkReopen.bulkReOpen()) {
+          done = true;
+          break;
+        } else {
+          LOG.warn("Timeout before reopening all regions");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Reopen was interrupted");
+        // Preserve the interrupt.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    return done;
+  }
+}
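The bucketing loop in reOpenAllRegions above is what lets BulkReOpen issue one batch per RegionServer. A standalone sketch of that grouping, using the same client types (illustrative only; class and method names here are not part of the patch):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;

public class RegionBucketingSketch {
  /** Group regions by hosting server, skipping unassigned (e.g. offline split parent) regions. */
  static Map<ServerName, List<HRegionInfo>> bucketByServer(List<HRegionLocation> locations) {
    Map<ServerName, List<HRegionInfo>> buckets = new TreeMap<ServerName, List<HRegionInfo>>();
    for (HRegionLocation location : locations) {
      ServerName sn = location.getServerName();
      if (sn == null) continue; // offline split parents have no hosting server
      List<HRegionInfo> regions = buckets.get(sn);
      if (regions == null) {
        regions = new ArrayList<HRegionInfo>();
        buckets.put(sn, regions);
      }
      regions.add(location.getRegionInfo());
    }
    return buckets;
  }
}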
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
new file mode 100644
index 0000000..7d333f3
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyTableState;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+@InterfaceAudience.Private
+public class ModifyTableProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, ModifyTableState>
+    implements TableProcedureInterface {
+  private static final Log LOG = LogFactory.getLog(ModifyTableProcedure.class);
+
+  private AtomicBoolean aborted = new AtomicBoolean(false);
+  private HTableDescriptor unmodifiedHTableDescriptor = null;
+  private HTableDescriptor modifiedHTableDescriptor;
+  private UserGroupInformation user;
+  private List<HRegionInfo> regionInfoList;
+  private boolean deleteColumnFamilyInModify;
+
+  private Boolean traceEnabled = null;
+
+  public ModifyTableProcedure() {
+    initialize();
+  }
+
+  public ModifyTableProcedure(
+      final MasterProcedureEnv env,
+      final HTableDescriptor htd) throws IOException {
+    initialize();
+    this.modifiedHTableDescriptor = htd;
+    this.user = env.getRequestUser().getUGI();
+  }
+
+  private void initialize() {
+    this.unmodifiedHTableDescriptor = null;
+    this.regionInfoList = null;
+    this.traceEnabled = null;
+    this.deleteColumnFamilyInModify = false;
+  }
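+
+  // Execution notes: the ProcedureExecutor persists each state transition to the
+  // procedure WAL and re-invokes executeFromState until it returns NO_MORE_STATE,
+  // so each state below must be idempotent (the recovery tests rely on this).
+  // Forward flow: PREPARE -> PRE_OPERATION -> UPDATE_TABLE_DESCRIPTOR ->
+  //   REMOVE_REPLICA_COLUMN -> [DELETE_FS_LAYOUT, only when a family is removed] ->
+  //   POST_OPERATION -> REOPEN_ALL_REGIONS (regions are only reopened for enabled tables).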
of table: " + getTableName()); + } + state = ModifyTableState.MODIFY_TABLE_PREPARE; + setNextState(state); + } + + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case MODIFY_TABLE_PREPARE: + prepareModify(env); + setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION); + break; + case MODIFY_TABLE_PRE_OPERATION: + preModify(env, state); + setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR); + break; + case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR: + updateTableDescriptor(env); + setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN); + case MODIFY_TABLE_REMOVE_REPLICA_COLUMN: + updateReplicaColumnsIfNeeded(env, unmodifiedHTableDescriptor, modifiedHTableDescriptor); + if (deleteColumnFamilyInModify) { + setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT); + } else { + setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION); + } + break; + case MODIFY_TABLE_DELETE_FS_LAYOUT: + deleteFromFs(env, unmodifiedHTableDescriptor, modifiedHTableDescriptor); + setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION); + break; + case MODIFY_TABLE_POST_OPERATION: + postModify(env, state); + setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS); + break; + case MODIFY_TABLE_REOPEN_ALL_REGIONS: + reOpenAllRegionsIfTableIsOnline(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (Exception e) { + if (!isRollbackSupported(state)) { + // We reach a state that cannot be rolled back. We just need to keep retry. + LOG.warn("Error trying to modify table=" + getTableName() + " state=" + state, e); + } else { + LOG.error("Error trying to modify table=" + getTableName() + " state=" + state, e); + setFailure("master-modify-table", e); + } + } + return Flow.HAS_MORE_STATE; + } + + protected void rollbackState(final MasterProcedureEnv env, final ModifyTableState state) + throws Exception { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case MODIFY_TABLE_REOPEN_ALL_REGIONS: + break; // Nothing to undo. + case MODIFY_TABLE_POST_OPERATION: + // TODO-MAYBE: call the coprocessor event to un-modify? + break; + case MODIFY_TABLE_DELETE_FS_LAYOUT: + // Once we reach to this state - we could NOT rollback - as it is tricky to undelete + // the deleted files. We are not suppose to reach here, throw exception so that we know + // there is a code bug to investigate. + assert deleteColumnFamilyInModify; + throw new UnsupportedOperationException(this + " rollback of state=" + state + + " is unsupported."); + case MODIFY_TABLE_REMOVE_REPLICA_COLUMN: + // Undo the replica column update. + updateReplicaColumnsIfNeeded(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor); + break; + case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR: + restoreTableDescriptor(env); + break; + case MODIFY_TABLE_PRE_OPERATION: + // TODO-MAYBE: call the coprocessor event to un-modify? + break; + case MODIFY_TABLE_PREPARE: + break; // Nothing to undo. 
+
+  protected void rollbackState(final MasterProcedureEnv env, final ModifyTableState state)
+      throws Exception {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " rollback state=" + state);
+    }
+
+    try {
+      switch (state) {
+        case MODIFY_TABLE_REOPEN_ALL_REGIONS:
+          break; // Nothing to undo.
+        case MODIFY_TABLE_POST_OPERATION:
+          // TODO-MAYBE: call the coprocessor event to un-modify?
+          break;
+        case MODIFY_TABLE_DELETE_FS_LAYOUT:
+          // Once we reach this state we can NOT roll back, as it is tricky to undelete
+          // the deleted files. We are not supposed to reach here; throw an exception so
+          // that we know there is a code bug to investigate.
+          assert deleteColumnFamilyInModify;
+          throw new UnsupportedOperationException(this + " rollback of state=" + state
+              + " is unsupported.");
+        case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
+          // Undo the replica column update.
+          updateReplicaColumnsIfNeeded(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
+          break;
+        case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
+          restoreTableDescriptor(env);
+          break;
+        case MODIFY_TABLE_PRE_OPERATION:
+          // TODO-MAYBE: call the coprocessor event to un-modify?
+          break;
+        case MODIFY_TABLE_PREPARE:
+          break; // Nothing to undo.
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed trying to rollback modify table=" + getTableName() + " state=" + state, e);
+      throw e;
+    }
+  }
+
+  protected ModifyTableState getState(final int stateId) {
+    return ModifyTableState.valueOf(stateId);
+  }
+
+  private void setNextState(final ModifyTableState state) {
+    if (aborted.get()) {
+      setAbortFailure("modify-table", "abort requested");
+    } else {
+      setNextState(state.getNumber());
+    }
+  }
+
+  @Override
+  public boolean abort(final MasterProcedureEnv env) {
+    aborted.set(true);
+    return true;
+  }
+
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    return env.getProcedureQueue().tryAcquireTableWrite(
+      getTableName(),
+      EventType.C_M_MODIFY_TABLE.toString());
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureQueue().releaseTableWrite(getTableName());
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    MasterProcedureProtos.ModifyTableMessage.Builder modifyTableMsg =
+        MasterProcedureProtos.ModifyTableMessage.newBuilder()
+            .setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
+            .setModifiedTableSchema(modifiedHTableDescriptor.convert())
+            .setDeleteColumnFamilyInModify(deleteColumnFamilyInModify);
+
+    if (unmodifiedHTableDescriptor != null) {
+      modifyTableMsg.setUnmodifiedTableSchema(unmodifiedHTableDescriptor.convert());
+    }
+
+    if (regionInfoList != null) {
+      for (HRegionInfo hri : regionInfoList) {
+        modifyTableMsg.addRegionInfo(HRegionInfo.convert(hri));
+      }
+    }
+
+    modifyTableMsg.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    MasterProcedureProtos.ModifyTableMessage modifyTableMsg =
+        MasterProcedureProtos.ModifyTableMessage.parseDelimitedFrom(stream);
+    user = MasterProcedureUtil.toUserInfo(modifyTableMsg.getUserInfo());
+    modifiedHTableDescriptor = HTableDescriptor.convert(modifyTableMsg.getModifiedTableSchema());
+    deleteColumnFamilyInModify = modifyTableMsg.getDeleteColumnFamilyInModify();
+
+    if (modifyTableMsg.hasUnmodifiedTableSchema()) {
+      unmodifiedHTableDescriptor =
+          HTableDescriptor.convert(modifyTableMsg.getUnmodifiedTableSchema());
+    }
+
+    if (modifyTableMsg.getRegionInfoCount() == 0) {
+      regionInfoList = null;
+    } else {
+      regionInfoList = new ArrayList<HRegionInfo>(modifyTableMsg.getRegionInfoCount());
+      for (HBaseProtos.RegionInfo hri : modifyTableMsg.getRegionInfoList()) {
+        regionInfoList.add(HRegionInfo.convert(hri));
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName());
+    sb.append(" (table=");
+    sb.append(getTableName());
+    sb.append(") procId=");
+    sb.append(getProcId());
+    sb.append(" user=");
+    sb.append(user);
+
+    return sb.toString();
+  }
+
+  @Override
+  public TableName getTableName() {
+    return modifiedHTableDescriptor.getTableName();
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.EDIT;
+  }
+
+  /**
+   * Check conditions before any real action of modifying a table.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void prepareModify(final MasterProcedureEnv env) throws IOException {
+    // Checks whether the table exists
+    if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) {
+      throw new TableNotFoundException(getTableName());
+    }
+
+    // In order to update the descriptor, we need to retrieve the old descriptor for comparison.
+    this.unmodifiedHTableDescriptor =
+        env.getMasterServices().getTableDescriptors().get(getTableName());
+
+    if (env.getMasterServices().getAssignmentManager().getTableStateManager()
+        .isTableState(getTableName(), TableState.State.ENABLED)) {
+      // We only execute this procedure with table online if online schema change config is set.
+      if (!MasterDDLOperationHelper.isOnlineSchemaChangeAllowed(env)) {
+        throw new TableNotDisabledException(getTableName());
+      }
+
+      if (modifiedHTableDescriptor.getRegionReplication() != unmodifiedHTableDescriptor
+          .getRegionReplication()) {
+        throw new IOException("REGION_REPLICATION change is not supported for enabled tables");
+      }
+    }
+
+    // Get the region info list before the real action.
+    this.regionInfoList = MasterDDLOperationHelper.getRegionsFromMeta(env, getTableName());
+
+    // Find out whether all column families in unmodifiedHTableDescriptor also exist in
+    // the modifiedHTableDescriptor. This is to determine whether we are safe to rollback.
+    final Set<byte[]> oldFamilies = unmodifiedHTableDescriptor.getFamiliesKeys();
+    final Set<byte[]> newFamilies = modifiedHTableDescriptor.getFamiliesKeys();
+    for (byte[] familyName : oldFamilies) {
+      if (!newFamilies.contains(familyName)) {
+        this.deleteColumnFamilyInModify = true;
+        break;
+      }
+    }
+  }
+
+  /**
+   * Action before modifying table.
+   * @param env MasterProcedureEnv
+   * @param state the procedure state
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void preModify(final MasterProcedureEnv env, final ModifyTableState state)
+      throws IOException, InterruptedException {
+    runCoprocessorAction(env, state);
+  }
+
+  /**
+   * Update the table descriptor
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   **/
+  private void updateTableDescriptor(final MasterProcedureEnv env) throws IOException {
+    env.getMasterServices().getTableDescriptors().add(modifiedHTableDescriptor);
+  }
+
+  /**
+   * Undo the descriptor change (for rollback)
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   **/
+  private void restoreTableDescriptor(final MasterProcedureEnv env) throws IOException {
+    env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
+
+    // Delete any new column families from the modifiedHTableDescriptor.
+    deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
+
+    // Make sure regions are opened after table descriptor is updated.
+    reOpenAllRegionsIfTableIsOnline(env);
+  }
+
+  /**
+   * Removes from hdfs the families that are no longer present in the new table descriptor.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void deleteFromFs(
+      final MasterProcedureEnv env,
+      final HTableDescriptor oldHTableDescriptor,
+      final HTableDescriptor newHTableDescriptor) throws IOException {
+    try {
+      final Set<byte[]> oldFamilies = oldHTableDescriptor.getFamiliesKeys();
+      final Set<byte[]> newFamilies = newHTableDescriptor.getFamiliesKeys();
+      for (byte[] familyName : oldFamilies) {
+        if (!newFamilies.contains(familyName)) {
+          MasterDDLOperationHelper.deleteColumnFamilyFromFileSystem(
+            env,
+            getTableName(),
+            regionInfoList,
+            familyName);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to remove on-disk directories for the removed families", e);
+    }
+  }
+
+  /**
+   * Update replica column families if necessary.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void updateReplicaColumnsIfNeeded(
+      final MasterProcedureEnv env,
+      final HTableDescriptor oldHTableDescriptor,
+      final HTableDescriptor newHTableDescriptor) throws IOException {
+    final int oldReplicaCount = oldHTableDescriptor.getRegionReplication();
+    final int newReplicaCount = newHTableDescriptor.getRegionReplication();
+
+    // No early return here: the replication setup below must still run when the
+    // replica count grows.
+    if (newReplicaCount < oldReplicaCount) {
+      Set<byte[]> tableRows = new HashSet<byte[]>();
+      Connection connection = env.getMasterServices().getConnection();
+      Scan scan = MetaTableAccessor.getScanForTableName(connection, getTableName());
+      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+
+      try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
+        ResultScanner resScanner = metaTable.getScanner(scan);
+        for (Result result : resScanner) {
+          tableRows.add(result.getRow());
+        }
+        MetaTableAccessor.removeRegionReplicasFromMeta(
+          tableRows,
+          newReplicaCount,
+          oldReplicaCount - newReplicaCount,
+          connection);
+      }
+    }
+
+    // Setup replication for region replicas if needed
+    if (newReplicaCount > 1 && oldReplicaCount <= 1) {
+      ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
+    }
+  }
+
+  /**
+   * Action after modifying table.
+   * @param env MasterProcedureEnv
+   * @param state the procedure state
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void postModify(final MasterProcedureEnv env, final ModifyTableState state)
+      throws IOException, InterruptedException {
+    runCoprocessorAction(env, state);
+  }
+
+  /**
+   * Last action from the procedure - executed when online schema change is supported.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
+    // This operation only runs when the table is enabled.
+    if (!env.getMasterServices().getAssignmentManager().getTableStateManager()
+        .isTableState(getTableName(), TableState.State.ENABLED)) {
+      return;
+    }
+
+    if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), regionInfoList)) {
+      LOG.info("Completed modify table operation on table " + getTableName());
+    } else {
+      LOG.warn("Error on reopening the regions on table " + getTableName());
+    }
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the variable is null, we need to
+   * retrieve it.
+   * @return traceEnabled whether trace logging is enabled
+   */
+  private Boolean isTraceEnabled() {
+    if (traceEnabled == null) {
+      traceEnabled = LOG.isTraceEnabled();
+    }
+    return traceEnabled;
+  }
+
+  /**
+   * Coprocessor Action.
+   * @param env MasterProcedureEnv
+   * @param state the procedure state
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void runCoprocessorAction(final MasterProcedureEnv env, final ModifyTableState state)
+      throws IOException, InterruptedException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      user.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          switch (state) {
+            case MODIFY_TABLE_PRE_OPERATION:
+              cpHost.preModifyTableHandler(getTableName(), modifiedHTableDescriptor);
+              break;
+            case MODIFY_TABLE_POST_OPERATION:
+              cpHost.postModifyTableHandler(getTableName(), modifiedHTableDescriptor);
+              break;
+            default:
+              throw new UnsupportedOperationException(this + " unhandled state=" + state);
+          }
+          return null;
+        }
+      });
+    }
+  }
+
+  /*
+   * Check whether the current state can be rolled back
+   */
+  private boolean isRollbackSupported(final ModifyTableState state) {
+    if (deleteColumnFamilyInModify) {
+      switch (state) {
+        case MODIFY_TABLE_DELETE_FS_LAYOUT:
+        case MODIFY_TABLE_POST_OPERATION:
+        case MODIFY_TABLE_REOPEN_ALL_REGIONS:
+          // It is not safe to rollback once we reach these states.
+          return false;
+        default:
+          break;
+      }
+    }
+    return true;
+  }
+}
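serializeStateData/deserializeStateData above are what survive a master restart. A hedged round-trip sketch of the new ModifyTableMessage, assuming the protobuf classes generated from the .proto change at the top of this patch (the user name here is made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;

public class ModifyTableMessageRoundTrip {
  public static void main(String[] args) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));

    // Serialize, as serializeStateData does.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    MasterProcedureProtos.ModifyTableMessage.newBuilder()
        .setUserInfo(RPCProtos.UserInformation.newBuilder().setEffectiveUser("hbase").build())
        .setModifiedTableSchema(htd.convert())
        .setDeleteColumnFamilyInModify(false)
        .build().writeDelimitedTo(bos);

    // Deserialize, as deserializeStateData does after a master restart.
    MasterProcedureProtos.ModifyTableMessage msg =
        MasterProcedureProtos.ModifyTableMessage.parseDelimitedFrom(
            new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(HTableDescriptor.convert(msg.getModifiedTableSchema()).getTableName());
  }
}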
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestModifyTableProcedure.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestModifyTableProcedure.java
new file mode 100644
index 0000000..88c3bda
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestModifyTableProcedure.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestModifyTableProcedure {
+  private static final Log LOG = LogFactory.getLog(TestModifyTableProcedure.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static void setupConf(Configuration conf) {
+    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    resetProcExecutorTestingKillFlag();
+    for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
+      LOG.info("Tear down, remove table=" + htd.getTableName());
+      UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  private void resetProcExecutorTestingKillFlag() {
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false);
+  }
+
+  @Test(timeout=60000)
+  public void testModifyTable() throws Exception {
+    final TableName tableName = TableName.valueOf("testModifyTable");
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+    MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf");
+    UTIL.getHBaseAdmin().disableTable(tableName);
+
+    // Modify the table descriptor
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+
+    // Test 1: Modify 1 property
+    long newMaxFileSize = htd.getMaxFileSize() * 2;
+    htd.setMaxFileSize(newMaxFileSize);
+
+    long procId1 = ProcedureTestingUtility.submitAndWait(
+        procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1));
+
+    HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getMaxFileSize() == newMaxFileSize);
+
+    // Test 2: Modify multiple properties
+    boolean newReadOnlyOption = !htd.isReadOnly();
+    long newMemStoreFlushSize = htd.getMemStoreFlushSize() * 2;
+    htd.setReadOnly(newReadOnlyOption);
+    htd.setMemStoreFlushSize(newMemStoreFlushSize);
+
+    long procId2 = ProcedureTestingUtility.submitAndWait(
+        procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
+
+    currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.isReadOnly() == newReadOnlyOption);
+    assertTrue(currentHtd.getMemStoreFlushSize() == newMemStoreFlushSize);
+  }
+
+  @Test(timeout = 60000)
+  public void testModifyTableAddCF() throws Exception {
+    final TableName tableName = TableName.valueOf("testModifyTableAddCF");
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+    MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf1");
+    HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 1);
+
+    // Test 1: Modify the table descriptor online
+    String cf2 = "cf2";
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    htd.addFamily(new HColumnDescriptor(cf2));
+
+    long procId = ProcedureTestingUtility.submitAndWait(
+        procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
+
+    currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 2);
+    assertTrue(currentHtd.hasFamily(cf2.getBytes()));
+
+    // Test 2: Modify the table descriptor offline
+    UTIL.getHBaseAdmin().disableTable(tableName);
+    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+    String cf3 = "cf3";
+    HTableDescriptor htd2 =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    htd2.addFamily(new HColumnDescriptor(cf3));
+
+    long procId2 =
+        ProcedureTestingUtility.submitAndWait(procExec,
+          new ModifyTableProcedure(procExec.getEnvironment(), htd2));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
+
+    currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.hasFamily(cf3.getBytes()));
+    assertTrue(currentHtd.getFamiliesKeys().size() == 3);
+  }
+
+  @Test(timeout = 60000)
+  public void testModifyTableDeleteCF() throws Exception {
+    final TableName tableName = TableName.valueOf("testModifyTableDeleteCF");
+    final String cf2 = "cf2";
+    final String cf3 = "cf3";
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+    MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf1", cf2, cf3);
+    HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 3);
+
+    // Test 1: Modify the table descriptor
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    htd.removeFamily(cf2.getBytes());
+
+    long procId = ProcedureTestingUtility.submitAndWait(
+        procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
+
+    currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 2);
+    assertTrue(!currentHtd.hasFamily(cf2.getBytes()));
+
+    // Test 2: Modify the table descriptor offline
+    UTIL.getHBaseAdmin().disableTable(tableName);
+    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+
+    HTableDescriptor htd2 =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    htd2.removeFamily(cf3.getBytes());
+
+    long procId2 =
+        ProcedureTestingUtility.submitAndWait(procExec,
+          new ModifyTableProcedure(procExec.getEnvironment(), htd2));
+    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
+
+    currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 1);
+    assertTrue(!currentHtd.hasFamily(cf3.getBytes()));
+  }
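+
+  // The recovery tests below drive the procedure with kill-before-store-update
+  // enabled: the executor dies right before persisting each state and is restarted,
+  // so every ModifyTableState is executed (or rolled back) at least twice and must
+  // therefore be idempotent.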
+
+  @Test(timeout=60000)
+  public void testRecoveryAndDoubleExecutionOffline() throws Exception {
+    final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOffline");
+    final String cf3 = "cf3";
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    // create the table
+    MasterProcedureTestingUtility.createTable(getMasterProcedureExecutor(), tableName, null,
+      "cf1", cf3);
+    UTIL.getHBaseAdmin().disableTable(tableName);
+    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+    // Modify multiple properties of the table.
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    boolean newCompactionEnableOption = !htd.isCompactionEnabled();
+    htd.setCompactionEnabled(newCompactionEnableOption);
+    htd.addFamily(new HColumnDescriptor("cf2"));
+    htd.removeFamily(cf3.getBytes());
+
+    // Start the Modify procedure && kill the executor
+    long procId =
+        procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
+
+    // Restart the executor and execute the step twice
+    int numberOfSteps = ModifyTableState.values().length - 1;
+    MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
+      procExec,
+      procId,
+      numberOfSteps,
+      ModifyTableState.values());
+
+    HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.isCompactionEnabled() == newCompactionEnableOption);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 2);
+  }
+
+  @Test(timeout = 60000)
+  public void testRecoveryAndDoubleExecutionOnline() throws Exception {
+    final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOnline");
+    final String cf2 = "cf2";
+    final String cf3 = "cf3";
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    // create the table
+    MasterProcedureTestingUtility.createTable(getMasterProcedureExecutor(), tableName, null,
+      "cf1", cf3);
+
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+    // Modify multiple properties of the table.
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    boolean newCompactionEnableOption = !htd.isCompactionEnabled();
+    htd.setCompactionEnabled(newCompactionEnableOption);
+    htd.addFamily(new HColumnDescriptor("cf2"));
+    htd.removeFamily(cf3.getBytes());
+
+    // Start the Modify procedure && kill the executor
+    long procId =
+        procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
+
+    // Restart the executor and execute the step twice
+    int numberOfSteps = ModifyTableState.values().length - 1;
+    MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, numberOfSteps,
+      ModifyTableState.values());
+
+    HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
+    assertTrue(currentHtd.isCompactionEnabled() == newCompactionEnableOption);
+    assertTrue(currentHtd.getFamiliesKeys().size() == 2);
+    assertTrue(currentHtd.hasFamily(cf2.getBytes()));
+    assertTrue(!currentHtd.hasFamily(cf3.getBytes()));
+  }
+
+  @Test(timeout = 60000)
+  public void testRollbackAndDoubleExecution() throws Exception {
+    final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution");
+    final String familyName = "cf2";
+    final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    // create the table
+    MasterProcedureTestingUtility.createTable(getMasterProcedureExecutor(), tableName, null,
+      "cf1");
+
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+    HTableDescriptor htd =
+        new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
+    boolean newCompactionEnableOption = !htd.isCompactionEnabled();
+    htd.setCompactionEnabled(newCompactionEnableOption);
+    htd.addFamily(new HColumnDescriptor(familyName));
+
+    // Start the Modify procedure && kill the executor
+    long procId =
+        procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
+
+    // Restart the executor and rollback the step twice
+    int numberOfSteps = ModifyTableState.values().length - 4; // failing in the middle of proc
+    MasterProcedureTestingUtility.testRollbackAndDoubleExecution(
+      procExec,
+      procId,
+      numberOfSteps,
+      ModifyTableState.values());
+  }
+
+  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+  }
+}
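For reference, the testRecoveryAndDoubleExecution calls above boil down to a kill/restart loop along these lines (a sketch of how the utilities used in this test behave, not their exact implementation):

// Kill the executor before each procedure-store update, then restart and replay:
ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
long procId = procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
for (int i = 0; i < numberOfSteps; ++i) {
  ProcedureTestingUtility.restart(procExec);       // simulated master restart
  ProcedureTestingUtility.waitProcedure(procExec, procId);
}
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));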