From b8fd57d8e9557028a96eeaf15ccb9d5a7659603a Mon Sep 17 00:00:00 2001
From: zhangduo <zhangduo@apache.org>
Date: Tue, 27 Mar 2018 16:06:00 +0800
Subject: [PATCH] HBASE-20191 Cleanup replication barrier for delete table in
 the chore

---
 .../org/apache/hadoop/hbase/MetaTableAccessor.java |  42 +++---
 .../src/main/protobuf/MasterProcedure.proto        |   4 +
 .../hbase/replication/ReplicationQueueStorage.java |   9 ++
 .../replication/ZKReplicationQueueStorage.java     |  15 ++
 .../master/procedure/DeleteTableProcedure.java     | 154 +++++++++++++++------
 .../master/procedure/ModifyTableProcedure.java     |  92 ++++++------
 .../master/procedure/TruncateTableProcedure.java   |  13 +-
 .../hadoop/hbase/client/TestEnableTable.java       |   8 +-
 .../replication/TestSerialReplicationFailover.java |   9 +-
 .../TestSerialReplicationModifyTable.java          | 114 +++++++++++++++
 10 files changed, 345 insertions(+), 115 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationModifyTable.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 4cc46c8..73d7753 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -523,25 +523,24 @@ public class MetaTableAccessor {
   }
 
   /**
-   * This method creates a Scan object that will only scan catalog rows that
-   * belong to the specified table. It doesn't specify any columns.
-   * This is a better alternative to just using a start row and scan until
-   * it hits a new table since that requires parsing the HRI to get the table
-   * name.
+   * <p/>
+   * This method creates a Scan object that will only scan catalog rows that belong to the specified
+   * table. It doesn't specify any columns.
+   * <p/>
+   * <p/>
+   * This is a better alternative to just using a start row and scan until it hits a new table since
+   * that requires parsing the HRI to get the table name.
+   * <p/>
   * @param tableName bytes of table's name
   * @return configured Scan object
   */
-  @Deprecated
-  public static Scan getScanForTableName(Connection connection, TableName tableName) {
+  public static Scan getScanForTable(Connection connection, TableName tableName,
+      QueryType queryType) {
     // Start key is just the table name with delimiters
-    byte[] startKey = getTableStartRowForMeta(tableName, QueryType.REGION);
+    byte[] startKey = getTableStartRowForMeta(tableName, queryType);
     // Stop key appends the smallest possible char to the table name
-    byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION);
-
-    Scan scan = getMetaScan(connection, -1);
-    scan.setStartRow(startKey);
-    scan.setStopRow(stopKey);
-    return scan;
+    byte[] stopKey = getTableStopRowForMeta(tableName, queryType);
+    return getMetaScan(connection, -1).withStartRow(startKey).withStopRow(stopKey);
   }
 
   private static Scan getMetaScan(Connection connection, int rowUpperLimit) {
@@ -1831,7 +1830,7 @@ public class MetaTableAccessor {
    * @param connection connection we're using
    * @param regionsInfo list of regions to be deleted from META
    */
-  public static void deleteRegions(Connection connection, List<RegionInfo> regionsInfo, long ts)
+  private static void deleteRegions(Connection connection, List<RegionInfo> regionsInfo, long ts)
       throws IOException {
     List<Delete> deletes = new ArrayList<>(regionsInfo.size());
     for (RegionInfo hri : regionsInfo) {
@@ -1973,7 +1972,7 @@ public class MetaTableAccessor {
     byte[] value = getParentsBytes(parents);
     put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
-      .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
   }
 
   private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@@ -1988,7 +1987,7 @@ public class MetaTableAccessor {
       .setRow(put.getRow())
       .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
       .setQualifier(HConstants.SEQNUM_QUALIFIER)
-      .setTimestamp(put.getTimeStamp())
+      .setTimestamp(put.getTimestamp())
       .setType(Type.Put)
       .setValue(Bytes.toBytes(openSeqNum))
       .build());
@@ -2128,6 +2127,15 @@ public class MetaTableAccessor {
     return list;
   }
 
+  public static void deleteReplicationBarriers(Connection conn, List<RegionInfo> regions)
+      throws IOException {
+    long ts = EnvironmentEdgeManager.currentTime();
+    List<Delete> deletes = regions.stream().map(r -> r.getRegionName()).map(Delete::new)
+      .map(d -> d.addFamily(HConstants.REPLICATION_BARRIER_FAMILY, ts))
+      .collect(Collectors.toList());
+    deleteFromMetaTable(conn, deletes);
+  }
+
   private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;
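Reviewer sketch: getScanForTable() replaces the deprecated getScanForTableName() and takes the
QueryType explicitly, so the same helper can bound a scan to either a table's region rows or its
replication-barrier rows in hbase:meta. A self-contained illustration of the intended call pattern
follows; the class and method names below are made up for the example and are not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.MetaTableAccessor;
    import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class CountCatalogRows {
      // Counts the hbase:meta rows of one table; the QueryType argument picks
      // the row range (REGION rows here, REPLICATION barrier rows elsewhere).
      static int countMetaRows(Connection conn, TableName tn) throws IOException {
        Scan scan = MetaTableAccessor.getScanForTable(conn, tn, QueryType.REGION)
          .addFamily(HConstants.CATALOG_FAMILY);
        int rows = 0;
        try (Table meta = conn.getTable(TableName.META_TABLE_NAME);
            ResultScanner scanner = meta.getScanner(scan)) {
          for (Result r = scanner.next(); r != null; r = scanner.next()) {
            rows++;
          }
        }
        return rows;
      }
    }

Passing QueryType.REPLICATION instead bounds the scan to the rep_barrier rows that the new cleanup
path below deletes.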
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index b37557c..22a14b1 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -71,6 +71,7 @@ enum ModifyTableState {
   MODIFY_TABLE_DELETE_FS_LAYOUT = 5;
   MODIFY_TABLE_POST_OPERATION = 6;
   MODIFY_TABLE_REOPEN_ALL_REGIONS = 7;
+  MODIFY_TABLE_CLEANUP_REPLICATION_STUFFS = 8;
 }
 
 message ModifyTableStateData {
@@ -88,6 +89,7 @@ enum TruncateTableState {
   TRUNCATE_TABLE_ADD_TO_META = 5;
   TRUNCATE_TABLE_ASSIGN_REGIONS = 6;
   TRUNCATE_TABLE_POST_OPERATION = 7;
+  TRUNCATE_TABLE_CLEANUP_REPLICATION_STUFFS = 8;
 }
 
 message TruncateTableStateData {
@@ -105,12 +107,14 @@ enum DeleteTableState {
   DELETE_TABLE_UPDATE_DESC_CACHE = 4;
   DELETE_TABLE_UNASSIGN_REGIONS = 5;
   DELETE_TABLE_POST_OPERATION = 6;
+  DELETE_TABLE_CLEANUP_REPLICATION_STUFFS = 7;
 }
 
 message DeleteTableStateData {
   required UserInformation user_info = 1;
   required TableName table_name = 2;
   repeated RegionInfo region_info = 3;
+  optional TableSchema table_descriptor = 4;
 }
 
 enum CreateNamespaceState {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cd37ac2..84653ad 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
    * @param peerId peer id
    */
   void removeLastSequenceIds(String peerId) throws ReplicationException;
+
+  /**
+   * Remove the max sequence id record for the given peer and regions.
+   * @param peerId peer id
+   * @param encodedRegionNames the encoded region names
+   */
+  void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException;
+
   /**
    * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index a629da3..479812f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -348,6 +349,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps =
+        encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
+          .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
+        ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
+    }
+  }
+
+  @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;
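Reviewer sketch: the new removeLastSequenceIds(peerId, encodedRegionNames) overload deletes the
per-region last-pushed-sequence-id znodes for one peer in a single ZooKeeper multi, built from
deleteNodeFailSilent ops so already-missing nodes are not an error. Assuming the usual
ReplicationStorageFactory wiring, a caller would look roughly like this; the peer id and encoded
region names below are placeholders.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.replication.ReplicationException;
    import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
    import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
    import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

    public class DropLastSeqIds {
      // Removes the last-pushed sequence ids for two regions of peer "1" in
      // one call; the ZK implementation turns this into a single multi().
      static void drop(ZKWatcher zk, Configuration conf) throws ReplicationException {
        ReplicationQueueStorage storage =
          ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
        List<String> encodedNames = Arrays.asList(
          "d41d8cd98f00b204e9800998ecf8427e",   // placeholder encoded region names
          "9e107d9d372bb6826bd81d3542a419d6");
        storage.removeLastSequenceIds("1", encodedNames);
      }
    }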
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index 487bb27..c408f5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hbase.master.procedure;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -37,17 +38,23 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
@@ -58,8 +65,11 @@ public class DeleteTableProcedure
     extends AbstractStateMachineTableProcedure<DeleteTableState> {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteTableProcedure.class);
 
+  private static final int REMOVE_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
   private List<RegionInfo> regions;
   private TableName tableName;
+  private TableDescriptor tableDesc;
 
   public DeleteTableProcedure() {
     // Required by the Procedure framework to create the procedure on replay
@@ -79,9 +89,7 @@ public class DeleteTableProcedure
   @Override
   protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state)
       throws InterruptedException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(this + " execute state=" + state);
-    }
+    LOG.trace("{} execute state={}", this, state);
     try {
       switch (state) {
         case DELETE_TABLE_PRE_OPERATION:
@@ -94,45 +102,52 @@ public class DeleteTableProcedure
           }
           // TODO: Move out... in the acquireLock()
-          LOG.debug("Waiting for '" + getTableName() + "' regions in transition");
+          LOG.debug("Waiting for '{}' regions in transition", getTableName());
           regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
           assert regions != null && !regions.isEmpty() : "unexpected 0 regions";
           ProcedureSyncWait.waitRegionInTransition(env, regions);
-
           // Call coprocessors
           preDelete(env);
           setNextState(DeleteTableState.DELETE_TABLE_REMOVE_FROM_META);
           break;
         case DELETE_TABLE_REMOVE_FROM_META:
-          LOG.debug("delete '" + getTableName() + "' regions from META");
-          DeleteTableProcedure.deleteFromMeta(env, getTableName(), regions);
+          LOG.debug("delete '{}' regions from META", getTableName());
+          tableDesc = env.getMasterServices().getTableDescriptors().get(tableName);
+          deleteFromMeta(env, getTableName(), regions);
+          setNextState(DeleteTableState.DELETE_TABLE_CLEANUP_REPLICATION_STUFFS);
+          break;
+        case DELETE_TABLE_CLEANUP_REPLICATION_STUFFS:
+          LOG.debug("delete '{}' replication stuffs", getTableName());
+          if (tableDesc.hasGlobalReplicationScope()) {
+            deleteReplicationBarriersAndLastPushedSeqIds(env, getTableName(), regions);
+          }
           setNextState(DeleteTableState.DELETE_TABLE_CLEAR_FS_LAYOUT);
           break;
         case DELETE_TABLE_CLEAR_FS_LAYOUT:
-          LOG.debug("delete '" + getTableName() + "' from filesystem");
-          DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true);
-          setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE);
+          LOG.debug("delete '{}' from filesystem", getTableName());
+          deleteFromFs(env, getTableName(), regions, true);
           regions = null;
+          setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE);
           break;
         case DELETE_TABLE_UPDATE_DESC_CACHE:
-          LOG.debug("delete '" + getTableName() + "' descriptor");
-          DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName());
+          LOG.debug("delete '{}' descriptor", getTableName());
+          deleteTableDescriptorCache(env, getTableName());
          setNextState(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS);
           break;
         case DELETE_TABLE_UNASSIGN_REGIONS:
-          LOG.debug("delete '" + getTableName() + "' assignment state");
-          DeleteTableProcedure.deleteAssignmentState(env, getTableName());
+          LOG.debug("delete '{}' assignment state", getTableName());
+          deleteAssignmentState(env, getTableName());
           setNextState(DeleteTableState.DELETE_TABLE_POST_OPERATION);
           break;
         case DELETE_TABLE_POST_OPERATION:
           postDelete(env);
-          LOG.debug("delete '" + getTableName() + "' completed");
+          LOG.debug("delete '{}' completed", getTableName());
           return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException("unhandled state=" + state);
       }
-    } catch (IOException e) {
+    } catch (IOException | ReplicationException e) {
       if (isRollbackSupported(state)) {
         setFailure("master-delete-table", e);
       } else {
@@ -177,7 +192,7 @@ public class DeleteTableProcedure
 
   @Override
   protected DeleteTableState getState(final int stateId) {
-    return DeleteTableState.valueOf(stateId);
+    return DeleteTableState.forNumber(stateId);
   }
 
   @Override
@@ -214,6 +229,9 @@ public class DeleteTableProcedure
         state.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
       }
     }
+    if (tableDesc != null) {
+      state.setTableDescriptor(ProtobufUtil.toTableSchema(tableDesc));
+    }
     serializer.serialize(state.build());
   }
 
@@ -234,6 +252,9 @@ public class DeleteTableProcedure
         regions.add(ProtobufUtil.toRegionInfo(hri));
       }
     }
+    if (state.hasTableDescriptor()) {
+      tableDesc = ProtobufUtil.toTableDescriptor(state.getTableDescriptor());
+    }
   }
 
   private boolean prepareDelete(final MasterProcedureEnv env) throws IOException {
@@ -267,7 +288,7 @@ public class DeleteTableProcedure
     }
   }
 
-  protected static void deleteFromFs(final MasterProcedureEnv env,
+  static void deleteFromFs(final MasterProcedureEnv env,
       final TableName tableName, final List<RegionInfo> regions,
       final boolean archive) throws IOException {
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
@@ -297,7 +318,9 @@ public class DeleteTableProcedure
       FileStatus[] files = fs.listStatus(tempdir);
       if (files != null && files.length > 0) {
         for (int i = 0; i < files.length; ++i) {
-          if (!files[i].isDir()) continue;
+          if (!files[i].isDirectory()) {
+            continue;
+          }
           HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, files[i].getPath());
         }
       }
@@ -310,18 +333,17 @@ public class DeleteTableProcedure
       // Archive regions from FS (temp directory)
       if (archive) {
         for (RegionInfo hri : regions) {
-          LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS");
+          LOG.debug("Archiving region {} from FS", hri.getRegionNameAsString());
           HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir,
             HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
         }
-        LOG.debug("Table '" + tableName + "' archived!");
+        LOG.debug("Table '{}' archived!", tableName);
       }
 
       // Archive mob data
-      Path mobTableDir = FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME),
-          tableName);
-      Path regionDir =
-          new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
+      Path mobTableDir =
+        FSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
+      Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
       if (fs.exists(regionDir)) {
         HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
       }
@@ -340,15 +362,15 @@ public class DeleteTableProcedure
   }
 
   /**
-   * There may be items for this table still up in hbase:meta in the case where the
-   * info:regioninfo column was empty because of some write error. Remove ALL rows from hbase:meta
-   * that have to do with this table. See HBASE-12980.
-   * @throws IOException
+   * There may be items for this table still up in hbase:meta in the case where the info:regioninfo
+   * column was empty because of some write error. Remove ALL rows from hbase:meta that have to do
+   * with this table. See HBASE-12980.
   */
-  private static void cleanAnyRemainingRows(final MasterProcedureEnv env,
-      final TableName tableName) throws IOException {
+  private static void cleanAnyRemainingRows(final MasterProcedureEnv env, final TableName tableName)
+      throws IOException {
     Connection connection = env.getMasterServices().getConnection();
-    Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName);
+    Scan tableScan = MetaTableAccessor.getScanForTable(connection, tableName, QueryType.REGION)
+      .addFamily(HConstants.CATALOG_FAMILY).addFamily(HConstants.TABLE_FAMILY);
     try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
       List<Delete> deletes = new ArrayList<>();
       try (ResultScanner resScanner = metaTable.getScanner(tableScan)) {
@@ -357,14 +379,14 @@ public class DeleteTableProcedure
         }
       }
       if (!deletes.isEmpty()) {
-        LOG.warn("Deleting some vestigial " + deletes.size() + " rows of " + tableName +
-          " from " + TableName.META_TABLE_NAME);
+        LOG.warn("Deleting some vestigial {} rows of {} from {}", deletes.size(), tableName,
+          TableName.META_TABLE_NAME);
         metaTable.delete(deletes);
       }
     }
   }
 
-  protected static void deleteFromMeta(final MasterProcedureEnv env,
+  static void deleteFromMeta(final MasterProcedureEnv env,
       final TableName tableName, List<RegionInfo> regions) throws IOException {
     MetaTableAccessor.deleteRegions(env.getMasterServices().getConnection(), regions);
@@ -381,27 +403,75 @@ public class DeleteTableProcedure
     }
   }
 
-  protected static void deleteAssignmentState(final MasterProcedureEnv env,
+  static void deleteAssignmentState(final MasterProcedureEnv env,
       final TableName tableName) throws IOException {
     // Clean up regions of the table in RegionStates.
-    LOG.debug("Removing '" + tableName + "' from region states.");
+    LOG.debug("Removing '{}' from region states.", tableName);
     env.getMasterServices().getAssignmentManager().deleteTable(tableName);
 
     // If entry for this table states, remove it.
-    LOG.debug("Marking '" + tableName + "' as deleted.");
+    LOG.debug("Marking '{}' as deleted.", tableName);
     env.getMasterServices().getTableStateManager().setDeletedTable(tableName);
   }
 
-  protected static void deleteTableDescriptorCache(final MasterProcedureEnv env,
+  private static void deleteTableDescriptorCache(final MasterProcedureEnv env,
       final TableName tableName) throws IOException {
-    LOG.debug("Removing '" + tableName + "' descriptor.");
+    LOG.debug("Removing '{}' descriptor.", tableName);
     env.getMasterServices().getTableDescriptors().remove(tableName);
   }
 
-  protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName)
+  static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName)
       throws IOException {
     if (!tableName.isSystemTable()) {
       ProcedureSyncWait.getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName);
     }
   }
+
+  private static void addToList(List<String> encodedRegionNames, String encodedRegionName,
+      List<String> peerIds, ReplicationQueueStorage queueStorage) throws ReplicationException {
+    encodedRegionNames.add(encodedRegionName);
+    if (encodedRegionNames.size() >= REMOVE_LAST_SEQ_ID_BATCH_SIZE) {
+      for (String peerId : peerIds) {
+        queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+      }
+      encodedRegionNames.clear();
+    }
+  }
+
+  static void deleteReplicationBarriersAndLastPushedSeqIds(MasterProcedureEnv env,
+      TableName tableName, List<RegionInfo> regions) throws IOException, ReplicationException {
+    Connection conn = env.getMasterServices().getConnection();
+    MetaTableAccessor.deleteReplicationBarriers(conn, regions);
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    List<String> peerIds = env.getReplicationPeerManager().getSerialPeerIdsBelongsTo(tableName);
+    List<String> encodedRegionNames = new ArrayList<>();
+    for (RegionInfo region : regions) {
+      addToList(encodedRegionNames, region.getEncodedName(), peerIds, queueStorage);
+    }
+    // There could be some records left since we may still have barriers for split/merged regions
+    Scan scan = MetaTableAccessor.getScanForTable(conn, tableName, QueryType.REPLICATION)
+      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).setFilter(new FirstKeyOnlyFilter());
+    long ts = EnvironmentEdgeManager.currentTime();
+    try (Table table = MetaTableAccessor.getMetaHTable(conn);
+        ResultScanner scanner = table.getScanner(scan)) {
+      List<Delete> deletes = new ArrayList<>();
+      for (;;) {
+        Result result = scanner.next();
+        if (result == null) {
+          break;
+        }
+        byte[] row = result.getRow();
+        deletes.add(new Delete(row).addFamily(HConstants.REPLICATION_BARRIER_FAMILY, ts));
+        addToList(encodedRegionNames, RegionInfo.encodeRegionName(row), peerIds, queueStorage);
+      }
+      if (!deletes.isEmpty()) {
+        table.delete(deletes);
+      }
+    }
+    if (!encodedRegionNames.isEmpty()) {
+      for (String peerId : peerIds) {
+        queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+      }
+    }
+  }
 }
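Reviewer sketch: deleteReplicationBarriersAndLastPushedSeqIds() pushes sequence-id deletions
through addToList() in batches of REMOVE_LAST_SEQ_ID_BATCH_SIZE (1000) and drains whatever is left
at the end, so a table with tens of thousands of regions never builds one oversized ZooKeeper
multi. The same accumulate/flush/drain pattern in isolation, with all names made up for the
example:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    public class BatchFlushExample {
      private static final int BATCH_SIZE = 1000;

      // Collects items and hands them to `flush` in chunks of BATCH_SIZE; the
      // caller drains the remainder, mirroring addToList(...) plus the trailing
      // "if (!encodedRegionNames.isEmpty())" block in the patch.
      static void addToBatch(List<String> batch, String item, Consumer<List<String>> flush) {
        batch.add(item);
        if (batch.size() >= BATCH_SIZE) {
          flush.accept(batch);
          batch.clear();
        }
      }

      public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
          addToBatch(batch, "region-" + i, b -> System.out.println("flush " + b.size()));
        }
        if (!batch.isEmpty()) {   // drain the tail batch
          System.out.println("final flush " + batch.size());
        }
      }
    }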
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 1f1ba3c..3588458 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -94,44 +96,51 @@ public class ModifyTableProcedure
 
     try {
       switch (state) {
-      case MODIFY_TABLE_PREPARE:
-        prepareModify(env);
-        setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION);
-        break;
-      case MODIFY_TABLE_PRE_OPERATION:
-        preModify(env, state);
-        setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
-        break;
-      case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
-        updateTableDescriptor(env);
-        setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN);
-        break;
-      case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
-        updateReplicaColumnsIfNeeded(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
-        if (deleteColumnFamilyInModify) {
-          setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
-        } else {
+        case MODIFY_TABLE_PREPARE:
+          prepareModify(env);
+          setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION);
+          break;
+        case MODIFY_TABLE_PRE_OPERATION:
+          preModify(env, state);
+          setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
+          break;
+        case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
+          updateTableDescriptor(env);
+          setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN);
+          break;
+        case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
+          updateReplicaColumnsIfNeeded(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
+          if (deleteColumnFamilyInModify) {
+            setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
+          } else {
+            setNextState(ModifyTableState.MODIFY_TABLE_CLEANUP_REPLICATION_STUFFS);
+          }
+          break;
+        case MODIFY_TABLE_DELETE_FS_LAYOUT:
+          deleteFromFs(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
+          setNextState(ModifyTableState.MODIFY_TABLE_CLEANUP_REPLICATION_STUFFS);
+          break;
+        case MODIFY_TABLE_CLEANUP_REPLICATION_STUFFS:
+          if (unmodifiedTableDescriptor.hasGlobalReplicationScope() &&
+            !modifiedTableDescriptor.hasGlobalReplicationScope()) {
+            DeleteTableProcedure.deleteReplicationBarriersAndLastPushedSeqIds(env, getTableName(), getRegionInfoList(env));
+          }
           setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
-        }
-        break;
-      case MODIFY_TABLE_DELETE_FS_LAYOUT:
-        deleteFromFs(env, unmodifiedTableDescriptor, modifiedTableDescriptor);
-        setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
-        break;
-      case MODIFY_TABLE_POST_OPERATION:
-        postModify(env, state);
-        setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
-        break;
-      case MODIFY_TABLE_REOPEN_ALL_REGIONS:
-        if (env.getAssignmentManager().isTableEnabled(getTableName())) {
-          addChildProcedure(env.getAssignmentManager()
-            .createReopenProcedures(getRegionInfoList(env)));
-        }
-        return Flow.NO_MORE_STATE;
-      default:
-        throw new UnsupportedOperationException("unhandled state=" + state);
+          break;
+        case MODIFY_TABLE_POST_OPERATION:
+          postModify(env, state);
+          setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
+          break;
+        case MODIFY_TABLE_REOPEN_ALL_REGIONS:
+          if (env.getAssignmentManager().isTableEnabled(getTableName())) {
+            addChildProcedure(
+              env.getAssignmentManager().createReopenProcedures(getRegionInfoList(env)));
+          }
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
       }
-    } catch (IOException e) {
+    } catch (IOException | ReplicationException e) {
       if (isRollbackSupported(state)) {
         setFailure("master-modify-table", e);
       } else {
@@ -174,7 +183,7 @@ public class ModifyTableProcedure
 
   @Override
   protected ModifyTableState getState(final int stateId) {
-    return ModifyTableState.valueOf(stateId);
+    return ModifyTableState.forNumber(stateId);
   }
 
   @Override
@@ -347,7 +356,7 @@ public class ModifyTableProcedure
     if (newReplicaCount < oldReplicaCount) {
       Set<byte[]> tableRows = new HashSet<>();
       Connection connection = env.getMasterServices().getConnection();
-      Scan scan = MetaTableAccessor.getScanForTableName(connection, getTableName());
+      Scan scan = MetaTableAccessor.getScanForTable(connection, getTableName(), QueryType.REGION);
       scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
 
       try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
@@ -355,11 +364,8 @@ public class ModifyTableProcedure
         for (Result result : resScanner) {
           tableRows.add(result.getRow());
         }
-        MetaTableAccessor.removeRegionReplicasFromMeta(
-          tableRows,
-          newReplicaCount,
-          oldReplicaCount - newReplicaCount,
-          connection);
+        MetaTableAccessor.removeRegionReplicasFromMeta(tableRows, newReplicaCount,
+          oldReplicaCount - newReplicaCount, connection);
       }
     }
     if (newReplicaCount > oldReplicaCount) {
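Reviewer sketch: the new MODIFY_TABLE_CLEANUP_REPLICATION_STUFFS state only does work when the
modification drops the last globally-scoped family, i.e. when hasGlobalReplicationScope() flips
from true to false between the unmodified and the modified descriptor. A standalone illustration
of that check, using made-up table and family names:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScopeCheckExample {
      public static void main(String[] args) {
        TableDescriptor before = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
          .build();
        // The modified descriptor falls back to the default (local) scope, so
        // the cleanup state would fire for this modification.
        TableDescriptor after = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
          .build();
        System.out.println(before.hasGlobalReplicationScope()
          && !after.hasGlobalReplicationScope()); // prints true
      }
    }

This mirrors the condition guarding the DeleteTableProcedure.deleteReplicationBarriersAndLastPushedSeqIds()
call above, and is the case exercised by testRemoveGlobalScope() in the new test class below.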
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index 4b2c21f..489b853 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -47,6 +48,7 @@ public class TruncateTableProcedure
   private static final Logger LOG = LoggerFactory.getLogger(TruncateTableProcedure.class);
 
   private boolean preserveSplits;
+  private List<RegionInfo> oldRegions;
   private List<RegionInfo> regions;
   private TableDescriptor tableDescriptor;
   private TableName tableName;
@@ -102,6 +104,13 @@ public class TruncateTableProcedure
             .get(tableName);
           DeleteTableProcedure.deleteFromMeta(env, getTableName(), regions);
           DeleteTableProcedure.deleteAssignmentState(env, getTableName());
+          setNextState(TruncateTableState.TRUNCATE_TABLE_CLEANUP_REPLICATION_STUFFS);
+          break;
+        case TRUNCATE_TABLE_CLEANUP_REPLICATION_STUFFS:
+          if (tableDescriptor.hasGlobalReplicationScope()) {
+            DeleteTableProcedure.deleteReplicationBarriersAndLastPushedSeqIds(env, getTableName(),
+              regions);
+          }
           setNextState(TruncateTableState.TRUNCATE_TABLE_CLEAR_FS_LAYOUT);
           break;
         case TRUNCATE_TABLE_CLEAR_FS_LAYOUT:
@@ -144,7 +153,7 @@ public class TruncateTableProcedure
         default:
          throw new UnsupportedOperationException("unhandled state=" + state);
       }
-    } catch (IOException e) {
+    } catch (IOException | ReplicationException e) {
       if (isRollbackSupported(state)) {
         setFailure("master-truncate-table", e);
       } else {
@@ -184,7 +193,7 @@ public class TruncateTableProcedure
 
   @Override
   protected TruncateTableState getState(final int stateId) {
-    return TruncateTableState.valueOf(stateId);
+    return TruncateTableState.forNumber(stateId);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 3b807aa..ad08d27 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
@@ -166,7 +167,8 @@ public class TestEnableTable {
     // content from a few of the rows.
     try (Table metaTable = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
       try (ResultScanner scanner = metaTable.getScanner(
-        MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+        MetaTableAccessor.getScanForTable(TEST_UTIL.getConnection(), tableName,
+          QueryType.REGION))) {
         for (Result result : scanner) {
           // Just delete one row.
           Delete d = new Delete(result.getRow());
@@ -186,8 +188,8 @@ public class TestEnableTable {
         fail("Got an exception while deleting " + tableName);
       }
       int rowCount = 0;
-      try (ResultScanner scanner =
-        metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+      try (ResultScanner scanner = metaTable.getScanner(MetaTableAccessor
+        .getScanForTable(TEST_UTIL.getConnection(), tableName, QueryType.REGION))) {
         for (Result result : scanner) {
           LOG.info("Found when none expected: " + result);
           rowCount++;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java
index 324a69f..c07533d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java
@@ -19,12 +19,9 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,11 +48,7 @@ public class TestSerialReplicationFailover extends SerialReplicationTestBase {
 
   @Test
   public void testKillRS() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationModifyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationModifyTable.java
new file mode 100644
index 0000000..9db079a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationModifyTable.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.function.Consumer;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationModifyTable extends SerialReplicationTestBase {
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+    addPeer(true);
+  }
+
+  private void testModifyTable(Consumer<TableName> modifyTable) throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+    String encodedRegionName =
+      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
+    modifyTable.accept(tableName);
+    // confirm that we delete the last pushed sequence id
+    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+  }
+
+  @Test
+  public void testDeleteTable() throws Exception {
+    testModifyTable(tn -> {
+      try {
+        UTIL.getAdmin().disableTable(tn);
+        UTIL.getAdmin().deleteTable(tn);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testTruncateTable() throws Exception {
+    testModifyTable(tn -> {
+      try {
+        UTIL.getAdmin().disableTable(tn);
+        UTIL.getAdmin().truncateTable(tn, false);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testTruncateTablePreserveSplits() throws Exception {
+    testModifyTable(tn -> {
+      try {
+        UTIL.getAdmin().disableTable(tn);
+        UTIL.getAdmin().truncateTable(tn, true);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testRemoveGlobalScope() throws Exception {
+    testModifyTable(tn -> {
+      try {
+        UTIL.getAdmin().modifyColumnFamily(tn, ColumnFamilyDescriptorBuilder.of(CF));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+  }
+}
-- 
2.7.4