From 22b95cb995150739328661befc215daf8ef9fea2 Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Tue, 17 Apr 2018 14:34:32 +0800 Subject: [PATCH] add a replication barrier cleaner --- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 52 +++++++++++++++++----- .../regionserver/TestSerialReplicationChecker.java | 15 +++++++ .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 13 +++++- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index fcbb032..cb8207b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -261,6 +261,7 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixHFileLinks = false; // fix lingering HFileLinks private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixReplication = false; // fix undeleted replication queues for removed peer + private boolean cleanReplicationBarrier = false; // clean replication barriers of a specified table private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all @@ -4549,6 +4550,10 @@ public class HBaseFsck extends Configured implements Closeable { fixAny |= shouldFix; } + public void setCleanReplicationBarrier(boolean shouldClean) { + cleanReplicationBarrier = shouldClean; + } + /** * Check if we should rerun fsck again. This checks if we've tried to * fix something and we should rerun fsck tool again. @@ -4827,6 +4832,8 @@ public class HBaseFsck extends Configured implements Closeable { out.println(""); out.println(" Replication options"); out.println(" -fixReplication Deletes replication queues for removed peers"); + out.println( + " -cleanReplicationBrarier [tableName] clean the replication barriers of specified table, tableName is required"); out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4871,6 +4878,7 @@ public class HBaseFsck extends Configured implements Closeable { boolean checkCorruptHFiles = false; boolean sidelineCorruptHFiles = false; + String replicationTable = null; // Process command-line args. for (int i = 0; i < args.length; i++) { @@ -4887,13 +4895,12 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - long timelag = Long.parseLong(args[i+1]); + long timelag = Long.parseLong(args[++i]); setTimeLag(timelag); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sleepBeforeRerun")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4901,19 +4908,17 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - sleepBeforeRerun = Long.parseLong(args[i+1]); + sleepBeforeRerun = Long.parseLong(args[++i]); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sidelineDir")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value."); return printUsageAndExit(); } - i++; - setSidelineDir(args[i]); + setSidelineDir(args[++i]); } else if (cmd.equals("-fix")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "This option is deprecated, please use -fixAssignments instead."); @@ -4983,14 +4988,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxOverlapsToSideline = Integer.parseInt(args[i+1]); + int maxOverlapsToSideline = Integer.parseInt(args[++i]); setMaxOverlapsToSideline(maxOverlapsToSideline); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxOverlapsToSideline needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-maxMerge")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4998,14 +5002,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxMerge = Integer.parseInt(args[i+1]); + int maxMerge = Integer.parseInt(args[++i]); setMaxMerge(maxMerge); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxMerge needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-summary")) { setSummary(); } else if (cmd.equals("-metaonly")) { @@ -5014,6 +5017,12 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixReplication")) { setFixReplication(true); + } else if (cmd.equals("-cleanReplicationBarrier")) { + setCleanReplicationBarrier(true); + replicationTable = args[++i]; + if(replicationTable.startsWith("-")){ + printUsageAndExit(); + } } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); @@ -5060,6 +5069,10 @@ public class HBaseFsck extends Configured implements Closeable { // check and fix table integrity, region consistency. int code = onlineHbck(); setRetCode(code); + + if(cleanReplicationBarrier){ + cleanReplicationBarrier(replicationTable); + } // If we have changed the HBase state it is better to run hbck again // to see if we haven't broken something else in the process. // We run it only once more because otherwise we can easily fall into @@ -5089,6 +5102,25 @@ public class HBaseFsck extends Configured implements Closeable { return this; } + public void cleanReplicationBarrier(String replicationTable) throws IOException { + TableName tableName = TableName.valueOf(replicationTable); + if (tableName.isSystemTable()) { + LOG.error("There is no replication barrier for system table"); + printUsageAndExit(); + } + List> regions = + MetaTableAccessor.getTableRegionsAndLocations(connection, tableName); + for (Pair region : regions) { + long[] barriers = + MetaTableAccessor.getReplicationBarrier(connection, region.getFirst().getRegionName()); + if (barriers.length == 0) { + continue; + } + meta.delete(new Delete(region.getFirst().getRegionName()) + .addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); + } + } + /** * ls -r for debugging purposes */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java index 29749bd..bfb46a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; @@ -300,4 +301,18 @@ public class TestSerialReplicationChecker { updatePushedSeqId(region, 99); assertTrue(checker.canPush(createEntry(region, 100), cell)); } + + @Test + public void testCanPushAfterCleanReplicationBarrier() throws Exception { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + Put put = MetaTableAccessor.makePutFromRegionInfo(region, EnvironmentEdgeManager.currentTime()); + MetaTableAccessor.putsToMetaTable(conn, Arrays.asList(put)); + addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); + Cell cell = createCell(region); + assertTrue("push seq 10", checker.canPush(createEntry(region, 10), cell)); + assertFalse("push seq 100", checker.canPush(createEntry(region, 100), cell)); + HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), region.getTable()); + assertTrue("push seq 10", checker.canPush(createEntry(region, 10), cell)); + assertTrue("push seq 100", checker.canPush(createEntry(region, 100), cell)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index 99e4f08..85a3647 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,13 +41,13 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks, - boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, + boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { @@ -63,6 +64,7 @@ public class HbckTestingUtil { fsck.setFixHFileLinks(fixHFileLinks); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixReplication(fixReplication); + fsck.setCleanReplicationBarrier(cleanReplicationBarrier); if (table != null) { fsck.includeTable(table); } @@ -88,6 +90,13 @@ public class HbckTestingUtil { return hbck; } + public static void cleanReplicationBarrier(Configuration conf, TableName table) + throws IOException, ClassNotFoundException { + HBaseFsck hbck = new HBaseFsck(conf, null); + hbck.connect(); + hbck.cleanReplicationBarrier(table.getNameAsString()); + } + public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception { List errs = fsck.getErrors().getErrorList(); return (errs != null && !errs.isEmpty()); -- 2.7.4