From 7b766a88dc2c1182e73d7e020bfbcfbb80c5f034 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 23 Oct 2018 22:01:52 +0800 Subject: [PATCH] HBASE-21363 Rewrite the buildingHoldCleanupTracker method in WALProcedureStore --- .../store/ProcedureStoreTracker.java | 55 +++++++++---------- .../store/wal/ProcedureWALFormatReader.java | 5 +- .../store/wal/WALProcedureStore.java | 43 ++++++++------- .../procedure2/TestProcedureCleanup.java | 15 ++--- 4 files changed, 54 insertions(+), 64 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 64479b220b..0c8c4dd0d7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import java.util.function.BiFunction; import java.util.stream.LongStream; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.yetus.audience.InterfaceAudience; @@ -197,49 +198,45 @@ public class ProcedureStoreTracker { } } - /** - * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by - * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, - * then we mark it as deleted. - * @see #setDeletedIfModified(long...) 
- */ - public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, boolean globalTracker) { + private void setDeleteIf(ProcedureStoreTracker tracker, + BiFunction func) { BitSetNode trackerNode = null; for (BitSetNode node : map.values()) { - final long minProcId = node.getStart(); - final long maxProcId = node.getEnd(); + long minProcId = node.getStart(); + long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { if (!node.isModified(procId)) { continue; } trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId)) { - // the procId is not exist in the track, we can only delete the proc - // if globalTracker set to true. - // Only if the procedure is not in the global tracker we can delete the - // the procedure. In other cases, the procedure may not update in a single - // log, we cannot delete it just because the log's track doesn't have - // any info for the procedure. - if (globalTracker) { - node.delete(procId); - } - continue; - } - // Only check delete in the global tracker, only global tracker has the - // whole picture - if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) { - node.delete(procId); - continue; - } - if (trackerNode.isModified(procId)) { - // the procedure was modified + if (func.apply(trackerNode, procId)) { node.delete(procId); } } } } + /** + * For the global tracker, we will use this method to build the holdingCleanupTracker, as the + * modified flags will be cleared after rolling so we only need to test the deleted flags. 
+ * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker) + */ + public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) { + setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) || + node.isDeleted(procId) == DeleteState.YES); + } + + /** + * Similar to {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see #setDeletedIfModified(long...) + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { + setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId)); + } + /** * lookup the node containing the specified procId. * @param node cached node to check before doing a lookup diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 2e1e06ce05..1b19abbeb9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -134,9 +134,8 @@ public class ProcedureWALFormatReader { } procedureMap.merge(localProcedureMap); } - if (localTracker.isPartial()) { - localTracker.setPartialFlag(false); - } + // Do not reset the partial flag for local tracker, as here the local tracker only knows the + // procedures which are modified in this file. 
} public void finish() throws IOException { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 0a89c3feaa..f5ffb06c80 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -97,7 +97,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it * with the tracker of every newer wal files, using the - * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, boolean)}. + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. * If we find out * that all the modified procedures for the oldest wal file are modified or deleted in newer wal * files, then we can delete it. This is because that, every time we call @@ -489,6 +489,8 @@ public class WALProcedureStore extends ProcedureStoreBase { }); } finally { try { + // reset the modified flags for the global tracker, as we may build it during the loading. + storeTracker.resetModified(); // try to cleanup inactive wals and complete the operation buildHoldingCleanupTracker(); tryCleanupLogsOnLoad(); @@ -1173,27 +1175,26 @@ public class WALProcedureStore extends ProcedureStoreBase { } // compute the holding tracker. - // - the first WAL is used for the 'updates' - // - the global tracker is passed in first to decide which procedures are not - // exist anymore, so we can mark them as deleted in holdingCleanupTracker. - // Only global tracker have the whole picture here. 
- // - the other WALs are scanned to remove procs already updated in a newer wal. - // If it is updated in a newer wal, we can mark it as delelted in holdingCleanupTracker - // But, we can not delete it if it was shown deleted in the newer wal, as said - // above. - // TODO: exit early if holdingCleanupTracker.isEmpty() + // - the first WAL is used for the 'updates' + // - the global tracker will be used to determine whether a procedure has been deleted + // - other trackers will be used to determine whether a procedure has been updated, as a deleted + // procedure can always be detected by checking the global tracker, we can save the deleted + // checks when applying other trackers holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - //Passing in the global tracker, we can delete the procedures not in the global - //tracker, because they are deleted in the later logs - holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true); - for (int i = 1, size = logs.size() - 1; i < size; ++i) { - // Set deleteIfNotExists to false since a single log's tracker is passed in. - // Since a specific procedure may not show up in the log at all(not executed or - // updated during the time), we can not delete the procedure just because this log - // don't have the info of the procedure. We can delete the procedure only if - // in this log's tracker, it was cleanly showed that the procedure is modified or deleted - // in the corresponding BitSetNode. - holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), false); + holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker); + // the logs is a linked list, so avoid calling get(index) on it. + Iterator iter = logs.iterator(); + // skip the tracker for the first file when creating the iterator. 
+ iter.next(); + ProcedureStoreTracker tracker = iter.next().getTracker(); + // testing iter.hasNext after calling iter.next to skip applying the tracker for last file, + // which is just the storeTracker above. + while (iter.hasNext()) { + holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker); + if (holdingCleanupTracker.isEmpty()) { + break; + } + tracker = iter.next().getTracker(); + } } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java index e06fdc5348..72c9a09175 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; - -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; @@ -36,7 +34,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - @Category({MasterTests.class, SmallTests.class}) public class TestProcedureCleanup { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule @@ -53,7 +50,6 @@ public class TestProcedureCleanup { private static HBaseCommonTestingUtility htu; - private static FileSystem fs; private static Path testDir; private static Path logDir; @@ -78,7 +74,6 @@ public class TestProcedureCleanup { // NOTE: The executor will be created by each test procEnv = new TestProcEnv(); testDir = htu.getDataTestDir(); - fs = testDir.getFileSystem(htu.getConfiguration()); assertTrue(testDir.depth() > 1); @@ -188,7 +183,7 @@ public class TestProcedureCleanup { private CountDownLatch latch = new CountDownLatch(1); @Override - 
protected Procedure[] execute(final TestProcEnv env) + protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { // Always wait here LOG.info("wait here"); @@ -210,7 +205,7 @@ public class TestProcedureCleanup { private CountDownLatch latch = new CountDownLatch(1); @Override - protected Procedure[] execute(final TestProcEnv env) + protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { // Always suspend the procedure LOG.info("suspend here"); @@ -227,16 +222,14 @@ public class TestProcedureCleanup { } @Override - protected Procedure[] execute(final TestProcEnv env) + protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { if (!childSpwaned) { childSpwaned = true; - return new Procedure[] {new SuspendProcedure()}; + return new Procedure[] { new SuspendProcedure() }; } else { return null; } } } - - } -- 2.17.1