From 78b6050c7c8d6f111c52d9a722f867d6b159651f Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Wed, 18 Apr 2018 16:50:38 +0800 Subject: [PATCH] add a replication barrier cleaner --- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 72 ++++++++++++++++++---- .../regionserver/TestSerialReplicationChecker.java | 22 +++++++ .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 20 ++++-- 3 files changed, 98 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index fcbb032..1aa144a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -261,11 +261,13 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixHFileLinks = false; // fix lingering HFileLinks private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixReplication = false; // fix undeleted replication queues for removed peer + private boolean cleanReplicationBarrier = false; // clean replication barriers of a table private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all // hbase:meta are always checked private Set tablesIncluded = new HashSet<>(); + private TableName cleanReplicationBarrierTable; private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge // maximum number of overlapping regions to sideline private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE; @@ -779,6 +781,8 @@ public class HBaseFsck extends Configured implements Closeable { checkAndFixReplication(); + cleanReplicationBarrier(); + // Remove the hbck znode cleanupHbckZnode(); @@ -4117,7 +4121,7 @@ public class HBaseFsck extends Configured implements Closeable { HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, - ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS + ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS, INVALID_TABLE } void clear(); void report(String message); @@ -4549,6 +4553,10 @@ public class HBaseFsck extends Configured implements Closeable { fixAny |= shouldFix; } + public void setCleanReplicationBarrier(boolean shouldClean) { + cleanReplicationBarrier = shouldClean; + } + /** * Check if we should rerun fsck again. This checks if we've tried to * fix something and we should rerun fsck tool again. @@ -4559,7 +4567,7 @@ public class HBaseFsck extends Configured implements Closeable { rerun = true; } - boolean shouldRerun() { + public boolean shouldRerun() { return rerun; } @@ -4827,6 +4835,8 @@ public class HBaseFsck extends Configured implements Closeable { out.println(""); out.println(" Replication options"); out.println(" -fixReplication Deletes replication queues for removed peers"); + out.println(" -cleanReplicationBrarier [tableName] clean the replication barriers " + + "of a specified table, tableName is required"); out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4887,13 +4897,12 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - long timelag = Long.parseLong(args[i+1]); + long timelag = Long.parseLong(args[++i]); setTimeLag(timelag); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sleepBeforeRerun")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4901,19 +4910,17 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - sleepBeforeRerun = Long.parseLong(args[i+1]); + sleepBeforeRerun = Long.parseLong(args[++i]); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-sidelineDir")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value."); return printUsageAndExit(); } - i++; - setSidelineDir(args[i]); + setSidelineDir(args[++i]); } else if (cmd.equals("-fix")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "This option is deprecated, please use -fixAssignments instead."); @@ -4983,14 +4990,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxOverlapsToSideline = Integer.parseInt(args[i+1]); + int maxOverlapsToSideline = Integer.parseInt(args[++i]); setMaxOverlapsToSideline(maxOverlapsToSideline); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxOverlapsToSideline needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-maxMerge")) { if (i == args.length - 1) { errors.reportError(ERROR_CODE.WRONG_USAGE, @@ -4998,14 +5004,13 @@ public class HBaseFsck extends Configured implements Closeable { return printUsageAndExit(); } try { - int maxMerge = Integer.parseInt(args[i+1]); + int maxMerge = Integer.parseInt(args[++i]); setMaxMerge(maxMerge); } catch (NumberFormatException e) { errors.reportError(ERROR_CODE.WRONG_USAGE, "-maxMerge needs a numeric value argument."); return printUsageAndExit(); } - i++; } else if (cmd.equals("-summary")) { setSummary(); } else if (cmd.equals("-metaonly")) { @@ -5014,6 +5019,12 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixReplication")) { setFixReplication(true); + } else if (cmd.equals("-cleanReplicationBarrier")) { + setCleanReplicationBarrier(true); + if(args[++i].startsWith("-")){ + printUsageAndExit(); + } + setCleanReplicationBarrierTable(args[i]); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); @@ -5089,6 +5100,43 @@ public class HBaseFsck extends Configured implements Closeable { return this; } + + public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) { + this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable); + } + + + public void cleanReplicationBarrier() throws IOException { + if(!cleanReplicationBarrier){ + return; + } + if (cleanReplicationBarrierTable.isSystemTable() + || !admin.tableExists(cleanReplicationBarrierTable)) { + errors.reportError(ERROR_CODE.INVALID_TABLE, + "invalid table: " + cleanReplicationBarrierTable); + return; + } + LOG.info("tjy: here"); + boolean noBarriers = true; + List> regions = + MetaTableAccessor.getTableRegionsAndLocations(connection, cleanReplicationBarrierTable); + for (Pair region : regions) { + long[] barriers = + MetaTableAccessor.getReplicationBarrier(connection, region.getFirst().getRegionName()); + if (barriers.length == 0) { + continue; + } + if(noBarriers){ + noBarriers = false; + } + meta.delete(new Delete(region.getFirst().getRegionName()) + .addFamily(HConstants.REPLICATION_BARRIER_FAMILY)); + } + if(!noBarriers){ + setShouldRerun(); + } + } + /** * ls -r for debugging purposes */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java index 29749bd..4de146e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; @@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; @@ -300,4 +302,24 @@ public class TestSerialReplicationChecker { updatePushedSeqId(region, 99); assertTrue(checker.canPush(createEntry(region, 100), cell)); } + + @Test + public void testCanPushAfterCleanReplicationBarrier() throws Exception { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + Put put1 = + MetaTableAccessor.makePutFromRegionInfo(region, EnvironmentEdgeManager.currentTime()); + Put put2 = MetaTableAccessor.makePutFromTableState( + new TableState(tableName, TableState.State.ENABLED), EnvironmentEdgeManager.currentTime()); + MetaTableAccessor.putsToMetaTable(conn, Arrays.asList(put1, put2)); + addStateAndBarrier(region, RegionState.State.OPEN, 10, 100); + Cell cell = createCell(region); + assertTrue("push seq 10", checker.canPush(createEntry(region, 10), cell)); + assertFalse("push seq 100", checker.canPush(createEntry(region, 100), cell)); + boolean result = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertTrue(result); + assertTrue("push seq 10", checker.canPush(createEntry(region, 10), cell)); + assertTrue("push seq 100", checker.canPush(createEntry(region, 100), cell)); + result = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName); + assertFalse(result); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index 99e4f08..1808b5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,14 +41,14 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, - boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks, - boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, - TableName table) throws Exception { + boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, + boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, + boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { HBaseFsck.setDisplayFullReport(); // i.e. -details @@ -63,6 +64,7 @@ public class HbckTestingUtil { fsck.setFixHFileLinks(fixHFileLinks); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixReplication(fixReplication); + fsck.setCleanReplicationBarrier(cleanReplicationBarrier); if (table != null) { fsck.includeTable(table); } @@ -88,6 +90,16 @@ public class HbckTestingUtil { return hbck; } + public static boolean cleanReplicationBarrier(Configuration conf, TableName table) + throws IOException, ClassNotFoundException { + HBaseFsck hbck = new HBaseFsck(conf, null); + hbck.setCleanReplicationBarrierTable(table.getNameAsString()); + hbck.setCleanReplicationBarrier(true); + hbck.connect(); + hbck.cleanReplicationBarrier(); + return hbck.shouldRerun(); + } + public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception { List errs = fsck.getErrors().getErrorList(); return (errs != null && !errs.isEmpty()); -- 2.7.4