From 699b3fe7c2385523ed70e5c8fe927d846b09abba Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 30 Sep 2018 21:28:25 +0800 Subject: [PATCH] HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation --- .../hbase/procedure2/store/BitSetNode.java | 397 ++++++++++++++ .../procedure2/store/NoopProcedureStore.java | 9 +- .../procedure2/store/ProcedureStore.java | 9 +- .../store/ProcedureStoreTracker.java | 493 +++--------------- .../CorruptedWALProcedureStoreException.java | 6 +- .../store/wal/ProcedureWALFile.java | 7 +- .../store/wal/ProcedureWALFormat.java | 38 +- .../store/wal/ProcedureWALFormatReader.java | 140 +++-- .../store/wal/WALProcedureStore.java | 206 ++++---- .../store/TestProcedureStoreTracker.java | 31 +- .../hadoop/hbase/HBaseTestingUtility.java | 1 + 11 files changed, 702 insertions(+), 635 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java new file mode 100644 index 0000000000..b76c026d01 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store; + +import java.util.Arrays; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). + * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range + * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is + * BITS_PER_WORD. + *

+ * We have two main bit sets to describe the state of procedures, the meanings are: + * + *

+ *  ----------------------
+ * | modified | deleted |  meaning
+ * |     0    |   0     |  proc exists, but hasn't been updated since last resetUpdates().
+ * |     1    |   0     |  proc was updated (but not deleted).
+ * |     1    |   1     |  proc was deleted.
+ * |     0    |   1     |  proc doesn't exist (maybe never created, maybe deleted in past).
+ * ----------------------
+ * 
+ * + * The meaning of modified is that, we have modified the state of the procedure, no matter insert, + * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will + * set the delete to 1. + *

+ * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the + * partial one, the initial modified value is 0 and the initial deleted value is also 0. In + * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified. + */ +@InterfaceAudience.Private +class BitSetNode { + private static final long WORD_MASK = 0xffffffffffffffffL; + private static final int ADDRESS_BITS_PER_WORD = 6; + private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; + private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; + + /** + * Mimics {@link ProcedureStoreTracker#partial}. It will effect how we fill the new deleted bits + * when growing. + */ + private boolean partial; + + /** + * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track + * procedures which have been modified since last WAL write. + */ + private long[] modified; + + /** + * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This + * represents global state since it's not reset on WAL rolls. + */ + private long[] deleted; + /** + * Offset of bitmap i.e. procedure id corresponding to first bit. + */ + private long start; + + public void dump() { + System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(), + getActiveMaxProcId()); + System.out.println("Modified:"); + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + System.out.println("Delete:"); + for (int i = 0; i < deleted.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + } + + public BitSetNode(long procId, boolean partial) { + start = alignDown(procId); + + int count = 1; + modified = new long[count]; + deleted = new long[count]; + if (!partial) { + Arrays.fill(deleted, WORD_MASK); + } + + this.partial = partial; + updateState(procId, false); + } + + public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { + start = data.getStartId(); + int size = data.getUpdatedCount(); + assert size == data.getDeletedCount(); + modified = new long[size]; + deleted = new long[size]; + for (int i = 0; i < size; ++i) { + modified[i] = data.getUpdated(i); + deleted[i] = data.getDeleted(i); + } + partial = false; + } + + public BitSetNode(BitSetNode other, boolean resetDelete) { + this.start = other.start; + this.partial = other.partial; + this.modified = other.modified.clone(); + // The resetDelete will be set to true when building cleanup tracker. + // The intention here is that, if a procedure is not modified in this tracker, then we do not + // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is + // 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is + // deleted |= ~modified, i.e, + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] |= ~(other.modified[i]); + } + } else { + this.deleted = other.deleted.clone(); + } + } + + public void insertOrUpdate(final long procId) { + updateState(procId, false); + } + + public void delete(final long procId) { + updateState(procId, true); + } + + public long getStart() { + return start; + } + + public long getEnd() { + return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1; + } + + public boolean contains(final long procId) { + return start <= procId && procId <= getEnd(); + } + + public DeleteState isDeleted(final long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= deleted.length) { + return DeleteState.MAYBE; + } + return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; + } + + public boolean isModified(long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= modified.length) { + return false; + } + return (modified[wordIndex] & (1L << bitmapIndex)) != 0; + } + + /** + * @return true, if all the procedures has been modified. + */ + public boolean isAllModified() { + // TODO: cache the value + for (int i = 0; i < modified.length; ++i) { + if ((modified[i] | deleted[i]) != WORD_MASK) { + return false; + } + } + return true; + } + + /** + * @return true, if there are no active procedures in this BitSetNode, else false. + */ + public boolean isEmpty() { + // TODO: cache the value + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] != WORD_MASK) { + return false; + } + } + return true; + } + + public void resetModified() { + Arrays.fill(modified, 0); + } + + public void unsetPartialFlag() { + partial = false; + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((modified[i] & (1L << j)) == 0) { + deleted[i] |= (1L << j); + } + } + } + } + + /** + * Convert to + * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode + * protobuf. + */ + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { + ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = + ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); + builder.setStartId(start); + for (int i = 0; i < modified.length; ++i) { + builder.addUpdated(modified[i]); + builder.addDeleted(deleted[i]); + } + return builder.build(); + } + + // ======================================================================== + // Grow/Merge Helpers + // ======================================================================== + public boolean canGrow(final long procId) { + return Math.abs(procId - start) < MAX_NODE_SIZE; + } + + public boolean canMerge(final BitSetNode rightNode) { + // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. + assert start < rightNode.start; + return (rightNode.getEnd() - start) < MAX_NODE_SIZE; + } + + public void grow(final long procId) { + int delta, offset; + + if (procId < start) { + // add to head + long newStart = alignDown(procId); + delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD; + offset = delta; + start = newStart; + } else { + // Add to tail + long newEnd = alignUp(procId + 1); + delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; + offset = 0; + } + + long[] newBitmap; + int oldSize = modified.length; + + newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } + System.arraycopy(modified, 0, newBitmap, offset, oldSize); + modified = newBitmap; + + newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = partial ? 0 : WORD_MASK; + } + System.arraycopy(deleted, 0, newBitmap, offset, oldSize); + deleted = newBitmap; + } + + public void merge(final BitSetNode rightNode) { + int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; + + long[] newBitmap; + int oldSize = modified.length; + int newSize = (delta - rightNode.modified.length); + int offset = oldSize + newSize; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(modified, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length); + modified = newBitmap; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(deleted, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); + deleted = newBitmap; + + for (int i = 0; i < newSize; ++i) { + modified[offset + i] = 0; + deleted[offset + i] = partial ? 0 : WORD_MASK; + } + } + + @Override + public String toString() { + return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; + } + + // ======================================================================== + // Min/Max Helpers + // ======================================================================== + public long getActiveMinProcId() { + long minProcId = start; + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] == 0) { + return (minProcId); + } + + if (deleted[i] != WORD_MASK) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((deleted[i] & (1L << j)) != 0) { + return minProcId + j; + } + } + } + + minProcId += BITS_PER_WORD; + } + return minProcId; + } + + public long getActiveMaxProcId() { + long maxProcId = getEnd(); + for (int i = deleted.length - 1; i >= 0; --i) { + if (deleted[i] == 0) { + return maxProcId; + } + + if (deleted[i] != WORD_MASK) { + for (int j = BITS_PER_WORD - 1; j >= 0; --j) { + if ((deleted[i] & (1L << j)) == 0) { + return maxProcId - (BITS_PER_WORD - 1 - j); + } + } + } + maxProcId -= BITS_PER_WORD; + } + return maxProcId; + } + + // ======================================================================== + // Bitmap Helpers + // ======================================================================== + private int getBitmapIndex(final long procId) { + return (int) (procId - start); + } + + void updateState(long procId, boolean isDeleted) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + long value = (1L << bitmapIndex); + + modified[wordIndex] |= value; + if (isDeleted) { + deleted[wordIndex] |= value; + } else { + deleted[wordIndex] &= ~value; + } + } + + // ======================================================================== + // Helpers + // ======================================================================== + /** + * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignUp(final long x) { + return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; + } + + /** + * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignDown(final long x) { + return x & -BITS_PER_WORD; + } +} \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 9c6176d4bb..8fbc1473ed 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; @@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void insert(Procedure proc, Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { // no-op } @Override - public void insert(Procedure[] proc) { + public void insert(Procedure[] proc) { // no-op } @Override - public void update(Procedure proc) { + public void update(Procedure proc) { // no-op } @@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void delete(Procedure proc, long[] subprocs) { + public void delete(Procedure proc, long[] subprocs) { // no-op } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 72883405d7..8063b125ba 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -81,6 +81,7 @@ public interface ProcedureStore { * @throws IOException if there was an error fetching/deserializing the procedure * @return the next procedure in the iteration. */ + @SuppressWarnings("rawtypes") Procedure next() throws IOException; } @@ -173,7 +174,7 @@ public interface ProcedureStore { * @param proc the procedure to serialize and write to the store. * @param subprocs the newly created child of the proc. */ - void insert(Procedure proc, Procedure[] subprocs); + void insert(Procedure proc, Procedure[] subprocs); /** * Serialize a set of new procedures. @@ -182,14 +183,14 @@ public interface ProcedureStore { * * @param procs the procedures to serialize and write to the store. */ - void insert(Procedure[] procs); + void insert(Procedure[] procs); /** * The specified procedure was executed, * and the new state should be written to the store. * @param proc the procedure to serialize and write to the store. */ - void update(Procedure proc); + void update(Procedure proc); /** * The specified procId was removed from the executor, @@ -205,7 +206,7 @@ public interface ProcedureStore { * @param parentProc the parent procedure to serialize and write to the store. * @param subProcIds the IDs of the sub-procedure to remove. */ - void delete(Procedure parentProc, long[] subProcIds); + void delete(Procedure parentProc, long[] subProcIds); /** * The specified procIds were removed from the executor, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 2dad5ac72c..ee63db466d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -53,383 +53,14 @@ public class ProcedureStoreTracker { * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to * understand it's real use. */ - private boolean partial = false; + boolean partial = false; - private long minUpdatedProcId = Long.MAX_VALUE; - private long maxUpdatedProcId = Long.MIN_VALUE; + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; public enum DeleteState { YES, NO, MAYBE } - /** - * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). - * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the - * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K - * is BITS_PER_WORD. - */ - public static class BitSetNode { - private final static long WORD_MASK = 0xffffffffffffffffL; - private final static int ADDRESS_BITS_PER_WORD = 6; - private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; - - /** - * Mimics {@link ProcedureStoreTracker#partial}. - */ - private final boolean partial; - - /* ---------------------- - * | updated | deleted | meaning - * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates(). - * | 1 | 0 | proc was updated (but not deleted). - * | 1 | 1 | proc was deleted. - * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past). - /* ---------------------- - */ - - /** - * Set of procedures which have been updated since last {@link #resetUpdates()}. - * Useful to track procedures which have been updated since last WAL write. - */ - private long[] updated; - /** - * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. - * This represents global state since it's not reset on WAL rolls. - */ - private long[] deleted; - /** - * Offset of bitmap i.e. procedure id corresponding to first bit. - */ - private long start; - - public void dump() { - System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), - getActiveMinProcId(), getActiveMaxProcId()); - System.out.println("Update:"); - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - System.out.println("Delete:"); - for (int i = 0; i < deleted.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - } - - public BitSetNode(final long procId, final boolean partial) { - start = alignDown(procId); - - int count = 1; - updated = new long[count]; - deleted = new long[count]; - for (int i = 0; i < count; ++i) { - updated[i] = 0; - deleted[i] = partial ? 0 : WORD_MASK; - } - - this.partial = partial; - updateState(procId, false); - } - - protected BitSetNode(final long start, final long[] updated, final long[] deleted) { - this.start = start; - this.updated = updated; - this.deleted = deleted; - this.partial = false; - } - - public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { - start = data.getStartId(); - int size = data.getUpdatedCount(); - updated = new long[size]; - deleted = new long[size]; - for (int i = 0; i < size; ++i) { - updated[i] = data.getUpdated(i); - deleted[i] = data.getDeleted(i); - } - partial = false; - } - - public BitSetNode(final BitSetNode other, final boolean resetDelete) { - this.start = other.start; - this.partial = other.partial; - this.updated = other.updated.clone(); - if (resetDelete) { - this.deleted = new long[other.deleted.length]; - for (int i = 0; i < this.deleted.length; ++i) { - this.deleted[i] = ~(other.updated[i]); - } - } else { - this.deleted = other.deleted.clone(); - } - } - - public void update(final long procId) { - updateState(procId, false); - } - - public void delete(final long procId) { - updateState(procId, true); - } - - public long getStart() { - return start; - } - - public long getEnd() { - return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; - } - - public boolean contains(final long procId) { - return start <= procId && procId <= getEnd(); - } - - public DeleteState isDeleted(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= deleted.length) { - return DeleteState.MAYBE; - } - return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; - } - - private boolean isUpdated(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= updated.length) { - return false; - } - return (updated[wordIndex] & (1L << bitmapIndex)) != 0; - } - - public boolean isUpdated() { - // TODO: cache the value - for (int i = 0; i < updated.length; ++i) { - if ((updated[i] | deleted[i]) != WORD_MASK) { - return false; - } - } - return true; - } - - /** - * @return true, if there are no active procedures in this BitSetNode, else false. - */ - public boolean isEmpty() { - // TODO: cache the value - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] != WORD_MASK) { - return false; - } - } - return true; - } - - public void resetUpdates() { - for (int i = 0; i < updated.length; ++i) { - updated[i] = 0; - } - } - - /** - * Clears the {@link #deleted} bitmaps. - */ - public void undeleteAll() { - for (int i = 0; i < updated.length; ++i) { - deleted[i] = 0; - } - } - - public void unsetPartialFlag() { - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((updated[i] & (1L << j)) == 0) { - deleted[i] |= (1L << j); - } - } - } - } - - /** - * Convert to - * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode - * protobuf. - */ - public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { - ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = - ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); - builder.setStartId(start); - for (int i = 0; i < updated.length; ++i) { - builder.addUpdated(updated[i]); - builder.addDeleted(deleted[i]); - } - return builder.build(); - } - - // ======================================================================== - // Grow/Merge Helpers - // ======================================================================== - public boolean canGrow(final long procId) { - return Math.abs(procId - start) < MAX_NODE_SIZE; - } - - public boolean canMerge(final BitSetNode rightNode) { - // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. - assert start < rightNode.start; - return (rightNode.getEnd() - start) < MAX_NODE_SIZE; - } - - public void grow(final long procId) { - int delta, offset; - - if (procId < start) { - // add to head - long newStart = alignDown(procId); - delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD; - offset = delta; - start = newStart; - } else { - // Add to tail - long newEnd = alignUp(procId + 1); - delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; - offset = 0; - } - - long[] newBitmap; - int oldSize = updated.length; - - newBitmap = new long[oldSize + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = 0; - } - System.arraycopy(updated, 0, newBitmap, offset, oldSize); - updated = newBitmap; - - newBitmap = new long[deleted.length + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = partial ? 0 : WORD_MASK; - } - System.arraycopy(deleted, 0, newBitmap, offset, oldSize); - deleted = newBitmap; - } - - public void merge(final BitSetNode rightNode) { - int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; - - long[] newBitmap; - int oldSize = updated.length; - int newSize = (delta - rightNode.updated.length); - int offset = oldSize + newSize; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(updated, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length); - updated = newBitmap; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(deleted, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); - deleted = newBitmap; - - for (int i = 0; i < newSize; ++i) { - updated[offset + i] = 0; - deleted[offset + i] = partial ? 0 : WORD_MASK; - } - } - - @Override - public String toString() { - return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; - } - - // ======================================================================== - // Min/Max Helpers - // ======================================================================== - public long getActiveMinProcId() { - long minProcId = start; - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] == 0) { - return(minProcId); - } - - if (deleted[i] != WORD_MASK) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((deleted[i] & (1L << j)) != 0) { - return minProcId + j; - } - } - } - - minProcId += BITS_PER_WORD; - } - return minProcId; - } - - public long getActiveMaxProcId() { - long maxProcId = getEnd(); - for (int i = deleted.length - 1; i >= 0; --i) { - if (deleted[i] == 0) { - return maxProcId; - } - - if (deleted[i] != WORD_MASK) { - for (int j = BITS_PER_WORD - 1; j >= 0; --j) { - if ((deleted[i] & (1L << j)) == 0) { - return maxProcId - (BITS_PER_WORD - 1 - j); - } - } - } - maxProcId -= BITS_PER_WORD; - } - return maxProcId; - } - - // ======================================================================== - // Bitmap Helpers - // ======================================================================== - private int getBitmapIndex(final long procId) { - return (int)(procId - start); - } - - private void updateState(final long procId, final boolean isDeleted) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - long value = (1L << bitmapIndex); - - updated[wordIndex] |= value; - if (isDeleted) { - deleted[wordIndex] |= value; - } else { - deleted[wordIndex] &= ~value; - } - } - - - // ======================================================================== - // Helpers - // ======================================================================== - /** - * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignUp(final long x) { - return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; - } - - /** - * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignDown(final long x) { - return x & -BITS_PER_WORD; - } - } - - public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { + public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { reset(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { final BitSetNode node = new BitSetNode(protoNode); @@ -440,14 +71,23 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. */ - public void resetTo(final ProcedureStoreTracker tracker) { + public void resetTo(ProcedureStoreTracker tracker) { resetTo(tracker, false); } - public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { + /** + * Resets internal state to same as given {@code tracker}, and change the deleted flag according + * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap. + *

+ * The {@code resetDelete} will be set to true when building cleanup tracker, please see the + * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the + * deleted flag if {@code resetDelete} is true. + */ + public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { + reset(); this.partial = tracker.partial; - this.minUpdatedProcId = tracker.minUpdatedProcId; - this.maxUpdatedProcId = tracker.maxUpdatedProcId; + this.minModifiedProcId = tracker.minModifiedProcId; + this.maxModifiedProcId = tracker.maxModifiedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry entry : tracker.map.entrySet()) { map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); @@ -458,25 +98,24 @@ public class ProcedureStoreTracker { insert(null, procId); } - public void insert(final long[] procIds) { + public void insert(long[] procIds) { for (int i = 0; i < procIds.length; ++i) { insert(procIds[i]); } } - public void insert(final long procId, final long[] subProcIds) { - BitSetNode node = null; - node = update(node, procId); + public void insert(long procId, long[] subProcIds) { + BitSetNode node = update(null, procId); for (int i = 0; i < subProcIds.length; ++i) { node = insert(node, subProcIds[i]); } } - private BitSetNode insert(BitSetNode node, final long procId) { + private BitSetNode insert(BitSetNode node, long procId) { if (node == null || !node.contains(procId)) { node = getOrCreateNode(procId); } - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -485,11 +124,11 @@ public class ProcedureStoreTracker { update(null, procId); } - private BitSetNode update(BitSetNode node, final long procId) { + private BitSetNode update(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to update procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -506,7 +145,7 @@ public class ProcedureStoreTracker { } } - private BitSetNode delete(BitSetNode node, final long procId) { + private BitSetNode delete(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to delete procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; @@ -520,34 +159,58 @@ public class ProcedureStoreTracker { return node; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { + /** + * Will be called when restarting where we need to rebuild the ProcedureStoreTracker. + */ + public void setMinMaxModifiedProcIds(long min, long max) { + this.minModifiedProcId = min; + this.maxModifiedProcId = max; + } + /** + * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The + * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart + * this is not true, as we will read the wal files in reverse order so a delete may come first. + */ + public void setDeleted(long procId, boolean isDeleted) { BitSetNode node = getOrCreateNode(procId); assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; node.updateState(procId, isDeleted); trackProcIds(procId); } - public void setDeletedIfSet(final long... procId) { + /** + * Set the given bit for the procId to delete if it was modified before. + *

+ * This method is used to test whether a procedure wal file can be safely deleted, as if all the + * procedures in the given procedure wal file has been modified in the new procedure wal files, + * then we can delete it. + */ + public void setDeletedIfModified(long... procId) { BitSetNode node = null; for (int i = 0; i < procId.length; ++i) { node = lookupClosestNode(node, procId[i]); - if (node != null && node.isUpdated(procId[i])) { + if (node != null && node.isModified(procId[i])) { node.delete(procId[i]); } } } - public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + /** + * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see setDeletedIfModified + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { BitSetNode trackerNode = null; - for (BitSetNode node: map.values()) { + for (BitSetNode node : map.values()) { final long minProcId = node.getStart(); final long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { - if (!node.isUpdated(procId)) continue; + if (!node.isModified(procId)) continue; trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) { + if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isModified(procId)) { // the procedure was removed or updated node.delete(procId); } @@ -568,28 +231,28 @@ public class ProcedureStoreTracker { } private void trackProcIds(long procId) { - minUpdatedProcId = Math.min(minUpdatedProcId, procId); - maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } - public long getUpdatedMinProcId() { - return minUpdatedProcId; + public long getModifiedMinProcId() { + return minModifiedProcId; } - public long getUpdatedMaxProcId() { - return maxUpdatedProcId; + public long getModifiedMaxProcId() { + return maxModifiedProcId; } public void reset() { this.keepDeletes = false; this.partial = false; this.map.clear(); - resetUpdates(); + resetModified(); } public boolean isUpdated(long procId) { final Map.Entry entry = map.floorEntry(procId); - return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId); + return entry != null && entry.getValue().contains(procId) && entry.getValue().isModified(procId); } /** @@ -604,7 +267,7 @@ public class ProcedureStoreTracker { if (entry != null && entry.getValue().contains(procId)) { BitSetNode node = entry.getValue(); DeleteState state = node.isDeleted(procId); - return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; + return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; } return partial ? DeleteState.MAYBE : DeleteState.YES; } @@ -656,11 +319,11 @@ public class ProcedureStoreTracker { } /** - * @return true if any procedure was updated since last call to {@link #resetUpdates()}. + * @return true if all procedure was updated or deleted since last call to {@link #resetModified()}. */ - public boolean isUpdated() { + public boolean isAllUpdated() { for (Map.Entry entry : map.entrySet()) { - if (!entry.getValue().isUpdated()) { + if (!entry.getValue().isAllModified()) { return false; } } @@ -671,21 +334,15 @@ public class ProcedureStoreTracker { * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. */ - public void resetUpdates() { - for (Map.Entry entry : map.entrySet()) { - entry.getValue().resetUpdates(); - } - minUpdatedProcId = Long.MAX_VALUE; - maxUpdatedProcId = Long.MIN_VALUE; - } - - public void undeleteAll() { + public void resetModified() { for (Map.Entry entry : map.entrySet()) { - entry.getValue().undeleteAll(); + entry.getValue().resetModified(); } + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; } - private BitSetNode getOrCreateNode(final long procId) { + private BitSetNode getOrCreateNode(long procId) { // If procId can fit in left node (directly or by growing it) BitSetNode leftNode = null; boolean leftCanGrow = false; @@ -760,7 +417,7 @@ public class ProcedureStoreTracker { public void dump() { System.out.println("map " + map.size()); - System.out.println("isUpdated " + isUpdated()); + System.out.println("isUpdated " + isAllUpdated()); System.out.println("isEmpty " + isEmpty()); for (Map.Entry entry : map.entrySet()) { entry.getValue().dump(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java index dd34896ebb..ba4480fca7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java @@ -15,19 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * Thrown when a procedure WAL is corrupted */ @InterfaceAudience.Private -@InterfaceStability.Stable public class CorruptedWALProcedureStoreException extends HBaseIOException { + + private static final long serialVersionUID = -3407300445435898074L; + /** default constructor */ public CorruptedWALProcedureStoreException() { super(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 6226350a47..16767446e1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -15,20 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Describes a WAL File */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class ProcedureWALFile implements Comparable { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index ac3a52941e..c9986edc90 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -18,25 +18,22 @@ package org.apache.hadoop.hbase.procedure2.store.wal; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that contains the WAL serialization utils. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public final class ProcedureWALFormat { - private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class); static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; @@ -60,6 +55,9 @@ public final class ProcedureWALFormat { @InterfaceAudience.Private public static class InvalidWALDataException extends IOException { + + private static final long serialVersionUID = 5471733223070202196L; + public InvalidWALDataException(String s) { super(s); } @@ -75,9 +73,9 @@ public final class ProcedureWALFormat { private ProcedureWALFormat() {} - public static void load(final Iterator logs, - final ProcedureStoreTracker tracker, final Loader loader) throws IOException { - final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); + public static void load(Iterator logs, ProcedureStoreTracker tracker, + Loader loader) throws IOException { + ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); tracker.setKeepDeletes(true); try { // Ignore the last log which is current active log. @@ -93,8 +91,10 @@ public final class ProcedureWALFormat { reader.finish(); // The tracker is now updated with all the procedures read from the logs - tracker.setPartialFlag(false); - tracker.resetUpdates(); + if (tracker.isPartial()) { + tracker.setPartialFlag(false); + } + tracker.resetModified(); } finally { tracker.setKeepDeletes(false); } @@ -205,7 +205,7 @@ public final class ProcedureWALFormat { } public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, - Procedure proc, Procedure[] subprocs) throws IOException { + Procedure proc, Procedure[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(type); builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); @@ -217,17 +217,17 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeInsert(ByteSlot slot, Procedure proc) + public static void writeInsert(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); } - public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) + public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); } - public static void writeUpdate(ByteSlot slot, Procedure proc) + public static void writeUpdate(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); } @@ -240,7 +240,7 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) + public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 4ab70f18e1..4dd2565b58 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -111,33 +111,31 @@ public class ProcedureWALFormatReader { * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // TODO: private final boolean hasFastStartSupport; /** - * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we - * re-build the list of procedures updated in that WAL because we need it for log cleaning - * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. - * (see {@link WALProcedureStore#removeInactiveLogs()}). - * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother - * re-building it. + * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build + * the list of procedures modified in that WAL because we need it for log cleaning purposes. If + * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see + * {@link WALProcedureStore#removeInactiveLogs()}). + *

+ * Notice that, the deleted part for this tracker will not be global valid as we can only count + * the deletes in the current file, but it is not big problem as finally, the above tracker will + * have the global state of deleted, and it will also be used to build the cleanup tracker. */ private ProcedureStoreTracker localTracker; - // private long compactionLogId; private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { this.tracker = tracker; this.loader = loader; - // we support fast-start only if we have a clean shutdown. - // this.hasFastStartSupport = !tracker.isEmpty(); } - public void read(final ProcedureWALFile log) throws IOException { - localTracker = log.getTracker().isPartial() ? log.getTracker() : null; - if (localTracker != null) { - LOG.info("Rebuilding tracker for " + log); + public void read(ProcedureWALFile log) throws IOException { + localTracker = log.getTracker(); + if (localTracker.isPartial()) { + LOG.info("Rebuilding tracker for {}", log); } long count = 0; @@ -147,7 +145,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); + LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log); break; } count++; @@ -178,21 +176,17 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localTracker != null) { - localTracker.setPartialFlag(false); - } if (!localProcedureMap.isEmpty()) { - log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); + log.setProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + if (localTracker.isPartial()) { + localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + } procedureMap.mergeTail(localProcedureMap); - - //if (hasFastStartSupport) { - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) - // -------------------------------------------------- - //EntryIterator iter = procedureMap.fetchReady(); - //if (iter != null) loader.load(iter); - // -------------------------------------------------- - //} + } + if (localTracker.isPartial()) { + localTracker.setPartialFlag(false); } } @@ -202,37 +196,46 @@ public class ProcedureWALFormatReader { // fetch the procedure ready to run. ProcedureIterator procIter = procedureMap.fetchReady(); - if (procIter != null) loader.load(procIter); + if (procIter != null) { + loader.load(procIter); + } // remaining procedures have missing link or dependencies // consider them as corrupted, manual fix is probably required. procIter = procedureMap.fetchAll(); - if (procIter != null) loader.handleCorrupted(procIter); + if (procIter != null) { + loader.handleCorrupted(procIter); + } + } + + private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { + if (tracker.isPartial()) { + tracker.setDeleted(procId, true); + } } - private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { + private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { + if (tracker.isPartial()) { + tracker.insert(proc.getProcId()); + } + } + + private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { maxProcId = Math.max(maxProcId, proc.getProcId()); if (isRequired(proc.getProcId())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId()); - } + LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); localProcedureMap.add(proc); - if (tracker.isPartial()) { - tracker.insert(proc.getProcId()); - } - } - if (localTracker != null) { - localTracker.insert(proc.getProcId()); + insertIfPartial(tracker, proc); } + insertIfPartial(localTracker, proc); } - private void readInitEntry(final ProcedureWALEntry entry) - throws IOException { + private void readInitEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { + private void readInsertEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; loadProcedure(entry, entry.getProcedure(0)); for (int i = 1; i < entry.getProcedureCount(); ++i) { @@ -240,12 +243,12 @@ public class ProcedureWALFormatReader { } } - private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { + private void readUpdateEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { + private void readDeleteEntry(ProcedureWALEntry entry) { assert entry.hasProcId() : "expected ProcID"; if (entry.getChildIdCount() > 0) { @@ -267,26 +270,19 @@ public class ProcedureWALFormatReader { } private void deleteEntry(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("delete entry " + procId); - } + LOG.trace("delete entry {}", procId); maxProcId = Math.max(maxProcId, procId); localProcedureMap.remove(procId); assert !procedureMap.contains(procId); - if (tracker.isPartial()) { - tracker.setDeleted(procId, true); - } - if (localTracker != null) { - // In case there is only delete entry for this procedure in current log. - localTracker.setDeleted(procId, true); - } + setDeletedIfPartial(tracker, procId); + setDeletedIfPartial(localTracker, procId); } - private boolean isDeleted(final long procId) { + private boolean isDeleted(long procId) { return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; } - private boolean isRequired(final long procId) { + private boolean isRequired(long procId) { return !isDeleted(procId) && !procedureMap.contains(procId); } @@ -318,7 +314,7 @@ public class ProcedureWALFormatReader { protected Entry replayNext; protected Entry replayPrev; // procedure-infos - protected Procedure procedure; + protected Procedure procedure; protected ProcedureProtos.Procedure proto; protected boolean ready = false; @@ -357,7 +353,7 @@ public class ProcedureWALFormatReader { return false; } - public Procedure convert() throws IOException { + public Procedure convert() throws IOException { if (procedure == null) { procedure = ProcedureUtil.convertToProcedure(proto); } @@ -408,7 +404,7 @@ public class ProcedureWALFormatReader { } @Override - public Procedure next() throws IOException { + public Procedure next() throws IOException { try { return current.convert(); } finally { @@ -432,8 +428,8 @@ public class ProcedureWALFormatReader { private Entry childUnlinkedHead; // Track ProcId range - private long minProcId = Long.MAX_VALUE; - private long maxProcId = Long.MIN_VALUE; + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; public WalProcedureMap(int size) { procedureMap = new Entry[size]; @@ -496,16 +492,16 @@ public class ProcedureWALFormatReader { } private void trackProcIds(long procId) { - minProcId = Math.min(minProcId, procId); - maxProcId = Math.max(maxProcId, procId); + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } - public long getMinProcId() { - return minProcId; + public long getMinModifiedProcId() { + return minModifiedProcId; } - public long getMaxProcId() { - return maxProcId; + public long getMaxModifiedProcId() { + return maxModifiedProcId; } public boolean contains(long procId) { @@ -524,8 +520,8 @@ public class ProcedureWALFormatReader { replayOrderTail = null; rootHead = null; childUnlinkedHead = null; - minProcId = Long.MAX_VALUE; - maxProcId = Long.MIN_VALUE; + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; } /* @@ -578,8 +574,8 @@ public class ProcedureWALFormatReader { childUnlinkedHead = other.childUnlinkedHead; } } - maxProcId = Math.max(maxProcId, other.maxProcId); - minProcId = Math.max(minProcId, other.minProcId); + maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); + minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); other.clear(); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 7d5d6d2448..221d4779e4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -35,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -52,25 +49,60 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; /** * WAL implementation of the ProcedureStore. + *

+ * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, + * then {@link #load(ProcedureLoader)}. + *

+ * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by + * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all + * the old wal files. + *

+ * FIXME: notice that the current recover lease implementation is problematic, it can not deal with + * the races if there are two master both wants to acquire the lease... + *

+ * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the + * comments of this method for more details. + *

+ * The actual logging way is a bit like our FileSystem based WAL implementation as RS side. There is + * a {@link #slots}, which is more like the ring buffer, and in the insert, update and delete + * methods we will put thing into the {@link #slots} and wait. And there is a background sync + * thread(see the {@link #syncLoop()} method) which get data from the {@link #slots} and write them + * to the FileSystem, and notify the caller that we have finished. + *

+ * TODO: try using disruptor to increase performance and simplify the logic? + *

+ * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is + * also the one being written currently. And the deleted bits in it are for all the procedures, not + * only the ones in the newest wal file. And when rolling a log, we will first store it in the + * trailer of the current wal file, and then reset its modified bits, so that it can start to track + * the modified procedures for the new wal file. + *

+ * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal + * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It + * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the + * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it + * with the tracker of every newer wal files, using the + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out + * that all the modified procedures for the oldest wal file are modified or deleted in newer wal + * files, then we can delete the it. * @see ProcedureWALPrettyPrinter for printing content of a single WAL. * @see #main(String[]) to parse a directory of MasterWALProcs. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class WALProcedureStore extends ProcedureStoreBase { private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); public static final String LOG_PREFIX = "pv2-"; @@ -166,7 +198,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private int syncWaitMsec; // Variables used for UI display - private CircularFifoQueue syncMetricsQueue; + private CircularFifoQueue syncMetricsQueue; public static class SyncMetrics { private long timestamp; @@ -228,11 +260,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // Create archive dir up front. Rename won't work w/o it up on HDFS. if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); - } + LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); } else { - LOG.warn("Failed create of " + this.walArchiveDir); + LOG.warn("Failed create of {}", this.walArchiveDir); } } } @@ -248,7 +278,7 @@ public class WALProcedureStore extends ProcedureStoreBase { runningProcCount = numSlots; syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; - slotsCache = new LinkedTransferQueue(); + slotsCache = new LinkedTransferQueue<>(); while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } @@ -267,7 +297,7 @@ public class WALProcedureStore extends ProcedureStoreBase { useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // WebUI - syncMetricsQueue = new CircularFifoQueue( + syncMetricsQueue = new CircularFifoQueue<>( conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); // Init sync thread @@ -394,9 +424,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We have the lease on the log oldLogs = getLogFiles(); if (getMaxLogId(oldLogs) > flushLogId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); - } + LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); continue; } @@ -410,7 +438,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void load(final ProcedureLoader loader) throws IOException { + public void load(ProcedureLoader loader) throws IOException { lock.lock(); try { if (logs.isEmpty()) { @@ -425,7 +453,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Load the old logs - final Iterator it = logs.descendingIterator(); + Iterator it = logs.descendingIterator(); it.next(); // Skip the current log ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { @@ -485,7 +513,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure proc, final Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); } @@ -519,7 +547,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure[] procs) { + public void insert(Procedure[] procs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + Arrays.toString(procs)); } @@ -548,7 +576,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void update(final Procedure proc) { + public void update(Procedure proc) { if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc); } @@ -571,11 +599,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Delete " + procId); - } - + public void delete(long procId) { + LOG.trace("Delete {}", procId); ByteSlot slot = acquireSlot(); try { // Serialize the delete @@ -594,7 +619,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final Procedure proc, final long[] subProcIds) { + public void delete(Procedure proc, long[] subProcIds) { assert proc != null : "expected a non-null procedure"; assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { @@ -630,7 +655,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private void delete(final long[] procIds) { + private void delete(long[] procIds) { if (LOG.isTraceEnabled()) { LOG.trace("Delete " + Arrays.toString(procIds)); } @@ -736,20 +761,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; case UPDATE: storeTracker.update(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); - holdingCleanupTracker.setDeletedIfSet(subProcIds); + holdingCleanupTracker.setDeletedIfModified(subProcIds); } else { storeTracker.delete(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; default: @@ -973,16 +998,12 @@ public class WALProcedureStore extends ProcedureStoreBase { private void periodicRoll() throws IOException { if (storeTracker.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("no active procedures"); - } + LOG.trace("no active procedures"); tryRollWriter(); removeAllLogs(flushLogId - 1); } else { - if (storeTracker.isUpdated()) { - if (LOG.isTraceEnabled()) { - LOG.trace("all the active procedures are in the latest log"); - } + if (storeTracker.isAllUpdated()) { + LOG.trace("all the active procedures are in the latest log"); removeAllLogs(flushLogId - 1); } @@ -997,18 +1018,20 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriter() throws IOException { - if (!isRunning()) return false; + if (!isRunning()) { + return false; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { - LOG.warn("someone else has already created log " + flushLogId); + LOG.warn("someone else has already created log {}", flushLogId); return false; } // We have the lease on the log, // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { - LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); + LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1064,7 +1087,7 @@ public class WALProcedureStore extends ProcedureStoreBase { closeCurrentLogStream(); - storeTracker.resetUpdates(); + storeTracker.resetModified(); stream = newStream; flushLogId = logId; totalSynced.set(0); @@ -1092,12 +1115,12 @@ public class WALProcedureStore extends ProcedureStoreBase { try { ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); log.updateLocalTracker(storeTracker); long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); log.addToSize(trailerSize); } catch (IOException e) { - LOG.warn("Unable to write the trailer: " + e.getMessage()); + LOG.warn("Unable to write the trailer", e); } try { stream.close(); @@ -1134,9 +1157,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // - the other WALs are scanned to remove procs already in other wals. // TODO: exit early if holdingCleanupTracker.isEmpty() holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - holdingCleanupTracker.setDeletedIfSet(storeTracker); + holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker); for (int i = 1, size = logs.size() - 1; i < size; ++i) { - holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); + holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker()); } } @@ -1144,12 +1167,12 @@ public class WALProcedureStore extends ProcedureStoreBase { * Remove all logs with logId <= {@code lastLogId}. */ private void removeAllLogs(long lastLogId) { - if (logs.size() <= 1) return; - - if (LOG.isTraceEnabled()) { - LOG.trace("Remove all state logs with ID less than " + lastLogId); + if (logs.size() <= 1) { + return; } + LOG.trace("Remove all state logs with ID less than {}", lastLogId); + boolean removed = false; while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); @@ -1167,14 +1190,10 @@ public class WALProcedureStore extends ProcedureStoreBase { private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { try { - if (LOG.isTraceEnabled()) { - LOG.trace("Removing log=" + log); - } + LOG.trace("Removing log={}", log); log.removeFile(walArchiveDir); logs.remove(log); - if (LOG.isDebugEnabled()) { - LOG.info("Removed log=" + log + ", activeLogs=" + logs); - } + LOG.debug("Removed log={}, activeLogs={}", log, logs); assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { LOG.error("Unable to remove log: " + log, e); @@ -1238,50 +1257,53 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private static long getMaxLogId(final FileStatus[] logFiles) { - long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName())); - } + /** + * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. + * @return Max-LogID of the specified log file set + */ + private static long getMaxLogId(FileStatus[] logFiles) { + if (logFiles == null || logFiles.length == 0) { + return 0L; } - return maxLogId; + return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); } /** + * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. * @return Max-LogID of the specified log file set */ - private long initOldLogs(final FileStatus[] logFiles) throws IOException { - this.logs.clear(); - + private long initOldLogs(FileStatus[] logFiles) throws IOException { + if (logFiles == null || logFiles.length == 0) { + return 0L; + } long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - final Path logPath = logFiles[i].getPath(); - leaseRecovery.recoverFileLease(fs, logPath); - if (!isRunning()) { - throw new IOException("wal aborting"); - } + for (int i = 0; i < logFiles.length; ++i) { + final Path logPath = logFiles[i].getPath(); + leaseRecovery.recoverFileLease(fs, logPath); + if (!isRunning()) { + throw new IOException("wal aborting"); + } - maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); - if (log != null) { - this.logs.add(log); - } + maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); + ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); + if (log != null) { + this.logs.add(log); } - Collections.sort(this.logs); - initTrackerFromOldLogs(); } + initTrackerFromOldLogs(); return maxLogId; } /** - * If last log's tracker is not null, use it as {@link #storeTracker}. - * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild - * it using entries in the log. + * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker + * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. */ private void initTrackerFromOldLogs() { - if (logs.isEmpty() || !isRunning()) return; + if (logs.isEmpty() || !isRunning()) { + return; + } ProcedureWALFile log = logs.getLast(); if (!log.getTracker().isPartial()) { storeTracker.resetTo(log.getTracker()); @@ -1295,20 +1317,18 @@ public class WALProcedureStore extends ProcedureStoreBase { * Loads given log file and it's tracker. */ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) - throws IOException { + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { - LOG.warn("Remove uninitialized log: " + logFile); + LOG.warn("Remove uninitialized log: {}", logFile); log.removeFile(walArchiveDir); return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Opening Pv2 " + logFile); - } + LOG.debug("Opening Pv2 {}", logFile); try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { - LOG.warn("Remove uninitialized log: " + logFile, e); + LOG.warn("Remove uninitialized log: {}", logFile, e); log.removeFile(walArchiveDir); return null; } catch (IOException e) { @@ -1322,7 +1342,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (IOException e) { log.getTracker().reset(); log.getTracker().setPartialFlag(true); - LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); + LOG.warn("Unable to read tracker for {}", log, e); } log.close(); @@ -1350,7 +1370,7 @@ public class WALProcedureStore extends ProcedureStoreBase { }); try { store.start(16); - ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store); + ProcedureExecutor pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); pe.init(1, true); } finally { store.stop(true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index ffc6ab8de0..e140727868 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import java.util.Random; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -119,29 +118,29 @@ public class TestProcedureStoreTracker { tracker.insert(procs[0]); tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); - tracker.resetUpdates(); - assertFalse(tracker.isUpdated()); + tracker.resetModified(); + assertFalse(tracker.isAllUpdated()); for (int i = 0; i < 4; ++i) { tracker.update(procs[i]); assertFalse(tracker.isEmpty()); - assertFalse(tracker.isUpdated()); + assertFalse(tracker.isAllUpdated()); } tracker.update(procs[4]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); tracker.update(procs[5]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); for (int i = 0; i < 5; ++i) { tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); } tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); @@ -235,7 +234,7 @@ public class TestProcedureStoreTracker { for (long i : active) { tracker.insert(i); } - tracker.resetUpdates(); + tracker.resetModified(); for (long i : updated) { tracker.update(i); } @@ -252,11 +251,11 @@ public class TestProcedureStoreTracker { BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) { BitSetNode bitSetNode = new BitSetNode(0L, false); for (long i : active) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } - bitSetNode.resetUpdates(); + bitSetNode.resetModified(); for (long i : updated) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } for (long i : deleted) { bitSetNode.delete(i); @@ -276,9 +275,9 @@ public class TestProcedureStoreTracker { assertEquals(false, tracker.isEmpty()); for (int i = 0; i < procIds.length; ++i) { - tracker.setDeletedIfSet(procIds[i] - 1); - tracker.setDeletedIfSet(procIds[i]); - tracker.setDeletedIfSet(procIds[i] + 1); + tracker.setDeletedIfModified(procIds[i] - 1); + tracker.setDeletedIfModified(procIds[i]); + tracker.setDeletedIfModified(procIds[i] + 1); } assertEquals(true, tracker.isEmpty()); @@ -289,7 +288,7 @@ public class TestProcedureStoreTracker { } assertEquals(false, tracker.isEmpty()); - tracker.setDeletedIfSet(procIds); + tracker.setDeletedIfModified(procIds); assertEquals(true, tracker.isEmpty()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e2e4aec00b..528f039521 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). *

To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. + * For triggering test. */ @InterfaceAudience.Public @SuppressWarnings("deprecation") -- 2.17.1