From fb58df7eb57a60a04be6632f8e68dfb3f312edcb Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 29 Sep 2018 22:12:16 +0800 Subject: [PATCH] HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation --- .../hbase/procedure2/store/BitSetNode.java | 391 +++++++++++++++ .../procedure2/store/NoopProcedureStore.java | 9 +- .../procedure2/store/ProcedureStore.java | 9 +- .../store/ProcedureStoreTracker.java | 470 +++--------------- .../CorruptedWALProcedureStoreException.java | 6 +- .../store/wal/ProcedureWALFormat.java | 26 +- .../store/wal/ProcedureWALFormatReader.java | 31 +- .../store/wal/WALProcedureStore.java | 75 ++- .../store/TestProcedureStoreTracker.java | 27 +- 9 files changed, 525 insertions(+), 519 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java new file mode 100644 index 0000000000..1b5ae8787b --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store; + +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). + * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range + * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is + * BITS_PER_WORD. + */ +@InterfaceAudience.Private +class BitSetNode { + private static final long WORD_MASK = 0xffffffffffffffffL; + private static final int ADDRESS_BITS_PER_WORD = 6; + private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; + private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; + + /** + * Mimics {@link ProcedureStoreTracker#partial}. + */ + private boolean partial; + + /** + *
+   *  ----------------------
+   * | modified | deleted |  meaning
+   * |     0    |   0     |  proc exists, but hasn't been updated since last resetUpdates().
+   * |     1    |   0     |  proc was updated (but not deleted).
+   * |     1    |   1     |  proc was deleted.
+   * |     0    |   1     |  proc doesn't exist (maybe never created, maybe deleted in past).
+   * ----------------------
+   * 
+ * + * The meaning of modified is that, we have modified the state of the procedure, no matter insert, + * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we + * will set the delete to 1. + *

+ * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the + * partial one, the initial modified value is 0 and the initial deleted value is also 0. In + * unsetPartialFlag we will reset the deleted to 1 if it is not modified. + */ + + /** + * Set of procedures which have been updated since last {@link #resetModified()}. Useful to track + * procedures which have been updated since last WAL write. + */ + private long[] modified; + /** + * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This + * represents global state since it's not reset on WAL rolls. + */ + private long[] deleted; + /** + * Offset of bitmap i.e. procedure id corresponding to first bit. + */ + private long start; + + public void dump() { + System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(), + getActiveMaxProcId()); + System.out.println("Modified:"); + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + System.out.println("Delete:"); + for (int i = 0; i < deleted.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + } + + public BitSetNode(long procId, boolean partial) { + start = alignDown(procId); + + int count = 1; + modified = new long[count]; + deleted = new long[count]; + for (int i = 0; i < count; ++i) { + modified[i] = 0; + deleted[i] = partial ? 0 : WORD_MASK; + } + + this.partial = partial; + updateState(procId, false); + } + + public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { + start = data.getStartId(); + int size = data.getUpdatedCount(); + modified = new long[size]; + deleted = new long[size]; + for (int i = 0; i < size; ++i) { + modified[i] = data.getUpdated(i); + deleted[i] = data.getDeleted(i); + } + partial = false; + } + + public BitSetNode(BitSetNode other, boolean resetDelete) { + this.start = other.start; + this.partial = other.partial; + this.modified = other.modified.clone(); + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] = ~(other.modified[i]); + } + } else { + this.deleted = other.deleted.clone(); + } + } + + public void insertOrUpdate(final long procId) { + updateState(procId, false); + } + + public void delete(final long procId) { + updateState(procId, true); + } + + public long getStart() { + return start; + } + + public long getEnd() { + return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1; + } + + public boolean contains(final long procId) { + return start <= procId && procId <= getEnd(); + } + + public DeleteState isDeleted(final long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= deleted.length) { + return DeleteState.MAYBE; + } + return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; + } + + public boolean isModified(long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= modified.length) { + return false; + } + return (modified[wordIndex] & (1L << bitmapIndex)) != 0; + } + + /** + * @return true, if all the procedures has been modified. + */ + public boolean isAllModified() { + // TODO: cache the value + for (int i = 0; i < modified.length; ++i) { + if ((modified[i] | deleted[i]) != WORD_MASK) { + return false; + } + } + return true; + } + + /** + * @return true, if there are no active procedures in this BitSetNode, else false. + */ + public boolean isEmpty() { + // TODO: cache the value + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] != WORD_MASK) { + return false; + } + } + return true; + } + + public void resetModified() { + for (int i = 0; i < modified.length; ++i) { + modified[i] = 0; + } + } + + public void unsetPartialFlag() { + partial = false; + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((modified[i] & (1L << j)) == 0) { + deleted[i] |= (1L << j); + } + } + } + } + + /** + * Convert to + * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode + * protobuf. + */ + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { + ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = + ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); + builder.setStartId(start); + for (int i = 0; i < modified.length; ++i) { + builder.addUpdated(modified[i]); + builder.addDeleted(deleted[i]); + } + return builder.build(); + } + + // ======================================================================== + // Grow/Merge Helpers + // ======================================================================== + public boolean canGrow(final long procId) { + return Math.abs(procId - start) < MAX_NODE_SIZE; + } + + public boolean canMerge(final BitSetNode rightNode) { + // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. + assert start < rightNode.start; + return (rightNode.getEnd() - start) < MAX_NODE_SIZE; + } + + public void grow(final long procId) { + int delta, offset; + + if (procId < start) { + // add to head + long newStart = alignDown(procId); + delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD; + offset = delta; + start = newStart; + } else { + // Add to tail + long newEnd = alignUp(procId + 1); + delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; + offset = 0; + } + + long[] newBitmap; + int oldSize = modified.length; + + newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } + System.arraycopy(modified, 0, newBitmap, offset, oldSize); + modified = newBitmap; + + newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = partial ? 0 : WORD_MASK; + } + System.arraycopy(deleted, 0, newBitmap, offset, oldSize); + deleted = newBitmap; + } + + public void merge(final BitSetNode rightNode) { + int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; + + long[] newBitmap; + int oldSize = modified.length; + int newSize = (delta - rightNode.modified.length); + int offset = oldSize + newSize; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(modified, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length); + modified = newBitmap; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(deleted, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); + deleted = newBitmap; + + for (int i = 0; i < newSize; ++i) { + modified[offset + i] = 0; + deleted[offset + i] = partial ? 0 : WORD_MASK; + } + } + + @Override + public String toString() { + return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; + } + + // ======================================================================== + // Min/Max Helpers + // ======================================================================== + public long getActiveMinProcId() { + long minProcId = start; + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] == 0) { + return (minProcId); + } + + if (deleted[i] != WORD_MASK) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((deleted[i] & (1L << j)) != 0) { + return minProcId + j; + } + } + } + + minProcId += BITS_PER_WORD; + } + return minProcId; + } + + public long getActiveMaxProcId() { + long maxProcId = getEnd(); + for (int i = deleted.length - 1; i >= 0; --i) { + if (deleted[i] == 0) { + return maxProcId; + } + + if (deleted[i] != WORD_MASK) { + for (int j = BITS_PER_WORD - 1; j >= 0; --j) { + if ((deleted[i] & (1L << j)) == 0) { + return maxProcId - (BITS_PER_WORD - 1 - j); + } + } + } + maxProcId -= BITS_PER_WORD; + } + return maxProcId; + } + + // ======================================================================== + // Bitmap Helpers + // ======================================================================== + private int getBitmapIndex(final long procId) { + return (int) (procId - start); + } + + void updateState(long procId, boolean isDeleted) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + long value = (1L << bitmapIndex); + + modified[wordIndex] |= value; + if (isDeleted) { + deleted[wordIndex] |= value; + } else { + deleted[wordIndex] &= ~value; + } + } + + // ======================================================================== + // Helpers + // ======================================================================== + /** + * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignUp(final long x) { + return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; + } + + /** + * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignDown(final long x) { + return x & -BITS_PER_WORD; + } +} \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 9c6176d4bb..8fbc1473ed 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; @@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void insert(Procedure proc, Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { // no-op } @Override - public void insert(Procedure[] proc) { + public void insert(Procedure[] proc) { // no-op } @Override - public void update(Procedure proc) { + public void update(Procedure proc) { // no-op } @@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void delete(Procedure proc, long[] subprocs) { + public void delete(Procedure proc, long[] subprocs) { // no-op } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 72883405d7..8063b125ba 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -81,6 +81,7 @@ public interface ProcedureStore { * @throws IOException if there was an error fetching/deserializing the procedure * @return the next procedure in the iteration. */ + @SuppressWarnings("rawtypes") Procedure next() throws IOException; } @@ -173,7 +174,7 @@ public interface ProcedureStore { * @param proc the procedure to serialize and write to the store. * @param subprocs the newly created child of the proc. */ - void insert(Procedure proc, Procedure[] subprocs); + void insert(Procedure proc, Procedure[] subprocs); /** * Serialize a set of new procedures. @@ -182,14 +183,14 @@ public interface ProcedureStore { * * @param procs the procedures to serialize and write to the store. */ - void insert(Procedure[] procs); + void insert(Procedure[] procs); /** * The specified procedure was executed, * and the new state should be written to the store. * @param proc the procedure to serialize and write to the store. */ - void update(Procedure proc); + void update(Procedure proc); /** * The specified procId was removed from the executor, @@ -205,7 +206,7 @@ public interface ProcedureStore { * @param parentProc the parent procedure to serialize and write to the store. * @param subProcIds the IDs of the sub-procedure to remove. */ - void delete(Procedure parentProc, long[] subProcIds); + void delete(Procedure parentProc, long[] subProcIds); /** * The specified procIds were removed from the executor, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 2dad5ac72c..61541ad41e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -53,383 +53,14 @@ public class ProcedureStoreTracker { * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to * understand it's real use. */ - private boolean partial = false; + boolean partial = false; - private long minUpdatedProcId = Long.MAX_VALUE; - private long maxUpdatedProcId = Long.MIN_VALUE; + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; public enum DeleteState { YES, NO, MAYBE } - /** - * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). - * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the - * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K - * is BITS_PER_WORD. - */ - public static class BitSetNode { - private final static long WORD_MASK = 0xffffffffffffffffL; - private final static int ADDRESS_BITS_PER_WORD = 6; - private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; - - /** - * Mimics {@link ProcedureStoreTracker#partial}. - */ - private final boolean partial; - - /* ---------------------- - * | updated | deleted | meaning - * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates(). - * | 1 | 0 | proc was updated (but not deleted). - * | 1 | 1 | proc was deleted. - * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past). - /* ---------------------- - */ - - /** - * Set of procedures which have been updated since last {@link #resetUpdates()}. - * Useful to track procedures which have been updated since last WAL write. - */ - private long[] updated; - /** - * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. - * This represents global state since it's not reset on WAL rolls. - */ - private long[] deleted; - /** - * Offset of bitmap i.e. procedure id corresponding to first bit. - */ - private long start; - - public void dump() { - System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), - getActiveMinProcId(), getActiveMaxProcId()); - System.out.println("Update:"); - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - System.out.println("Delete:"); - for (int i = 0; i < deleted.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - } - - public BitSetNode(final long procId, final boolean partial) { - start = alignDown(procId); - - int count = 1; - updated = new long[count]; - deleted = new long[count]; - for (int i = 0; i < count; ++i) { - updated[i] = 0; - deleted[i] = partial ? 0 : WORD_MASK; - } - - this.partial = partial; - updateState(procId, false); - } - - protected BitSetNode(final long start, final long[] updated, final long[] deleted) { - this.start = start; - this.updated = updated; - this.deleted = deleted; - this.partial = false; - } - - public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { - start = data.getStartId(); - int size = data.getUpdatedCount(); - updated = new long[size]; - deleted = new long[size]; - for (int i = 0; i < size; ++i) { - updated[i] = data.getUpdated(i); - deleted[i] = data.getDeleted(i); - } - partial = false; - } - - public BitSetNode(final BitSetNode other, final boolean resetDelete) { - this.start = other.start; - this.partial = other.partial; - this.updated = other.updated.clone(); - if (resetDelete) { - this.deleted = new long[other.deleted.length]; - for (int i = 0; i < this.deleted.length; ++i) { - this.deleted[i] = ~(other.updated[i]); - } - } else { - this.deleted = other.deleted.clone(); - } - } - - public void update(final long procId) { - updateState(procId, false); - } - - public void delete(final long procId) { - updateState(procId, true); - } - - public long getStart() { - return start; - } - - public long getEnd() { - return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; - } - - public boolean contains(final long procId) { - return start <= procId && procId <= getEnd(); - } - - public DeleteState isDeleted(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= deleted.length) { - return DeleteState.MAYBE; - } - return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; - } - - private boolean isUpdated(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= updated.length) { - return false; - } - return (updated[wordIndex] & (1L << bitmapIndex)) != 0; - } - - public boolean isUpdated() { - // TODO: cache the value - for (int i = 0; i < updated.length; ++i) { - if ((updated[i] | deleted[i]) != WORD_MASK) { - return false; - } - } - return true; - } - - /** - * @return true, if there are no active procedures in this BitSetNode, else false. - */ - public boolean isEmpty() { - // TODO: cache the value - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] != WORD_MASK) { - return false; - } - } - return true; - } - - public void resetUpdates() { - for (int i = 0; i < updated.length; ++i) { - updated[i] = 0; - } - } - - /** - * Clears the {@link #deleted} bitmaps. - */ - public void undeleteAll() { - for (int i = 0; i < updated.length; ++i) { - deleted[i] = 0; - } - } - - public void unsetPartialFlag() { - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((updated[i] & (1L << j)) == 0) { - deleted[i] |= (1L << j); - } - } - } - } - - /** - * Convert to - * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode - * protobuf. - */ - public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { - ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = - ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); - builder.setStartId(start); - for (int i = 0; i < updated.length; ++i) { - builder.addUpdated(updated[i]); - builder.addDeleted(deleted[i]); - } - return builder.build(); - } - - // ======================================================================== - // Grow/Merge Helpers - // ======================================================================== - public boolean canGrow(final long procId) { - return Math.abs(procId - start) < MAX_NODE_SIZE; - } - - public boolean canMerge(final BitSetNode rightNode) { - // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. - assert start < rightNode.start; - return (rightNode.getEnd() - start) < MAX_NODE_SIZE; - } - - public void grow(final long procId) { - int delta, offset; - - if (procId < start) { - // add to head - long newStart = alignDown(procId); - delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD; - offset = delta; - start = newStart; - } else { - // Add to tail - long newEnd = alignUp(procId + 1); - delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; - offset = 0; - } - - long[] newBitmap; - int oldSize = updated.length; - - newBitmap = new long[oldSize + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = 0; - } - System.arraycopy(updated, 0, newBitmap, offset, oldSize); - updated = newBitmap; - - newBitmap = new long[deleted.length + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = partial ? 0 : WORD_MASK; - } - System.arraycopy(deleted, 0, newBitmap, offset, oldSize); - deleted = newBitmap; - } - - public void merge(final BitSetNode rightNode) { - int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; - - long[] newBitmap; - int oldSize = updated.length; - int newSize = (delta - rightNode.updated.length); - int offset = oldSize + newSize; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(updated, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length); - updated = newBitmap; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(deleted, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); - deleted = newBitmap; - - for (int i = 0; i < newSize; ++i) { - updated[offset + i] = 0; - deleted[offset + i] = partial ? 0 : WORD_MASK; - } - } - - @Override - public String toString() { - return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; - } - - // ======================================================================== - // Min/Max Helpers - // ======================================================================== - public long getActiveMinProcId() { - long minProcId = start; - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] == 0) { - return(minProcId); - } - - if (deleted[i] != WORD_MASK) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((deleted[i] & (1L << j)) != 0) { - return minProcId + j; - } - } - } - - minProcId += BITS_PER_WORD; - } - return minProcId; - } - - public long getActiveMaxProcId() { - long maxProcId = getEnd(); - for (int i = deleted.length - 1; i >= 0; --i) { - if (deleted[i] == 0) { - return maxProcId; - } - - if (deleted[i] != WORD_MASK) { - for (int j = BITS_PER_WORD - 1; j >= 0; --j) { - if ((deleted[i] & (1L << j)) == 0) { - return maxProcId - (BITS_PER_WORD - 1 - j); - } - } - } - maxProcId -= BITS_PER_WORD; - } - return maxProcId; - } - - // ======================================================================== - // Bitmap Helpers - // ======================================================================== - private int getBitmapIndex(final long procId) { - return (int)(procId - start); - } - - private void updateState(final long procId, final boolean isDeleted) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - long value = (1L << bitmapIndex); - - updated[wordIndex] |= value; - if (isDeleted) { - deleted[wordIndex] |= value; - } else { - deleted[wordIndex] &= ~value; - } - } - - - // ======================================================================== - // Helpers - // ======================================================================== - /** - * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignUp(final long x) { - return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; - } - - /** - * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignDown(final long x) { - return x & -BITS_PER_WORD; - } - } - - public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { + public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { reset(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { final BitSetNode node = new BitSetNode(protoNode); @@ -440,14 +71,15 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. */ - public void resetTo(final ProcedureStoreTracker tracker) { + public void resetTo(ProcedureStoreTracker tracker) { resetTo(tracker, false); } - public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { + public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { + reset(); this.partial = tracker.partial; - this.minUpdatedProcId = tracker.minUpdatedProcId; - this.maxUpdatedProcId = tracker.maxUpdatedProcId; + this.minModifiedProcId = tracker.minModifiedProcId; + this.maxModifiedProcId = tracker.maxModifiedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry entry : tracker.map.entrySet()) { map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); @@ -458,25 +90,24 @@ public class ProcedureStoreTracker { insert(null, procId); } - public void insert(final long[] procIds) { + public void insert(long[] procIds) { for (int i = 0; i < procIds.length; ++i) { insert(procIds[i]); } } - public void insert(final long procId, final long[] subProcIds) { - BitSetNode node = null; - node = update(node, procId); + public void insert(long procId, long[] subProcIds) { + BitSetNode node = update(null, procId); for (int i = 0; i < subProcIds.length; ++i) { node = insert(node, subProcIds[i]); } } - private BitSetNode insert(BitSetNode node, final long procId) { + private BitSetNode insert(BitSetNode node, long procId) { if (node == null || !node.contains(procId)) { node = getOrCreateNode(procId); } - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -485,11 +116,11 @@ public class ProcedureStoreTracker { update(null, procId); } - private BitSetNode update(BitSetNode node, final long procId) { + private BitSetNode update(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to update procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -520,34 +151,51 @@ public class ProcedureStoreTracker { return node; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { + /** + * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The + * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart + * this is not true, as we will read the wal files in reverse order so a delete may come first. + */ + public void setDeleted(long procId, boolean isDeleted) { BitSetNode node = getOrCreateNode(procId); assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; node.updateState(procId, isDeleted); trackProcIds(procId); } - public void setDeletedIfSet(final long... procId) { + /** + * Set the given bit for the procId to delete if it was modified before. + *

+ * This method is used to test whether a procedure wal file can be safely deleted, as if all the + * procedures in the given procedure wal file has been modified in the new procedure wal files, + * then we can delete it. + */ + public void setDeletedIfModified(long... procId) { BitSetNode node = null; for (int i = 0; i < procId.length; ++i) { node = lookupClosestNode(node, procId[i]); - if (node != null && node.isUpdated(procId[i])) { + if (node != null && node.isModified(procId[i])) { node.delete(procId[i]); } } } - public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + /** + * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see setDeletedIfModified + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { BitSetNode trackerNode = null; - for (BitSetNode node: map.values()) { + for (BitSetNode node : map.values()) { final long minProcId = node.getStart(); final long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { - if (!node.isUpdated(procId)) continue; + if (!node.isModified(procId)) continue; trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) { + if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isModified(procId)) { // the procedure was removed or updated node.delete(procId); } @@ -568,16 +216,16 @@ public class ProcedureStoreTracker { } private void trackProcIds(long procId) { - minUpdatedProcId = Math.min(minUpdatedProcId, procId); - maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } - public long getUpdatedMinProcId() { - return minUpdatedProcId; + public long getModifiedMinProcId() { + return minModifiedProcId; } - public long getUpdatedMaxProcId() { - return maxUpdatedProcId; + public long getModifiedMaxProcId() { + return maxModifiedProcId; } public void reset() { @@ -589,7 +237,7 @@ public class ProcedureStoreTracker { public boolean isUpdated(long procId) { final Map.Entry entry = map.floorEntry(procId); - return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId); + return entry != null && entry.getValue().contains(procId) && entry.getValue().isModified(procId); } /** @@ -604,7 +252,7 @@ public class ProcedureStoreTracker { if (entry != null && entry.getValue().contains(procId)) { BitSetNode node = entry.getValue(); DeleteState state = node.isDeleted(procId); - return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; + return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; } return partial ? DeleteState.MAYBE : DeleteState.YES; } @@ -656,11 +304,11 @@ public class ProcedureStoreTracker { } /** - * @return true if any procedure was updated since last call to {@link #resetUpdates()}. + * @return true if all procedure was updated or deleted since last call to {@link #resetUpdates()}. */ - public boolean isUpdated() { + public boolean isAllUpdated() { for (Map.Entry entry : map.entrySet()) { - if (!entry.getValue().isUpdated()) { + if (!entry.getValue().isAllModified()) { return false; } } @@ -673,16 +321,10 @@ public class ProcedureStoreTracker { */ public void resetUpdates() { for (Map.Entry entry : map.entrySet()) { - entry.getValue().resetUpdates(); - } - minUpdatedProcId = Long.MAX_VALUE; - maxUpdatedProcId = Long.MIN_VALUE; - } - - public void undeleteAll() { - for (Map.Entry entry : map.entrySet()) { - entry.getValue().undeleteAll(); + entry.getValue().resetModified(); } + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; } private BitSetNode getOrCreateNode(final long procId) { @@ -760,7 +402,7 @@ public class ProcedureStoreTracker { public void dump() { System.out.println("map " + map.size()); - System.out.println("isUpdated " + isUpdated()); + System.out.println("isUpdated " + isAllUpdated()); System.out.println("isEmpty " + isEmpty()); for (Map.Entry entry : map.entrySet()) { entry.getValue().dump(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java index dd34896ebb..ba4480fca7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java @@ -15,19 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * Thrown when a procedure WAL is corrupted */ @InterfaceAudience.Private -@InterfaceStability.Stable public class CorruptedWALProcedureStoreException extends HBaseIOException { + + private static final long serialVersionUID = -3407300445435898074L; + /** default constructor */ public CorruptedWALProcedureStoreException() { super(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index ac3a52941e..5a527c4fbd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -18,25 +18,22 @@ package org.apache.hadoop.hbase.procedure2.store.wal; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that contains the WAL serialization utils. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public final class ProcedureWALFormat { - private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class); static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; @@ -60,6 +55,9 @@ public final class ProcedureWALFormat { @InterfaceAudience.Private public static class InvalidWALDataException extends IOException { + + private static final long serialVersionUID = 5471733223070202196L; + public InvalidWALDataException(String s) { super(s); } @@ -205,7 +203,7 @@ public final class ProcedureWALFormat { } public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, - Procedure proc, Procedure[] subprocs) throws IOException { + Procedure proc, Procedure[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(type); builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); @@ -217,17 +215,17 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeInsert(ByteSlot slot, Procedure proc) + public static void writeInsert(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); } - public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) + public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); } - public static void writeUpdate(ByteSlot slot, Procedure proc) + public static void writeUpdate(ByteSlot slot, Procedure proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); } @@ -240,7 +238,7 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) + public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 4ab70f18e1..c1c89dcfc2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -111,7 +111,6 @@ public class ProcedureWALFormatReader { * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // TODO: private final boolean hasFastStartSupport; /** * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we @@ -123,21 +122,18 @@ public class ProcedureWALFormatReader { */ private ProcedureStoreTracker localTracker; - // private long compactionLogId; private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { this.tracker = tracker; this.loader = loader; - // we support fast-start only if we have a clean shutdown. - // this.hasFastStartSupport = !tracker.isEmpty(); } public void read(final ProcedureWALFile log) throws IOException { localTracker = log.getTracker().isPartial() ? log.getTracker() : null; if (localTracker != null) { - LOG.info("Rebuilding tracker for " + log); + LOG.info("Rebuilding tracker for {}", log); } long count = 0; @@ -147,7 +143,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); + LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log); break; } count++; @@ -184,15 +180,6 @@ public class ProcedureWALFormatReader { if (!localProcedureMap.isEmpty()) { log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); procedureMap.mergeTail(localProcedureMap); - - //if (hasFastStartSupport) { - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) - // -------------------------------------------------- - //EntryIterator iter = procedureMap.fetchReady(); - //if (iter != null) loader.load(iter); - // -------------------------------------------------- - //} } } @@ -267,9 +254,7 @@ public class ProcedureWALFormatReader { } private void deleteEntry(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("delete entry " + procId); - } + LOG.trace("delete entry {}", procId); maxProcId = Math.max(maxProcId, procId); localProcedureMap.remove(procId); assert !procedureMap.contains(procId); @@ -282,11 +267,11 @@ public class ProcedureWALFormatReader { } } - private boolean isDeleted(final long procId) { + private boolean isDeleted(long procId) { return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; } - private boolean isRequired(final long procId) { + private boolean isRequired(long procId) { return !isDeleted(procId) && !procedureMap.contains(procId); } @@ -318,7 +303,7 @@ public class ProcedureWALFormatReader { protected Entry replayNext; protected Entry replayPrev; // procedure-infos - protected Procedure procedure; + protected Procedure procedure; protected ProcedureProtos.Procedure proto; protected boolean ready = false; @@ -357,7 +342,7 @@ public class ProcedureWALFormatReader { return false; } - public Procedure convert() throws IOException { + public Procedure convert() throws IOException { if (procedure == null) { procedure = ProcedureUtil.convertToProcedure(proto); } @@ -408,7 +393,7 @@ public class ProcedureWALFormatReader { } @Override - public Procedure next() throws IOException { + public Procedure next() throws IOException { try { return current.convert(); } finally { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 7d5d6d2448..7af00335b7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -166,7 +166,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private int syncWaitMsec; // Variables used for UI display - private CircularFifoQueue syncMetricsQueue; + private CircularFifoQueue syncMetricsQueue; public static class SyncMetrics { private long timestamp; @@ -228,11 +228,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // Create archive dir up front. Rename won't work w/o it up on HDFS. if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); - } + LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); } else { - LOG.warn("Failed create of " + this.walArchiveDir); + LOG.warn("Failed create of {}", this.walArchiveDir); } } } @@ -248,7 +246,7 @@ public class WALProcedureStore extends ProcedureStoreBase { runningProcCount = numSlots; syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; - slotsCache = new LinkedTransferQueue(); + slotsCache = new LinkedTransferQueue<>(); while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } @@ -267,7 +265,7 @@ public class WALProcedureStore extends ProcedureStoreBase { useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // WebUI - syncMetricsQueue = new CircularFifoQueue( + syncMetricsQueue = new CircularFifoQueue<>( conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); // Init sync thread @@ -485,7 +483,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure proc, final Procedure[] subprocs) { + public void insert(Procedure proc, Procedure[] subprocs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); } @@ -519,7 +517,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure[] procs) { + public void insert(Procedure[] procs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + Arrays.toString(procs)); } @@ -548,7 +546,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void update(final Procedure proc) { + public void update(Procedure proc) { if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc); } @@ -571,11 +569,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Delete " + procId); - } - + public void delete(long procId) { + LOG.trace("Delete {}", procId); ByteSlot slot = acquireSlot(); try { // Serialize the delete @@ -594,7 +589,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final Procedure proc, final long[] subProcIds) { + public void delete(Procedure proc, long[] subProcIds) { assert proc != null : "expected a non-null procedure"; assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { @@ -630,7 +625,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private void delete(final long[] procIds) { + private void delete(long[] procIds) { if (LOG.isTraceEnabled()) { LOG.trace("Delete " + Arrays.toString(procIds)); } @@ -736,20 +731,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; case UPDATE: storeTracker.update(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); - holdingCleanupTracker.setDeletedIfSet(subProcIds); + holdingCleanupTracker.setDeletedIfModified(subProcIds); } else { storeTracker.delete(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; default: @@ -973,16 +968,12 @@ public class WALProcedureStore extends ProcedureStoreBase { private void periodicRoll() throws IOException { if (storeTracker.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("no active procedures"); - } + LOG.trace("no active procedures"); tryRollWriter(); removeAllLogs(flushLogId - 1); } else { - if (storeTracker.isUpdated()) { - if (LOG.isTraceEnabled()) { - LOG.trace("all the active procedures are in the latest log"); - } + if (storeTracker.isAllUpdated()) { + LOG.trace("all the active procedures are in the latest log"); removeAllLogs(flushLogId - 1); } @@ -997,18 +988,20 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriter() throws IOException { - if (!isRunning()) return false; + if (!isRunning()) { + return false; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { - LOG.warn("someone else has already created log " + flushLogId); + LOG.warn("someone else has already created log {}", flushLogId); return false; } // We have the lease on the log, // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { - LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); + LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1092,7 +1085,7 @@ public class WALProcedureStore extends ProcedureStoreBase { try { ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); log.updateLocalTracker(storeTracker); long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); log.addToSize(trailerSize); @@ -1134,9 +1127,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // - the other WALs are scanned to remove procs already in other wals. // TODO: exit early if holdingCleanupTracker.isEmpty() holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - holdingCleanupTracker.setDeletedIfSet(storeTracker); + holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker); for (int i = 1, size = logs.size() - 1; i < size; ++i) { - holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); + holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker()); } } @@ -1295,20 +1288,18 @@ public class WALProcedureStore extends ProcedureStoreBase { * Loads given log file and it's tracker. */ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) - throws IOException { + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { - LOG.warn("Remove uninitialized log: " + logFile); + LOG.warn("Remove uninitialized log: {}", logFile); log.removeFile(walArchiveDir); return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Opening Pv2 " + logFile); - } + LOG.debug("Opening Pv2 {}", logFile); try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { - LOG.warn("Remove uninitialized log: " + logFile, e); + LOG.warn("Remove uninitialized log: {}", logFile, e); log.removeFile(walArchiveDir); return null; } catch (IOException e) { @@ -1322,7 +1313,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (IOException e) { log.getTracker().reset(); log.getTracker().setPartialFlag(true); - LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); + LOG.warn("Unable to read tracker for {}", log, e); } log.close(); @@ -1350,7 +1341,7 @@ public class WALProcedureStore extends ProcedureStoreBase { }); try { store.start(16); - ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store); + ProcedureExecutor pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); pe.init(1, true); } finally { store.stop(true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index ffc6ab8de0..15a8c1ddc9 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import java.util.Random; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -119,29 +118,29 @@ public class TestProcedureStoreTracker { tracker.insert(procs[0]); tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); tracker.resetUpdates(); - assertFalse(tracker.isUpdated()); + assertFalse(tracker.isAllUpdated()); for (int i = 0; i < 4; ++i) { tracker.update(procs[i]); assertFalse(tracker.isEmpty()); - assertFalse(tracker.isUpdated()); + assertFalse(tracker.isAllUpdated()); } tracker.update(procs[4]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); tracker.update(procs[5]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); for (int i = 0; i < 5; ++i) { tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllUpdated()); } tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); @@ -252,11 +251,11 @@ public class TestProcedureStoreTracker { BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) { BitSetNode bitSetNode = new BitSetNode(0L, false); for (long i : active) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } - bitSetNode.resetUpdates(); + bitSetNode.resetModified(); for (long i : updated) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } for (long i : deleted) { bitSetNode.delete(i); @@ -276,9 +275,9 @@ public class TestProcedureStoreTracker { assertEquals(false, tracker.isEmpty()); for (int i = 0; i < procIds.length; ++i) { - tracker.setDeletedIfSet(procIds[i] - 1); - tracker.setDeletedIfSet(procIds[i]); - tracker.setDeletedIfSet(procIds[i] + 1); + tracker.setDeletedIfModified(procIds[i] - 1); + tracker.setDeletedIfModified(procIds[i]); + tracker.setDeletedIfModified(procIds[i] + 1); } assertEquals(true, tracker.isEmpty()); @@ -289,7 +288,7 @@ public class TestProcedureStoreTracker { } assertEquals(false, tracker.isEmpty()); - tracker.setDeletedIfSet(procIds); + tracker.setDeletedIfModified(procIds); assertEquals(true, tracker.isEmpty()); } } -- 2.17.1