diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 4fea0d4..9c31e46 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -156,11 +156,18 @@ public class ProcedureStoreTracker { partial = false; } - public BitSetNode(BitSetNode other) { + public BitSetNode(final BitSetNode other, final boolean resetDelete) { this.start = other.start; this.partial = other.partial; this.updated = other.updated.clone(); - this.deleted = other.deleted.clone(); + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] = ~(other.updated[i]); + } + } else { + this.deleted = other.deleted.clone(); + } } public void update(final long procId) { @@ -171,11 +178,11 @@ public class ProcedureStoreTracker { updateState(procId, true); } - public Long getStart() { + public long getStart() { return start; } - public Long getEnd() { + public long getEnd() { return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; } @@ -250,33 +257,6 @@ public class ProcedureStoreTracker { } /** - * If an active (non-deleted) procedure in current BitSetNode has been updated in {@code other} - * BitSetNode, then delete it from current node. - * @return true if node changed, i.e. some procedure(s) from {@code other} was subtracted from - * current node. - */ - public boolean subtract(BitSetNode other) { - // Assert that other node intersects with this node. - assert !(other.getEnd() < this.start) && !(this.getEnd() < other.start); - int thisOffset = 0, otherOffset = 0; - if (this.start < other.start) { - thisOffset = (int) (other.start - this.start) / BITS_PER_WORD; - } else { - otherOffset = (int) (this.start - other.start) / BITS_PER_WORD; - } - int size = Math.min(this.updated.length - thisOffset, other.updated.length - otherOffset); - boolean nonZeroIntersect = false; - for (int i = 0; i < size; i++) { - long intersect = ~this.deleted[thisOffset + i] & other.updated[otherOffset + i]; - if (intersect != 0) { - this.deleted[thisOffset + i] |= intersect; - nonZeroIntersect = true; - } - } - return nonZeroIntersect; - } - - /** * Convert to * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode * protobuf. @@ -292,7 +272,6 @@ public class ProcedureStoreTracker { return builder.build(); } - // ======================================================================== // Grow/Merge Helpers // ======================================================================== @@ -461,63 +440,132 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. */ - public void resetTo(ProcedureStoreTracker tracker) { + public void resetTo(final ProcedureStoreTracker tracker) { + resetTo(tracker, false); + } + + public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { this.partial = tracker.partial; this.minUpdatedProcId = tracker.minUpdatedProcId; this.maxUpdatedProcId = tracker.maxUpdatedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry entry : tracker.map.entrySet()) { - map.put(entry.getKey(), new BitSetNode(entry.getValue())); + map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); } } public void insert(long procId) { - BitSetNode node = getOrCreateNode(procId); - node.update(procId); - trackProcIds(procId); + insert(null, procId); } public void insert(final long procId, final long[] subProcIds) { - update(procId); + BitSetNode node = null; + node = update(node, procId); for (int i = 0; i < subProcIds.length; ++i) { - insert(subProcIds[i]); + node = insert(node, subProcIds[i]); } } + private BitSetNode insert(BitSetNode node, final long procId) { + if (node == null || !node.contains(procId)) { + node = getOrCreateNode(procId); + } + node.update(procId); + trackProcIds(procId); + return node; + } + public void update(long procId) { - Map.Entry entry = map.floorEntry(procId); - assert entry != null : "expected node to update procId=" + procId; + update(null, procId); + } - BitSetNode node = entry.getValue(); - assert node.contains(procId); + private BitSetNode update(BitSetNode node, final long procId) { + node = lookupClosestNode(node, procId); + assert node != null : "expected node to update procId=" + procId; + assert node.contains(procId) : "expected procId=" + procId + " in the node"; node.update(procId); trackProcIds(procId); + return node; } public void delete(long procId) { - Map.Entry entry = map.floorEntry(procId); - assert entry != null : "expected node to delete procId=" + procId; + delete(null, procId); + } - BitSetNode node = entry.getValue(); - assert node.contains(procId) : "expected procId in the node"; + public void delete(final long[] procIds) { + Arrays.sort(procIds); + BitSetNode node = null; + for (int i = 0; i < procIds.length; ++i) { + node = delete(node, procIds[i]); + } + } + + private BitSetNode delete(BitSetNode node, final long procId) { + node = lookupClosestNode(node, procId); + assert node != null : "expected node to delete procId=" + procId; + assert node.contains(procId) : "expected procId=" + procId + " in the node"; node.delete(procId); if (!keepDeletes && node.isEmpty()) { // TODO: RESET if (map.size() == 1) - map.remove(entry.getKey()); + map.remove(node.getStart()); } trackProcIds(procId); + return node; } - public void delete(long[] procIds) { - // TODO: optimize - Arrays.sort(procIds); - for (int i = 0; i < procIds.length; ++i) { - delete(procIds[i]); + @InterfaceAudience.Private + public void setDeleted(final long procId, final boolean isDeleted) { + BitSetNode node = getOrCreateNode(procId); + assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; + node.updateState(procId, isDeleted); + trackProcIds(procId); + } + + public void setDeletedIfSet(final long... procId) { + BitSetNode node = null; + for (int i = 0; i < procId.length; ++i) { + node = lookupClosestNode(node, procId[i]); + if (node != null && node.isUpdated(procId[i])) { + node.delete(procId[i]); + } } } + public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + BitSetNode trackerNode = null; + for (BitSetNode node: map.values()) { + final long minProcId = node.getStart(); + final long maxProcId = node.getEnd(); + for (long procId = minProcId; procId <= maxProcId; ++procId) { + if (!node.isUpdated(procId)) continue; + + trackerNode = tracker.lookupClosestNode(trackerNode, procId); + if (isUpdatedOrDeleted(trackerNode, procId)) { + node.delete(procId); + } + } + } + } + + /** + * lookup the node containing the specified procId. + * @param node cached node to check before doing a lookup + * @param procId the procId to lookup + * @return the node that may contains the procId or null + */ + private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) { + if (node != null && node.contains(procId)) return node; + final Map.Entry entry = map.floorEntry(procId); + return entry != null ? entry.getValue() : null; + } + + private static boolean isUpdatedOrDeleted(final BitSetNode node, final long procId) { + if (node == null || !node.contains(procId)) return false; + return node.isUpdated(procId) || node.isDeleted(procId) == DeleteState.YES; + } + private void trackProcIds(long procId) { minUpdatedProcId = Math.min(minUpdatedProcId, procId); maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); @@ -531,14 +579,6 @@ public class ProcedureStoreTracker { return maxUpdatedProcId; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { - BitSetNode node = getOrCreateNode(procId); - assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; - node.updateState(procId, isDeleted); - trackProcIds(procId); - } - public void reset() { this.keepDeletes = false; this.partial = false; @@ -626,11 +666,6 @@ public class ProcedureStoreTracker { return true; } - public boolean isTracking(long minId, long maxId) { - // TODO: we can make it more precise, instead of looking just at the block - return map.floorEntry(minId) != null || map.floorEntry(maxId) != null; - } - /** * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. @@ -731,37 +766,6 @@ public class ProcedureStoreTracker { } } - /** - * Iterates over - * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other} - * tracker. - * @return true if tracker changed, i.e. some procedure from {@code other} were subtracted from - * current tracker. - */ - public boolean subtract(ProcedureStoreTracker other) { - // Can not intersect partial bitmap. - assert !partial && !other.partial; - boolean nonZeroIntersect = false; - for (Map.Entry currentEntry : map.entrySet()) { - BitSetNode currentBitSetNode = currentEntry.getValue(); - Map.Entry otherTrackerEntry = other.map.floorEntry(currentEntry.getKey()); - if (otherTrackerEntry == null // No node in other map with key <= currentEntry.getKey(). - // First entry in other map doesn't intersect with currentEntry. - || otherTrackerEntry.getValue().getEnd() < currentEntry.getKey()) { - otherTrackerEntry = other.map.ceilingEntry(currentEntry.getKey()); - if (otherTrackerEntry == null || !currentBitSetNode.contains(otherTrackerEntry.getKey())) { - // No node in other map intersects with currentBitSetNode's range. - continue; - } - } - do { - nonZeroIntersect |= currentEntry.getValue().subtract(otherTrackerEntry.getValue()); - otherTrackerEntry = other.map.higherEntry(otherTrackerEntry.getKey()); - } while (otherTrackerEntry != null && currentBitSetNode.contains(otherTrackerEntry.getKey())); - } - return nonZeroIntersect; - } - // ======================================================================== // Convert to/from Protocol Buffer. // ======================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index e5c8fca..aeae569 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -101,8 +101,18 @@ public class ProcedureWALFormatReader { private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); private final WalProcedureMap procedureMap = new WalProcedureMap(1024); - // private long compactionLogId; - private long maxProcId = 0; + private final ProcedureWALFormat.Loader loader; + + /** + * Global tracker that will be used by the WALProcedureStore after load. + * If the last WAL was closed cleanly we already have a full tracker ready to be used. + * If the last WAL was truncated (e.g. master killed) the tracker will be empty + * and the 'partial' flag will be set. In this case on WAL replay we are going + * to rebuild the tracker. + */ + private final ProcedureStoreTracker tracker; + // private final boolean hasFastStartSupport; + /** * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we * re-build the list of procedures updated in that WAL because we need it for log cleaning @@ -113,13 +123,9 @@ public class ProcedureWALFormatReader { * {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}). */ private ProcedureStoreTracker localTracker; - private final ProcedureWALFormat.Loader loader; - /** - * Global tracker. If set to partial, it will be updated as procedures are loaded from wals, - * otherwise not. - */ - private final ProcedureStoreTracker tracker; - // private final boolean hasFastStartSupport; + + // private long compactionLogId; + private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 3a46f8f..54f375b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -106,6 +106,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private static final int DEFAULT_SYNC_STATS_COUNT = 10; private final LinkedList logs = new LinkedList<>(); + private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); @@ -379,6 +380,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } }); } finally { + buildHoldingCleanupTracker(); loading.set(false); } } @@ -604,16 +606,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(procId); } else { storeTracker.insert(procId, subProcIds); + holdingCleanupTracker.setDeletedIfSet(procId); } break; case UPDATE: storeTracker.update(procId); + holdingCleanupTracker.setDeletedIfSet(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); + holdingCleanupTracker.setDeletedIfSet(subProcIds); } else { storeTracker.delete(procId); + holdingCleanupTracker.setDeletedIfSet(procId); } break; default: @@ -918,6 +924,11 @@ public class WALProcedureStore extends ProcedureStoreBase { lastRollTs.set(rollTs); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); + // if it's the first next WAL being added, build the holding cleanup tracker + if (logs.size() == 2) { + buildHoldingCleanupTracker(); + } + if (LOG.isDebugEnabled()) { LOG.debug("Roll new state log: " + logId); } @@ -946,38 +957,28 @@ public class WALProcedureStore extends ProcedureStoreBase { // ========================================================================== // Log Files cleaner helpers // ========================================================================== - - /** - * Iterates over log files from latest (ignoring currently active one) to oldest, deleting the - * ones which don't contain anything useful for recovery. - * @throws IOException - */ private void removeInactiveLogs() throws IOException { - // TODO: can we somehow avoid first iteration (starting from newest log) and still figure out - // efficient way to cleanup old logs. - // Alternatively, a complex and maybe more efficient method would be using this iteration to - // rewrite latest states of active procedures to a new log file and delete all old ones. - if (logs.size() <= 1) return; - ProcedureStoreTracker runningTracker = new ProcedureStoreTracker(); - runningTracker.resetTo(storeTracker); - List logsToBeDeleted = new ArrayList<>(); - for (int i = logs.size() - 2; i >= 0; i--) { - ProcedureWALFile log = logs.get(i); - // If nothing was subtracted, delete the log file since it doesn't contain any useful proc - // states. - if (!runningTracker.subtract(log.getTracker())) { - logsToBeDeleted.add(log); - } + // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. + // once there is nothing olding the oldest WAL we can remove it. + while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { + removeLogFile(logs.getFirst()); + buildHoldingCleanupTracker(); } - // Delete the logs from oldest to newest and stop at first log that can't be deleted to avoid - // holes in the log file sequence (for better debug capability). - while (true) { - ProcedureWALFile log = logs.getFirst(); - if (logsToBeDeleted.contains(log)) { - removeLogFile(log); - } else { - break; - } + } + + private void buildHoldingCleanupTracker() { + if (logs.size() <= 1) { + // we only have one wal, so nothing to do + holdingCleanupTracker.reset(); + return; + } + + // compute the holding tracker. + // - the first WAL is used for the 'updates' + // - the other WALs are scanned to remove procs already in other wals. + holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); + for (int i = 1; i < logs.size(); ++i) { + holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); } } @@ -990,12 +991,19 @@ public class WALProcedureStore extends ProcedureStoreBase { if (LOG.isDebugEnabled()) { LOG.debug("Remove all state logs with ID less than " + lastLogId); } + + boolean removed = false; while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); if (lastLogId < log.getLogId()) { break; } removeLogFile(log); + removed = true; + } + + if (removed) { + buildHoldingCleanupTracker(); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index 76fd2c5..550116e 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -106,32 +106,6 @@ public class TestProcedureStoreTracker { } @Test - public void testIsTracking() { - long[][] procIds = new long[][] {{4, 7}, {1024, 1027}, {8192, 8194}}; - long[][] checkIds = new long[][] {{2, 8}, {1023, 1025}, {8193, 8191}}; - - ProcedureStoreTracker tracker = new ProcedureStoreTracker(); - for (int i = 0; i < procIds.length; ++i) { - long[] seq = procIds[i]; - tracker.insert(seq[0]); - tracker.insert(seq[1]); - } - - for (int i = 0; i < procIds.length; ++i) { - long[] check = checkIds[i]; - long[] seq = procIds[i]; - assertTrue(tracker.isTracking(seq[0], seq[1])); - assertTrue(tracker.isTracking(check[0], check[1])); - tracker.delete(seq[0]); - tracker.delete(seq[1]); - assertFalse(tracker.isTracking(seq[0], seq[1])); - assertFalse(tracker.isTracking(check[0], check[1])); - } - - assertTrue(tracker.isEmpty()); - } - - @Test public void testBasicCRUD() { ProcedureStoreTracker tracker = new ProcedureStoreTracker(); assertTrue(tracker.isEmpty()); @@ -287,64 +261,31 @@ public class TestProcedureStoreTracker { } @Test - public void testBitSetNodeSubtract() { - // 1 not updated in n2, nothing to subtract - BitSetNode n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ }); - BitSetNode n2 = buildBitSetNode(new long[]{ 1L }, new long[]{}, new long[]{}); - assertFalse(n1.subtract(n2)); - - // 1 updated in n2, and not deleted in n1, should subtract. - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - assertTrue(n1.subtract(n2)); - - // 1 updated in n2, but deleted in n1, should not subtract - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L }); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - assertFalse(n1.subtract(n2)); - - // 1 updated in n2, but not deleted in n1, should subtract. - n1 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{}); - n2 = buildBitSetNode(new long[]{ 1L }, new long[]{ 1L }, new long[]{ 1L }); - assertTrue(n1.subtract(n2)); - - // all four cases together. - n1 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 10L, 20L, 30L }, - new long[]{ 20L }); - n2 = buildBitSetNode(new long[]{ 0L, 10L, 20L, 30L }, new long[]{ 0L, 20L, 30L }, - new long[]{ 0L }); - assertTrue(n1.subtract(n2)); - } + public void testSetDeletedIfSet() { + final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); + final long[] procIds = new long[] { 1, 3, 7, 152, 512, 1024, 1025 }; - @Test - // The structure is same as testBitSetNodeSubtract() but the ids are bigger so that internally - // there are many BitSetNodes. - public void testTrackerSubtract() { - // not updated in n2, nothing to subtract - ProcedureStoreTracker n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, - new long[]{ }); - ProcedureStoreTracker n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{}, new long[]{}); - assertFalse(n1.subtract(n2)); - - // updated in n2, and not deleted in n1, should subtract. - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - assertTrue(n1.subtract(n2)); - - // updated in n2, but also deleted in n1, should not subtract - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{}); - assertFalse(n1.subtract(n2)); - - // updated in n2, but not deleted in n1, should subtract. - n1 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L, 1000L }, new long[]{}); - n2 = buildTracker(new long[]{ 1L, 1000L }, new long[]{ 1L }, new long[]{ 1L, 1000L }); - assertFalse(n1.subtract(n2)); - - n1 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 100L, 200L, 300L }, - new long[]{ 200L }); - n2 = buildTracker(new long[]{ 0L, 100L, 200L, 300L }, new long[]{ 0L, 200L, 300L }, - new long[]{ 0L }); - assertTrue(n1.subtract(n2)); + // test single proc + for (int i = 0; i < procIds.length; ++i) { + tracker.insert(procIds[i]); + } + assertEquals(false, tracker.isEmpty()); + + for (int i = 0; i < procIds.length; ++i) { + tracker.setDeletedIfSet(procIds[i] - 1); + tracker.setDeletedIfSet(procIds[i]); + tracker.setDeletedIfSet(procIds[i] + 1); + } + assertEquals(true, tracker.isEmpty()); + + // test batch + tracker.reset(); + for (int i = 0; i < procIds.length; ++i) { + tracker.insert(procIds[i]); + } + assertEquals(false, tracker.isEmpty()); + + tracker.setDeletedIfSet(procIds); + assertEquals(true, tracker.isEmpty()); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 7ecffa1..bf8f71b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -126,13 +126,13 @@ public class TestWALProcedureStore { @Test public void testWalCleanerSequentialClean() throws Exception { - int NUM = 5; - List procs = new ArrayList<>(); + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; + // Insert procedures and roll wal after every insert. - for (int i = 0; i < NUM; i++) { - procs.add(new TestSequentialProcedure()); - procStore.insert(procs.get(i), null); + for (int i = 0; i < procs.length; i++) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); procStore.rollWriterForTesting(); logs = procStore.getActiveLogs(); assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. @@ -140,12 +140,13 @@ public class TestWALProcedureStore { // Delete procedures in sequential order make sure that only the corresponding wal is deleted // from logs list. - int[] deleteOrder = new int[]{ 0, 1, 2, 3, 4}; + final int[] deleteOrder = new int[] { 0, 1, 2, 3, 4 }; for (int i = 0; i < deleteOrder.length; i++) { - procStore.delete(procs.get(deleteOrder[i]).getProcId()); + procStore.delete(procs[deleteOrder[i]].getProcId()); procStore.removeInactiveLogsForTesting(); - assertFalse(procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); - assertEquals(procStore.getActiveLogs().size(), NUM - i ); + assertFalse(logs.get(deleteOrder[i]).toString(), + procStore.getActiveLogs().contains(logs.get(deleteOrder[i]))); + assertEquals(procStore.getActiveLogs().size(), procs.length - i); } } @@ -154,30 +155,29 @@ public class TestWALProcedureStore { // they are in the starting of the list. @Test public void testWalCleanerNoHoles() throws Exception { - int NUM = 5; - List procs = new ArrayList<>(); + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. - for (int i = 0; i < NUM; i++) { - procs.add(new TestSequentialProcedure()); - procStore.insert(procs.get(i), null); + for (int i = 0; i < procs.length; i++) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); procStore.rollWriterForTesting(); logs = procStore.getActiveLogs(); assertEquals(logs.size(), i + 2); // Extra 1 for current ongoing wal. } - for (int i = 1; i < NUM; i++) { - procStore.delete(procs.get(i).getProcId()); + for (int i = 1; i < procs.length; i++) { + procStore.delete(procs[i].getProcId()); } - assertEquals(procStore.getActiveLogs().size(), NUM + 1); - procStore.delete(procs.get(0).getProcId()); + assertEquals(procStore.getActiveLogs().size(), procs.length + 1); + procStore.delete(procs[0].getProcId()); assertEquals(procStore.getActiveLogs().size(), 1); } @Test public void testWalCleanerUpdates() throws Exception { - TestSequentialProcedure p1 = new TestSequentialProcedure(), - p2 = new TestSequentialProcedure(); + TestSequentialProcedure p1 = new TestSequentialProcedure(); + TestSequentialProcedure p2 = new TestSequentialProcedure(); procStore.insert(p1, null); procStore.insert(p2, null); procStore.rollWriterForTesting(); @@ -192,8 +192,8 @@ public class TestWALProcedureStore { @Test public void testWalCleanerUpdatesDontLeaveHoles() throws Exception { - TestSequentialProcedure p1 = new TestSequentialProcedure(), - p2 = new TestSequentialProcedure(); + TestSequentialProcedure p1 = new TestSequentialProcedure(); + TestSequentialProcedure p2 = new TestSequentialProcedure(); procStore.insert(p1, null); procStore.insert(p2, null); procStore.rollWriterForTesting(); // generates first log with p1 + p2 @@ -660,7 +660,7 @@ public class TestWALProcedureStore { @Test public void testFileNotFoundDuringLeaseRecovery() throws IOException { - TestProcedure[] procs = new TestProcedure[3]; + final TestProcedure[] procs = new TestProcedure[3]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestProcedure(i + 1, 0); procStore.insert(procs[i], null); @@ -673,7 +673,7 @@ public class TestWALProcedureStore { procStore.stop(false); FileStatus[] status = fs.listStatus(logDir); - assertEquals(procs.length + 2, status.length); + assertEquals(procs.length + 1, status.length); // simulate another active master removing the wals procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, @@ -696,7 +696,7 @@ public class TestWALProcedureStore { procStore.recoverLease(); procStore.load(loader); assertEquals(procs.length, loader.getMaxProcId()); - assertEquals(procs.length - 1, loader.getRunnableCount()); + assertEquals(1, loader.getRunnableCount()); assertEquals(0, loader.getCompletedCount()); assertEquals(0, loader.getCorruptedCount()); }