From 5650b67181b3802e4df21be015dcf189b66b30ff Mon Sep 17 00:00:00 2001
From: jingyuntian
Date: Thu, 26 Apr 2018 18:06:40 +0800
Subject: [PATCH] add a replication barrier cleaner

Add an hbck option that cleans up dangling replication barriers: for a table
that is not replicated, remove the replication barrier cells of all its
regions from hbase:meta and delete the corresponding last pushed sequence ids
from the replication queue storage.
---
 .../org/apache/hadoop/hbase/util/HBaseFsck.java    | 120 ++++++++++++++---
 .../TestHBaseFsckCleanReplicationBarriers.java     | 142 +++++++++++++++++++++
 .../hadoop/hbase/util/hbck/HbckTestingUtil.java    |  20 ++-
 3 files changed, 262 insertions(+), 20 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 9fcf320..adb47ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -99,7 +100,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -115,6 +118,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -268,11 +275,13 @@ public class HBaseFsck extends Configured implements Closeable {
   private boolean fixHFileLinks = false; // fix lingering HFileLinks
   private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
   private boolean fixReplication = false; // fix undeleted replication queues for removed peer
+  private boolean cleanReplicationBarrier = false; // clean replication barriers of a table
   private boolean fixAny = false; // Set to true if any of the fix is required.
 
   // limit checking/fixes to listed tables, if empty attempt to check/fix all
   // hbase:meta are always checked
   private Set<TableName> tablesIncluded = new HashSet<>();
+  private TableName cleanReplicationBarrierTable;
   private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge
   // maximum number of overlapping regions to sideline
   private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE;
@@ -786,6 +795,8 @@ public class HBaseFsck extends Configured implements Closeable {
 
     checkAndFixReplication();
 
+    cleanReplicationBarrier();
+
     // Remove the hbck znode
     cleanupHbckZnode();
 
@@ -4118,14 +4129,13 @@ enum ERROR_CODE {
     UNKNOWN, NO_META_REGION, NULL_META_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
     NOT_IN_META_OR_DEPLOYED, NOT_IN_HDFS_OR_DEPLOYED, NOT_IN_HDFS, SERVER_DOES_NOT_MATCH_META,
-    NOT_DEPLOYED,
-    MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
+    NOT_DEPLOYED, MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
     FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
     HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
     ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
     LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR,
     ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS,
-    UNSUPPORTED_OPTION
+    UNSUPPORTED_OPTION, INVALID_TABLE
   }
 
   void clear();
   void report(String message);
@@ -4557,6 +4567,10 @@ public class HBaseFsck extends Configured implements Closeable {
     fixAny |= shouldFix;
   }
 
+  public void setCleanReplicationBarrier(boolean shouldClean) {
+    cleanReplicationBarrier = shouldClean;
+  }
+
   /**
    * Check if we should rerun fsck again. This checks if we've tried to
    * fix something and we should rerun fsck tool again.
@@ -4567,7 +4581,7 @@ public class HBaseFsck extends Configured implements Closeable {
     rerun = true;
   }
 
-  boolean shouldRerun() {
+  public boolean shouldRerun() {
     return rerun;
   }
 
@@ -4848,7 +4862,11 @@ public class HBaseFsck extends Configured implements Closeable {
       "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles" +
       "-fixHFileLinks");
     out.println("   -repairHoles      Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
-
+    out.println("");
+    out.println("  Replication options");
+    out.println("   -fixReplication   Deletes replication queues for removed peers");
+    out.println("   -cleanReplicationBarrier [tableName] clean the replication barriers " +
+        "of a specified table, tableName is required");
     out.flush();
 
     errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
@@ -4908,13 +4926,12 @@ public class HBaseFsck extends Configured implements Closeable {
         return printUsageAndExit();
       }
       try {
-        long timelag = Long.parseLong(args[i+1]);
+        long timelag = Long.parseLong(args[++i]);
         setTimeLag(timelag);
       } catch (NumberFormatException e) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value.");
         return printUsageAndExit();
       }
-      i++;
     } else if (cmd.equals("-sleepBeforeRerun")) {
       if (i == args.length - 1) {
         errors.reportError(ERROR_CODE.WRONG_USAGE,
           "HBaseFsck: -sleepBeforeRerun needs a value.");
         return printUsageAndExit();
       }
       try {
-        sleepBeforeRerun = Long.parseLong(args[i+1]);
+        sleepBeforeRerun = Long.parseLong(args[++i]);
       } catch (NumberFormatException e) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value.");
         return printUsageAndExit();
       }
-      i++;
     } else if (cmd.equals("-sidelineDir")) {
       if (i == args.length - 1) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value.");
         return printUsageAndExit();
       }
-      i++;
-      setSidelineDir(args[i]);
+      setSidelineDir(args[++i]);
     } else if (cmd.equals("-fix")) {
       errors.reportError(ERROR_CODE.WRONG_USAGE,
         "This option is deprecated, please use -fixAssignments instead.");
@@ -5004,14 +5019,13 @@ public class HBaseFsck extends Configured implements Closeable {
         return printUsageAndExit();
       }
       try {
-        int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
+        int maxOverlapsToSideline = Integer.parseInt(args[++i]);
         setMaxOverlapsToSideline(maxOverlapsToSideline);
       } catch (NumberFormatException e) {
         errors.reportError(ERROR_CODE.WRONG_USAGE,
           "-maxOverlapsToSideline needs a numeric value argument.");
         return printUsageAndExit();
       }
-      i++;
     } else if (cmd.equals("-maxMerge")) {
       if (i == args.length - 1) {
         errors.reportError(ERROR_CODE.WRONG_USAGE,
           "-maxMerge needs a numeric value argument.");
         return printUsageAndExit();
       }
       try {
-        int maxMerge = Integer.parseInt(args[i+1]);
+        int maxMerge = Integer.parseInt(args[++i]);
         setMaxMerge(maxMerge);
       } catch (NumberFormatException e) {
         errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxMerge needs a numeric value argument.");
         return printUsageAndExit();
       }
-      i++;
     } else if (cmd.equals("-summary")) {
       setSummary();
     } else if (cmd.equals("-metaonly")) {
       setCheckMetaOnly();
     } else if (cmd.equals("-boundaries")) {
       setRegionBoundariesCheck();
     } else if (cmd.equals("-fixReplication")) {
       setFixReplication(true);
+    } else if (cmd.equals("-cleanReplicationBarrier")) {
+      setCleanReplicationBarrier(true);
+      if (i == args.length - 1 || args[++i].startsWith("-")) {
+        return printUsageAndExit();
+      }
+      setCleanReplicationBarrierTable(args[i]);
     } else if (cmd.startsWith("-")) {
       errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
       return printUsageAndExit();
@@ -5120,7 +5139,7 @@ public class HBaseFsck extends Configured implements Closeable {
     boolean result = true;
     String hbaseServerVersion = status.getHBaseVersion();
     Object[] versionComponents = VersionInfo.getVersionComponents(hbaseServerVersion);
-    if (versionComponents[0] instanceof Integer && ((Integer)versionComponents[0]) >= 2) {
+    if (versionComponents[0] instanceof Integer && ((Integer) versionComponents[0]) >= 2) {
       // Process command-line args.
       for (String arg : args) {
         if (unsupportedOptionsInV2.contains(arg)) {
@@ -5134,6 +5153,75 @@ public class HBaseFsck extends Configured implements Closeable {
     return result;
   }
 
+  public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) {
+    this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable);
+  }
+
+  public void cleanReplicationBarrier() throws IOException {
+    if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
+      return;
+    }
+    if (cleanReplicationBarrierTable.isSystemTable()
+        || admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope()) {
+      errors.reportError(ERROR_CODE.INVALID_TABLE,
+        "invalid table: " + cleanReplicationBarrierTable);
+      return;
+    }
+    boolean cleaned = false;
+
+    List<byte[]> regionNames = new ArrayList<>();
+    Scan barrierScan = new Scan();
+    barrierScan.setCaching(100);
+    barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    barrierScan
+        .withStartRow(MetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable,
+          MetaTableAccessor.QueryType.REGION))
+        .withStopRow(MetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable,
+          MetaTableAccessor.QueryType.REGION));
+    Result result;
+    try (ResultScanner scanner = meta.getScanner(barrierScan)) {
+      while ((result = scanner.next()) != null) {
+        regionNames.add(result.getRow());
+      }
+    }
+    if (regionNames.size() > 0) {
+      cleaned = true;
+    }
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
+    List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
+    if (peerDescriptions != null && peerDescriptions.size() > 0) {
+      List<String> peers = peerDescriptions.stream()
+          .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
+            cleanReplicationBarrierTable))
+          .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
+      try {
+        List<String> batch = new ArrayList<>();
+        for (String peer : peers) {
+          for (byte[] regionName : regionNames) {
+            batch.add(RegionInfo.encodeRegionName(regionName));
+            if (batch.size() % 100 == 0) {
+              queueStorage.removeLastSequenceIds(peer, batch);
+              batch.clear();
+            }
+          }
+          if (batch.size() > 0) {
+            queueStorage.removeLastSequenceIds(peer, batch);
+            batch.clear();
+          }
+        }
+      } catch (ReplicationException re) {
+        throw new IOException(re);
+      }
+    }
+    for (byte[] regionName : regionNames) {
+      meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+    }
+    if (cleaned) {
+      setShouldRerun();
+    }
+  }
+
   /**
    * ls -r for debugging purposes
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
new file mode 100644
index 0000000..dce827e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestHBaseFsckCleanReplicationBarriers {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String PEER_1 = "1", PEER_2 = "2";
+
+  private static ReplicationQueueStorage QUEUE_STORAGE;
+
+  private static String WAL_FILE_NAME = "test.wal";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    ReplicationPeerStorage peerStorage = ReplicationStorageFactory
+        .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+      UTIL.getConfiguration());
+    createPeer();
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1,
+      WAL_FILE_NAME);
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2,
+      WAL_FILE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCleanReplicationBarrier() throws Exception {
+    TableName tableName = TableName.valueOf("test");
+    String cf = "info";
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
+        .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
+    UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123));
+    assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100);
+      updatePushedSeqId(region, 10);
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", 10,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+    }
+    boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertTrue(cleaned);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+      assertEquals("check if there is lastPushedId", -1,
+        QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+    }
+    cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+    assertFalse(cleaned);
+    for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+      assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
+        region.getRegionName()).length);
+    }
+  }
+
+  public static void createPeer() throws IOException {
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
+    UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
+  }
+
+  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+      throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    if (state != null) {
+      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+        Bytes.toBytes(state.name()));
+    }
+    for (int i = 0; i < barriers.length; i++) {
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index 99e4f08..1808b5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,14 +41,14 @@ public class HbckTestingUtil {
   public static HBaseFsck doFsck(
       Configuration conf, boolean fix, TableName table) throws Exception {
-    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+    return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
   }
 
   public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
       boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
-      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
-      boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
-      TableName table) throws Exception {
+      boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+      boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+      boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
       HBaseFsck.setDisplayFullReport(); // i.e. -details
@@ -63,6 +64,7 @@ public class HbckTestingUtil {
       fsck.setFixHFileLinks(fixHFileLinks);
       fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
       fsck.setFixReplication(fixReplication);
+      fsck.setCleanReplicationBarrier(cleanReplicationBarrier);
       if (table != null) {
         fsck.includeTable(table);
       }
@@ -88,6 +90,16 @@ public class HbckTestingUtil {
     return hbck;
   }
 
+  public static boolean cleanReplicationBarrier(Configuration conf, TableName table)
+      throws IOException, ClassNotFoundException {
+    HBaseFsck hbck = new HBaseFsck(conf, null);
+    hbck.setCleanReplicationBarrierTable(table.getNameAsString());
+    hbck.setCleanReplicationBarrier(true);
+    hbck.connect();
+    hbck.cleanReplicationBarrier();
+    return hbck.shouldRerun();
+  }
+
   public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception {
     List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
     return (errs != null && !errs.isEmpty());
-- 
2.7.4
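
Usage note (reviewer sketch, not part of the patch): with this change applied,
the barriers of a table can be cleaned from the command line with
"hbase hbck -cleanReplicationBarrier tableName", or driven programmatically
through the API surface the patch adds, exactly as the
HbckTestingUtil#cleanReplicationBarrier helper above does. A minimal sketch,
assuming a running cluster whose hbase-site.xml is on the classpath; the table
name "my_table" is a placeholder:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.HBaseFsck;

  public class CleanReplicationBarrierExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // No executor service is needed for this code path (see HbckTestingUtil).
      HBaseFsck hbck = new HBaseFsck(conf, null);
      hbck.setCleanReplicationBarrierTable("my_table"); // placeholder table name
      hbck.setCleanReplicationBarrier(true);
      hbck.connect(); // sets up the admin, hbase:meta and ZooKeeper connections
      hbck.cleanReplicationBarrier(); // drops barrier cells and last pushed seq ids
      // shouldRerun() is set only when barrier rows were actually found and cleaned.
      System.out.println("cleaned: " + hbck.shouldRerun());
    }
  }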