From 01d1b212bfbf57db92f0edf1adff0b26a6f172b2 Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Wed, 18 Apr 2018 21:21:08 +0800 Subject: [PATCH] add a barrier cleaner --- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 94 +++++++++++++-- .../TestHBaseFsckCleanReplicationBarriers.java | 127 +++++++++++++++++++++ .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 24 +++- 3 files changed, 229 insertions(+), 16 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index fcbb032..4c5ca08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -115,6 +116,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; @@ -261,11 +266,13 @@ public 
class HBaseFsck extends Configured implements Closeable { private boolean fixHFileLinks = false; // fix lingering HFileLinks private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixReplication = false; // fix undeleted replication queues for removed peer + private boolean cleanReplicationBarrier = false; // clean replication barriers of a table private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all // hbase:meta are always checked private Set tablesIncluded = new HashSet<>(); + private TableName cleanReplicationBarrierTable; private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge // maximum number of overlapping regions to sideline private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE; @@ -779,6 +786,8 @@ public class HBaseFsck extends Configured implements Closeable { checkAndFixReplication(); + cleanReplicationBarrier(); + // Remove the hbck znode cleanupHbckZnode(); @@ -4117,7 +4126,7 @@ public class HBaseFsck extends Configured implements Closeable { HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, - ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS + ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS, INVALID_TABLE } void clear(); void report(String message); @@ -4549,6 +4558,10 @@ public class HBaseFsck extends Configured implements Closeable { fixAny |= shouldFix; } + public void setCleanReplicationBarrier(boolean shouldClean) { + cleanReplicationBarrier = shouldClean; + } + /** * Check if we should rerun fsck again. This checks if we've tried to * fix something and we should rerun fsck tool again. 
@@ -4559,7 +4572,7 @@ public class HBaseFsck extends Configured implements Closeable { rerun = true; } - boolean shouldRerun() { + public boolean shouldRerun() { return rerun; } @@ -4827,6 +4840,8 @@ public class HBaseFsck extends Configured implements Closeable { out.println(""); out.println(" Replication options"); out.println(" -fixReplication Deletes replication queues for removed peers"); + out.println(" -cleanReplicationBarrier [tableName] clean the replication barriers " + + "of a specified table, tableName is required"); out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4887,13 +4902,12 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - long timelag = Long.parseLong(args[i+1]); + long timelag = Long.parseLong(args[++i]); setTimeLag(timelag); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sleepBeforeRerun")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, return printUsageAndExit(); } try { - sleepBeforeRerun = Long.parseLong(args[i+1]); + sleepBeforeRerun = Long.parseLong(args[++i]); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sidelineDir")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value."); return printUsageAndExit(); } - i++; - setSidelineDir(args[i]); + setSidelineDir(args[++i]); } else if (cmd.equals("-fix")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "This option is deprecated, please use -fixAssignments instead."); @@ -4983,14 +4995,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit();
} try { - int maxOverlapsToSideline = Integer.parseInt(args[i+1]); + int maxOverlapsToSideline = Integer.parseInt(args[++i]); setMaxOverlapsToSideline(maxOverlapsToSideline); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxOverlapsToSideline needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-maxMerge")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4998,14 +5009,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxMerge = Integer.parseInt(args[i+1]); + int maxMerge = Integer.parseInt(args[++i]); setMaxMerge(maxMerge); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxMerge needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-summary")) { setSummary(); } else if (cmd.equals("-metaonly")) { @@ -5014,6 +5024,12 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixReplication")) { setFixReplication(true); + } else if (cmd.equals("-cleanReplicationBarrier")) { + setCleanReplicationBarrier(true); + if(args[++i].startsWith("-")){ + printUsageAndExit(); + } + setCleanReplicationBarrierTable(args[i]); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); @@ -5089,6 +5105,60 @@ public class HBaseFsck extends Configured implements Closeable { return this; } + public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) { + this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable); + } + + public void cleanReplicationBarrier() throws IOException { + if (!cleanReplicationBarrier) { + return; + } + if (cleanReplicationBarrierTable.isSystemTable() + || !admin.tableExists(cleanReplicationBarrierTable) + || 
admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope()) { + errors.reportError(ERROR_CODE.INVALID_TABLE, + "invalid table: " + cleanReplicationBarrierTable); + return; + } + boolean cleaned = false; + + List> regions = + MetaTableAccessor.getTableRegionsAndLocations(connection, cleanReplicationBarrierTable); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); + List peerDescriptions = admin.listReplicationPeers(); + if (peerDescriptions != null && peerDescriptions.size() > 0) { + List peers = peerDescriptions.stream() + .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(), + cleanReplicationBarrierTable)) + .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList()); + List regionNames = regions.stream().map(region -> region.getFirst().getEncodedName()) + .collect(Collectors.toList()); + try { + for (String peer : peers) { + queueStorage.removeLastSequenceIds(peer, regionNames); + } + } catch (ReplicationException re) { + throw new IOException(re); + } + } + for (Pair region : regions) { + long[] barriers = + MetaTableAccessor.getReplicationBarrier(connection, region.getFirst().getRegionName()); + if (barriers.length == 0) { + continue; + } + if (!cleaned) { + cleaned = true; + } + meta.delete(new Delete(region.getFirst().getRegionName()) + .addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); + } + if (cleaned) { + setShouldRerun(); + } + } + /** * ls -r for debugging purposes */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java new file mode 100644 index 0000000..435723a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import 
org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestHBaseFsckCleanReplicationBarriers { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + public static final byte[][] KEYS = { Bytes.toBytes("aaa") }; + + private static String PEER_ID = "1"; + + private static ReplicationQueueStorage QUEUE_STORAGE; + + private static String WAL_FILE_NAME = "test.wal"; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + ReplicationPeerStorage peerStorage = ReplicationStorageFactory + .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), + UTIL.getConfiguration()); + createPeer(); + QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID, + WAL_FILE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testCanPushAfterCleanReplicationBarrier() throws Exception { + TableName tableName = TableName.valueOf("test"); + String cf = "info"; + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()) + .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build(); + UTIL.createTable(tableDescriptor, KEYS); + assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0); + RegionInfo region = 
UTIL.getAdmin().getRegions(tableName).get(0); + addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); + updatePushedSeqId(region, 10); + assertEquals("check if there is lastPushedId", 10, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_ID)); + boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertTrue(cleaned); + assertEquals("check if there is lastPushedId", -1, + QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_ID)); + cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertFalse(cleaned); + } + + public static void createPeer() throws IOException { + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL.getClusterKey()).setSerial(true).build(); + UTIL.getAdmin().addReplicationPeer(PEER_ID, rpc); + } + + private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) + throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + if (state != null) { + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + } + for (int i = 0; i < barriers.length; i++) { + put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, + put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i])); + } + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { + QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), + PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index 99e4f08..bcdec0b 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -28,9 +29,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; +import org.junit.experimental.categories.Category; +@Category({ ReplicationTests.class, MediumTests.class }) public class HbckTestingUtil { private static ExecutorService exec = new ScheduledThreadPoolExecutor(10); public static HBaseFsck doFsck( @@ -40,14 +45,14 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, - boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks, - boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, - TableName table) throws Exception { + boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, + boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean 
fixTableLocks, + boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { HBaseFsck.setDisplayFullReport(); // i.e. -details @@ -63,6 +68,7 @@ public class HbckTestingUtil { fsck.setFixHFileLinks(fixHFileLinks); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixReplication(fixReplication); + fsck.setCleanReplicationBarrier(cleanReplicationBarrier); if (table != null) { fsck.includeTable(table); } @@ -88,6 +94,16 @@ public class HbckTestingUtil { return hbck; } + public static boolean cleanReplicationBarrier(Configuration conf, TableName table) + throws IOException, ClassNotFoundException { + HBaseFsck hbck = new HBaseFsck(conf, null); + hbck.setCleanReplicationBarrierTable(table.getNameAsString()); + hbck.setCleanReplicationBarrier(true); + hbck.connect(); + hbck.cleanReplicationBarrier(); + return hbck.shouldRerun(); + } + public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception { List errs = fsck.getErrors().getErrorList(); return (errs != null && !errs.isEmpty()); -- 2.7.4