From 335549482a82f09f7792c65bae1c147a139e988b Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 19 Oct 2018 11:10:45 +0800 Subject: [PATCH] HBASE-21336 Simplify the implementation of WALProcedureMap --- .../store/wal/ProcedureWALFormatReader.java | 24 +- .../procedure2/store/wal/WALProcedureMap.java | 573 ++---------------- .../store/wal/WALProcedureTree.java | 275 +++++++++ .../wal/TestStressWALProcedureStore.java | 2 - .../store/wal/TestWALProcedureStore.java | 164 +---- .../hadoop/hbase/HBaseTestingUtility.java | 1 + 6 files changed, 345 insertions(+), 694 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 1ac8e01f3a..e8789ae7cd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -93,8 +92,8 @@ public class ProcedureWALFormatReader { // In the case above we need to read one more WAL to be able to consider // the root procedure A and all children as ready. 
// ============================================================================================== - private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024); - private final WALProcedureMap procedureMap = new WALProcedureMap(1024); + private final WALProcedureMap localProcedureMap = new WALProcedureMap(); + private final WALProcedureMap procedureMap = new WALProcedureMap(); private final ProcedureWALFormat.Loader loader; @@ -178,7 +177,7 @@ public class ProcedureWALFormatReader { localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), localProcedureMap.getMaxModifiedProcId()); } - procedureMap.mergeTail(localProcedureMap); + procedureMap.merge(localProcedureMap); } if (localTracker.isPartial()) { localTracker.setPartialFlag(false); @@ -189,18 +188,11 @@ public class ProcedureWALFormatReader { // notify the loader about the max proc ID loader.setMaxProcId(maxProcId); - // fetch the procedure ready to run. - ProcedureIterator procIter = procedureMap.fetchReady(); - if (procIter != null) { - loader.load(procIter); - } - - // remaining procedures have missing link or dependencies - // consider them as corrupted, manual fix is probably required. - procIter = procedureMap.fetchAll(); - if (procIter != null) { - loader.handleCorrupted(procIter); - } + // build the procedure execution tree. When building we will verify whether a procedure is + // valid. 
+ WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures()); + loader.load(tree.getValidProcs()); + loader.handleCorrupted(tree.getCorruptedProcs()); } private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java index 18d7823cba..26f097d293 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java @@ -17,193 +17,30 @@ */ package org.apache.hadoop.hbase.procedure2.store.wal; -import java.io.IOException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; -/** - * We keep an in-memory map of the procedures sorted by replay order. (see the details in the - * beginning of {@link ProcedureWALFormatReader}). - * - *
- *      procedureMap = | A |   | E |   | C |   |   |   |   | G |   |   |
- *                       D               B
- *      replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
- *
- *  We also have a lazy grouping by "root procedure", and a list of
- *  unlinked procedures. If after reading all the WALs we have unlinked
- *  procedures it means that we had a missing WAL or a corruption.
- *      rootHead = A <-> D <-> G
- *                 B     E
- *                 C
- *      unlinkFromLinkList = None
- * 
- */ +@InterfaceAudience.Private class WALProcedureMap { private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); - private static class Entry { - // For bucketed linked lists in hash-table. - private Entry hashNext; - // child head - private Entry childHead; - // double-link for rootHead or childHead - private Entry linkNext; - private Entry linkPrev; - // replay double-linked-list - private Entry replayNext; - private Entry replayPrev; - // procedure-infos - private Procedure procedure; - private ProcedureProtos.Procedure proto; - private boolean ready = false; - - public Entry(Entry hashNext) { - this.hashNext = hashNext; - } - - public long getProcId() { - return proto.getProcId(); - } - - public long getParentId() { - return proto.getParentId(); - } - - public boolean hasParent() { - return proto.hasParentId(); - } - - public boolean isReady() { - return ready; - } - - public boolean isFinished() { - if (!hasParent()) { - // we only consider 'root' procedures. because for the user 'finished' - // means when everything up to the 'root' is finished. 
- switch (proto.getState()) { - case ROLLEDBACK: - case SUCCESS: - return true; - default: - break; - } - } - return false; - } - - public Procedure convert() throws IOException { - if (procedure == null) { - procedure = ProcedureUtil.convertToProcedure(proto); - } - return procedure; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Entry("); - sb.append(getProcId()); - sb.append(", parentId="); - sb.append(getParentId()); - sb.append(", class="); - sb.append(proto.getClassName()); - sb.append(")"); - return sb.toString(); - } - } - - private static class EntryIterator implements ProcedureIterator { - private final Entry replayHead; - private Entry current; - - public EntryIterator(Entry replayHead) { - this.replayHead = replayHead; - this.current = replayHead; - } - - @Override - public void reset() { - this.current = replayHead; - } - - @Override - public boolean hasNext() { - return current != null; - } - - @Override - public boolean isNextFinished() { - return current != null && current.isFinished(); - } - - @Override - public void skipNext() { - current = current.replayNext; - } - - @Override - public Procedure next() throws IOException { - try { - return current.convert(); - } finally { - current = current.replayNext; - } - } - } + private final Map procMap = new HashMap<>(); - // procedure hash table - private Entry[] procedureMap; - - // replay-order double-linked-list - private Entry replayOrderHead; - private Entry replayOrderTail; - - // root linked-list - private Entry rootHead; - - // pending unlinked children (root not present yet) - private Entry childUnlinkedHead; - - // Track ProcId range private long minModifiedProcId = Long.MAX_VALUE; - private long maxModifiedProcId = Long.MIN_VALUE; - public WALProcedureMap(int size) { - procedureMap = new Entry[size]; - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - } + private long maxModifiedProcId 
= Long.MIN_VALUE; - public void add(ProcedureProtos.Procedure procProto) { - trackProcIds(procProto.getProcId()); - Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); - boolean newEntry = entry.proto == null; - // We have seen procedure WALs where the entries are out of order; see HBASE-18152. - // To compensate, only replace the Entry procedure if for sure this new procedure - // is indeed an entry that came later. - // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs - // contain procedure changes goingfrom start to finish in sequence. - if (newEntry || isIncreasing(entry.proto, procProto)) { - entry.proto = procProto; - } - addToReplayList(entry); - if (newEntry) { - if (procProto.hasParentId()) { - childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); - } else { - rootHead = addToLinkList(entry, rootHead); - } - } + private void trackProcId(long procId) { + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } /** @@ -225,383 +62,47 @@ class WALProcedureMap { return increasing; } - public boolean remove(long procId) { - trackProcIds(procId); - Entry entry = removeFromMap(procId); - if (entry != null) { - unlinkFromReplayList(entry); - unlinkFromLinkList(entry); - return true; - } - return false; - } - - private void trackProcIds(long procId) { - minModifiedProcId = Math.min(minModifiedProcId, procId); - maxModifiedProcId = Math.max(maxModifiedProcId, procId); - } - - public long getMinModifiedProcId() { - return minModifiedProcId; - } - - public long getMaxModifiedProcId() { - return maxModifiedProcId; + public void add(ProcedureProtos.Procedure proc) { + procMap.compute(proc.getProcId(), (procId, existingProc) -> { + if (existingProc == null || isIncreasing(existingProc, proc)) { + return proc; + } else { + return existingProc; + } + }); + trackProcId(proc.getProcId()); } - public boolean contains(long procId) { - return 
getProcedure(procId) != null; + public void remove(long procId) { + procMap.remove(procId); } public boolean isEmpty() { - return replayOrderHead == null; + return procMap.isEmpty(); } - public void clear() { - for (int i = 0; i < procedureMap.length; ++i) { - procedureMap[i] = null; - } - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - minModifiedProcId = Long.MAX_VALUE; - maxModifiedProcId = Long.MIN_VALUE; + public boolean contains(long procId) { + return procMap.containsKey(procId); } - /* - * Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. - - * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures - * that already exist in the "global" map (the one we are merging the "local" in to). - The - * replayOrderList of the "local" nao will be appended to the "global" map replay list. - The - * "local" map will be cleared at the end of the operation. - */ - public void mergeTail(WALProcedureMap other) { - for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { - int slotIndex = getMapSlot(p.getProcId()); - p.hashNext = procedureMap[slotIndex]; - procedureMap[slotIndex] = p; - } - - if (replayOrderHead == null) { - replayOrderHead = other.replayOrderHead; - replayOrderTail = other.replayOrderTail; - rootHead = other.rootHead; - childUnlinkedHead = other.childUnlinkedHead; - } else { - // append replay list - assert replayOrderTail.replayNext == null; - assert other.replayOrderHead.replayPrev == null; - replayOrderTail.replayNext = other.replayOrderHead; - other.replayOrderHead.replayPrev = replayOrderTail; - replayOrderTail = other.replayOrderTail; - - // merge rootHead - if (rootHead == null) { - rootHead = other.rootHead; - } else if (other.rootHead != null) { - Entry otherTail = findLinkListTail(other.rootHead); - otherTail.linkNext = rootHead; - rootHead.linkPrev = otherTail; - rootHead = other.rootHead; - } - - // merge 
childUnlinkedHead - if (childUnlinkedHead == null) { - childUnlinkedHead = other.childUnlinkedHead; - } else if (other.childUnlinkedHead != null) { - Entry otherTail = findLinkListTail(other.childUnlinkedHead); - otherTail.linkNext = childUnlinkedHead; - childUnlinkedHead.linkPrev = otherTail; - childUnlinkedHead = other.childUnlinkedHead; - } - } + public void merge(WALProcedureMap other) { + other.procMap.forEach(procMap::putIfAbsent); maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); - minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); + minModifiedProcId = Math.min(minModifiedProcId, other.minModifiedProcId); - - other.clear(); - } - - /** - * Returns an EntryIterator with the list of procedures ready to be added to the executor. A - * Procedure is ready if its children and parent are ready. - */ - public ProcedureIterator fetchReady() { - buildGraph(); - - Entry readyHead = null; - Entry readyTail = null; - Entry p = replayOrderHead; - while (p != null) { - Entry next = p.replayNext; - if (p.isReady()) { - unlinkFromReplayList(p); - if (readyTail != null) { - readyTail.replayNext = p; - p.replayPrev = readyTail; - } else { - p.replayPrev = null; - readyHead = p; - } - readyTail = p; - p.replayNext = null; - } - p = next; - } - // we need the hash-table lookups for parents, so this must be done - // out of the loop where we check isReadyToRun() - for (p = readyHead; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - unlinkFromLinkList(p); - } - return readyHead != null ? new EntryIterator(readyHead) : null; - } - - /** - * Drain this map and return all procedures in it. - */ - public ProcedureIterator fetchAll() { - Entry head = replayOrderHead; - for (Entry p = head; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - } - for (int i = 0; i < procedureMap.length; ++i) { - assert procedureMap[i] == null : "map not empty i=" + i; - } - replayOrderHead = null; - replayOrderTail = null; - childUnlinkedHead = null; - rootHead = null; - return head != null ? 
new EntryIterator(head) : null; - } - - private void buildGraph() { - Entry p = childUnlinkedHead; - while (p != null) { - Entry next = p.linkNext; - Entry rootProc = getRootProcedure(p); - if (rootProc != null) { - rootProc.childHead = addToLinkList(p, rootProc.childHead); - } - p = next; - } - - for (p = rootHead; p != null; p = p.linkNext) { - checkReadyToRun(p); - } + other.procMap.clear(); + other.maxModifiedProcId = Long.MIN_VALUE; + other.minModifiedProcId = Long.MAX_VALUE; } - private Entry getRootProcedure(Entry entry) { - while (entry != null && entry.hasParent()) { - entry = getProcedure(entry.getParentId()); - } - return entry; + public Collection getProcedures() { + return Collections.unmodifiableCollection(procMap.values()); } - /** - * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A - * Procedure is ready when parent and children are ready. "ready" means that we all the - * information that we need in-memory. - *

- * Example-1:
- * We have two WALs, we start reading from the newest (wal-2) - * - *

-   *    wal-2 | C B |
-   *    wal-1 | A B C |
-   * 
- * - * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If - * B is the only one with parent A we can start C. We have to read one more WAL before being able - * to start B. - *

- * How do we know with the only information in B that we are not ready. - *

    - *
  • easy case, the parent is missing from the global map
  • - *
  • more complex case we look at the Stack IDs.
  • - *
- * The Stack-IDs are added to the procedure order as an incremental index tracking how many times - * that procedure was executed, which is equivalent to the number of times we wrote the procedure - * to the WAL.
- * In the example above: - * - *
-   *   wal-2: B has stackId = [1, 2]
-   *   wal-1: B has stackId = [1]
-   *   wal-1: A has stackId = [0]
-   * 
- * - * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap - * in the stackIds of B, so something was executed before. - *

- * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the - * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is - * available. - *

- * Example-2 - * - *

-   *    wal-2 | A |              A stackIds = [0, 2]
-   *    wal-1 | A B |            B stackIds = [1]
-   * 
- * - * There is a gap between A stackIds so something was executed in between. - */ - private boolean checkReadyToRun(Entry rootEntry) { - assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; - - if (rootEntry.isFinished()) { - // If the root procedure is finished, sub-procedures should be gone - if (rootEntry.childHead != null) { - LOG.error("unexpected active children for root-procedure: {}", rootEntry); - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - LOG.error("unexpected active children: {}", p); - } - } - - assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry; - rootEntry.ready = true; - return true; - } - - int stackIdSum = 0; - int maxStackId = 0; - for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { - int stackId = 1 + rootEntry.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, - rootEntry); - } - - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - for (int i = 0; i < p.proto.getStackIdCount(); ++i) { - int stackId = 1 + p.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p); - } - } - // The cmpStackIdSum is this formula for finding the sum of a series of numbers: - // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg - final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); - if (cmpStackIdSum == stackIdSum) { - rootEntry.ready = true; - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - p.ready = true; - } - return true; - } - return false; - } - - private void unlinkFromReplayList(Entry entry) { - if (replayOrderHead == entry) { - replayOrderHead = entry.replayNext; - } - if (replayOrderTail == entry) { 
- replayOrderTail = entry.replayPrev; - } - if (entry.replayPrev != null) { - entry.replayPrev.replayNext = entry.replayNext; - } - if (entry.replayNext != null) { - entry.replayNext.replayPrev = entry.replayPrev; - } - } - - private void addToReplayList(final Entry entry) { - unlinkFromReplayList(entry); - entry.replayNext = replayOrderHead; - entry.replayPrev = null; - if (replayOrderHead != null) { - replayOrderHead.replayPrev = entry; - } else { - replayOrderTail = entry; - } - replayOrderHead = entry; - } - - private void unlinkFromLinkList(Entry entry) { - if (entry == rootHead) { - rootHead = entry.linkNext; - } else if (entry == childUnlinkedHead) { - childUnlinkedHead = entry.linkNext; - } - if (entry.linkPrev != null) { - entry.linkPrev.linkNext = entry.linkNext; - } - if (entry.linkNext != null) { - entry.linkNext.linkPrev = entry.linkPrev; - } - } - - private Entry addToLinkList(Entry entry, Entry linkHead) { - unlinkFromLinkList(entry); - entry.linkNext = linkHead; - entry.linkPrev = null; - if (linkHead != null) { - linkHead.linkPrev = entry; - } - return entry; - } - - private Entry findLinkListTail(Entry linkHead) { - Entry tail = linkHead; - while (tail.linkNext != null) { - tail = tail.linkNext; - } - return tail; - } - - private Entry addToMap(long procId, boolean hasParent) { - int slotIndex = getMapSlot(procId); - Entry entry = getProcedure(slotIndex, procId); - if (entry != null) { - return entry; - } - - entry = new Entry(procedureMap[slotIndex]); - procedureMap[slotIndex] = entry; - return entry; - } - - private Entry removeFromMap(final long procId) { - int slotIndex = getMapSlot(procId); - Entry prev = null; - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - if (prev != null) { - prev.hashNext = entry.hashNext; - } else { - procedureMap[slotIndex] = entry.hashNext; - } - entry.hashNext = null; - return entry; - } - prev = entry; - entry = entry.hashNext; - } - return null; - } - - 
private Entry getProcedure(long procId) { - return getProcedure(getMapSlot(procId), procId); - } - - private Entry getProcedure(int slotIndex, long procId) { - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - return entry; - } - entry = entry.hashNext; - } - return null; + public long getMinModifiedProcId() { + return minModifiedProcId; } - private int getMapSlot(long procId) { - return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length); + public long getMaxModifiedProcId() { + return maxModifiedProcId; } -} \ No newline at end of file +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java new file mode 100644 index 0000000000..b451cabc22 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +@InterfaceAudience.Private +public final class WALProcedureTree { + + private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class); + + private static final class Entry { + + private final ProcedureProtos.Procedure proc; + + private final List subProcs = new ArrayList<>(); + + public Entry(ProcedureProtos.Procedure proc) { + this.proc = proc; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Entry("); + sb.append(proc.getProcId()); + sb.append(", parentId="); + sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID); + sb.append(", class="); + sb.append(proc.getClassName()); + sb.append(")"); + return sb.toString(); + } + } + + // when loading we will iterate the procedures twice, so use this class to cache the deserialized + // result to prevent deserializing multiple times. 
+ private static final class ProtoAndProc { + private final ProcedureProtos.Procedure proto; + + private Procedure proc; + + public ProtoAndProc(ProcedureProtos.Procedure proto) { + this.proto = proto; + } + + public Procedure getProc() throws IOException { + if (proc == null) { + proc = ProcedureUtil.convertToProcedure(proto); + } + return proc; + } + } + + private final List validProcs = new ArrayList<>(); + + private final List corruptedProcs = new ArrayList<>(); + + private static boolean isFinished(ProcedureProtos.Procedure proc) { + if (!proc.hasParentId()) { + switch (proc.getState()) { + case ROLLEDBACK: + case SUCCESS: + return true; + default: + break; + } + } + return false; + } + + private WALProcedureTree(Map procMap) { + List rootEntries = buildTree(procMap); + for (Entry rootEntry : rootEntries) { + checkReady(rootEntry, procMap); + } + checkOrphan(procMap); + Comparator cmp = + (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId()); + Collections.sort(validProcs, cmp); + Collections.sort(corruptedProcs, cmp); + } + + private List buildTree(Map procMap) { + List rootEntries = new ArrayList<>(); + procMap.values().forEach(entry -> { + if (!entry.proc.hasParentId()) { + rootEntries.add(entry); + } else { + Entry parentEntry = procMap.get(entry.proc.getParentId()); + // For a valid procedure this should not be null. We will log the error later if it is null, + // as it will not be referenced by any root procedures. 
+ if (parentEntry != null) { + parentEntry.subProcs.add(entry); + } + } + }); + return rootEntries; + } + + private void collectStackId(Entry entry, List stackIds) { + for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) { + int stackId = entry.proc.getStackId(i); + stackIds.add(stackId); + } + entry.subProcs.forEach(e -> collectStackId(e, stackIds)); + } + + private void addAllToCorruptedAndRemoveFromProcMap(Entry entry, + Map remainingProcMap) { + corruptedProcs.add(new ProtoAndProc(entry.proc)); + remainingProcMap.remove(entry.proc.getProcId()); + for (Entry e : entry.subProcs) { + addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap); + } + } + + private void addAllToValidAndRemoveFromProcMap(Entry entry, Map remainingProcMap) { + validProcs.add(new ProtoAndProc(entry.proc)); + remainingProcMap.remove(entry.proc.getProcId()); + for (Entry e : entry.subProcs) { + addAllToValidAndRemoveFromProcMap(e, remainingProcMap); + } + } + + // In this method first we will check whether the given root procedure and all its sub procedures + // are valid, through the procedure stack. And we will also remove all these procedures from the + // remainingProcMap, so at last, if there are still procedures in the map, we know that there are + // orphan procedures. 
+ private void checkReady(Entry rootEntry, Map remainingProcMap) { + if (isFinished(rootEntry.proc)) { + if (!rootEntry.subProcs.isEmpty()) { + LOG.error("unexpected active children for root-procedure: {}", rootEntry); + rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e)); + addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap); + } else { + addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap); + } + return; + } + // collect all the stack ids + List stackIds = new ArrayList<>(); + collectStackId(rootEntry, stackIds); + Collections.sort(stackIds); + // the stack ids should start from 0 and increase by one every time + for (int i = 0, n = stackIds.size(); i < n; i++) { + int stackId = stackIds.get(i); + if (stackId != i) { + LOG.error("Unexpected stack id for procedures which root is {}, the {}th stack id is {}", + rootEntry.proc, i, stackId); + addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap); + return; + } + } + addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap); + } + + private void checkOrphan(Map procMap) { + procMap.values().forEach(entry -> { + LOG.error("Orphan procedure: {}", entry); + corruptedProcs.add(new ProtoAndProc(entry.proc)); + }); + } + + private static final class Iter implements ProcedureIterator { + + private final List procs; + + private Iterator iter; + + private ProtoAndProc current; + + public Iter(List procs) { + this.procs = procs; + reset(); + } + + @Override + public void reset() { + iter = procs.iterator(); + if (iter.hasNext()) { + current = iter.next(); + } else { + current = null; + } + } + + @Override + public boolean hasNext() { + return current != null; + } + + private void checkNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + } + + @Override + public boolean isNextFinished() { + checkNext(); + return isFinished(current.proto); + } + + private void moveToNext() { + if (iter.hasNext()) { + current = iter.next(); + } else { + 
current = null; + } + } + + @Override + public void skipNext() { + checkNext(); + moveToNext(); + } + + @Override + public Procedure next() throws IOException { + checkNext(); + Procedure proc = current.getProc(); + moveToNext(); + return proc; + } + } + + public ProcedureIterator getValidProcs() { + return new Iter(validProcs); + } + + public ProcedureIterator getCorruptedProcs() { + return new Iter(corruptedProcs); + } + + public static WALProcedureTree build(Collection procedures) { + Map procMap = new HashMap<>(); + for (ProcedureProtos.Procedure proc : procedures) { + procMap.put(proc.getProcId(), new Entry(proc)); + } + return new WALProcedureTree(procMap); + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 443386d084..da53fa5d0b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.IOException; import java.util.Random; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index d682481e88..0f598b0df8 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -20,7 +20,6 @@ package 
org.apache.hadoop.hbase.procedure2.store.wal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.FileNotFoundException; import java.io.IOException; @@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.procedure2.SequentialProcedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -72,7 +70,6 @@ public class TestWALProcedureStore { private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class); private static final int PROCEDURE_STORE_SLOTS = 1; - private static final Procedure NULL_PROC = null; private WALProcedureStore procStore; @@ -153,7 +150,7 @@ public class TestWALProcedureStore { @Test public void testWalCleanerSequentialClean() throws Exception { - final Procedure[] procs = new Procedure[5]; + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. @@ -182,7 +179,7 @@ public class TestWALProcedureStore { // they are in the starting of the list. @Test public void testWalCleanerNoHoles() throws Exception { - final Procedure[] procs = new Procedure[5]; + final Procedure[] procs = new Procedure[5]; ArrayList logs = null; // Insert procedures and roll wal after every insert. 
for (int i = 0; i < procs.length; i++) { @@ -242,7 +239,7 @@ public class TestWALProcedureStore { @Test public void testWalCleanerWithEmptyRolls() throws Exception { - final Procedure[] procs = new Procedure[3]; + final Procedure[] procs = new Procedure[3]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestSequentialProcedure(); procStore.insert(procs[i], null); @@ -284,12 +281,12 @@ public class TestWALProcedureStore { Set procIds = new HashSet<>(); // Insert something in the log - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 = new TestSequentialProcedure(); procIds.add(proc1.getProcId()); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); - Procedure[] child2 = new Procedure[2]; + Procedure proc2 = new TestSequentialProcedure(); + Procedure[] child2 = new Procedure[2]; child2[0] = new TestSequentialProcedure(); child2[1] = new TestSequentialProcedure(); @@ -323,11 +320,11 @@ public class TestWALProcedureStore { @Test public void testNoTrailerDoubleRestart() throws Exception { // log-0001: proc 0, 1 and 2 are inserted - Procedure proc0 = new TestSequentialProcedure(); + Procedure proc0 = new TestSequentialProcedure(); procStore.insert(proc0, null); - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 = new TestSequentialProcedure(); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); + Procedure proc2 = new TestSequentialProcedure(); procStore.insert(proc2, null); procStore.rollWriterForTesting(); @@ -420,7 +417,7 @@ public class TestWALProcedureStore { } private static void assertUpdated(final ProcedureStoreTracker tracker, - final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { + final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { for (int index : updatedProcs) { long procId = procs[index].getProcId(); assertTrue("Procedure id : " + procId, tracker.isModified(procId)); @@ -432,7 +429,7 @@ public 
class TestWALProcedureStore { } private static void assertDeleted(final ProcedureStoreTracker tracker, - final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { + final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) { for (int index : deletedProcs) { long procId = procs[index].getProcId(); assertEquals("Procedure id : " + procId, @@ -447,7 +444,7 @@ public class TestWALProcedureStore { @Test public void testCorruptedTrailersRebuild() throws Exception { - final Procedure[] procs = new Procedure[6]; + final Procedure[] procs = new Procedure[6]; for (int i = 0; i < procs.length; ++i) { procs[i] = new TestSequentialProcedure(); } @@ -575,127 +572,20 @@ public class TestWALProcedureStore { storeRestart(loader); assertEquals(0, loader.getLoadedCount()); assertEquals(rootProcs.length, loader.getCorruptedCount()); - for (Procedure proc: loader.getCorrupted()) { + for (Procedure proc : loader.getCorrupted()) { assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length); assertTrue(proc.toString(), - proc.getProcId() > rootProcs.length && - proc.getProcId() <= (rootProcs.length * 2)); + proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2)); } } - @Test - public void testWalReplayOrder_AB_A() throws Exception { - /* - * | A B | -> | A | - */ - TestProcedure a = new TestProcedure(1, 0); - TestProcedure b = new TestProcedure(2, 1); - - procStore.insert(a, null); - a.addStackId(0); - procStore.update(a); - - procStore.insert(a, new Procedure[] { b }); - b.addStackId(1); - procStore.update(b); - - procStore.rollWriterForTesting(); - - a.addStackId(2); - procStore.update(a); - - storeRestart(new ProcedureStore.ProcedureLoader() { - @Override - public void setMaxProcId(long maxProcId) { - assertEquals(2, maxProcId); - } - - @Override - public void load(ProcedureIterator procIter) throws IOException { - assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); - 
assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); - assertFalse(procIter.hasNext()); - } - - @Override - public void handleCorrupted(ProcedureIterator procIter) throws IOException { - assertFalse(procIter.hasNext()); - } - }); - } - - @Test - public void testWalReplayOrder_ABC_BAD() throws Exception { - /* - * | A B C | -> | B A D | - */ - TestProcedure a = new TestProcedure(1, 0); - TestProcedure b = new TestProcedure(2, 1); - TestProcedure c = new TestProcedure(3, 2); - TestProcedure d = new TestProcedure(4, 0); - - procStore.insert(a, null); - a.addStackId(0); - procStore.update(a); - - procStore.insert(a, new Procedure[] { b }); - b.addStackId(1); - procStore.update(b); - - procStore.insert(b, new Procedure[] { c }); - b.addStackId(2); - procStore.update(b); - - procStore.rollWriterForTesting(); - - b.addStackId(3); - procStore.update(b); - - a.addStackId(4); - procStore.update(a); - - procStore.insert(d, null); - d.addStackId(0); - procStore.update(d); - - storeRestart(new ProcedureStore.ProcedureLoader() { - @Override - public void setMaxProcId(long maxProcId) { - assertEquals(4, maxProcId); - } - - @Override - public void load(ProcedureIterator procIter) throws IOException { - assertTrue(procIter.hasNext()); - assertEquals(4, procIter.next().getProcId()); - // TODO: This will be multiple call once we do fast-start - //assertFalse(procIter.hasNext()); - - assertTrue(procIter.hasNext()); - assertEquals(1, procIter.next().getProcId()); - assertTrue(procIter.hasNext()); - assertEquals(2, procIter.next().getProcId()); - assertTrue(procIter.hasNext()); - assertEquals(3, procIter.next().getProcId()); - assertFalse(procIter.hasNext()); - } - - @Override - public void handleCorrupted(ProcedureIterator procIter) throws IOException { - assertFalse(procIter.hasNext()); - } - }); - } - @Test public void testRollAndRemove() throws IOException { // Insert something in the log - Procedure proc1 = new TestSequentialProcedure(); + Procedure proc1 
= new TestSequentialProcedure(); procStore.insert(proc1, null); - Procedure proc2 = new TestSequentialProcedure(); + Procedure proc2 = new TestSequentialProcedure(); procStore.insert(proc2, null); // roll the log, now we have 2 @@ -942,17 +832,6 @@ public class TestWALProcedureStore { assertEquals(0, loader.getCorruptedCount()); } - private void assertEmptyLogDir() { - try { - FileStatus[] status = fs.listStatus(logDir); - assertTrue("expected empty state-log dir", status == null || status.length == 0); - } catch (FileNotFoundException e) { - fail("expected the state-log dir to be present: " + logDir); - } catch (IOException e) { - fail("got en exception on state-log dir list: " + e.getMessage()); - } - } - public static class TestSequentialProcedure extends SequentialProcedure { private static long seqid = 0; @@ -961,13 +840,18 @@ public class TestWALProcedureStore { } @Override - protected Procedure[] execute(Void env) { return null; } + protected Procedure[] execute(Void env) { + return null; + } @Override - protected void rollback(Void env) { } + protected void rollback(Void env) { + } @Override - protected boolean abort(Void env) { return false; } + protected boolean abort(Void env) { + return false; + } @Override protected void serializeStateData(ProcedureStateSerializer serializer) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e2e4aec00b..ab9a7992ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). *

To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. + * Trigger pre commit. */ @InterfaceAudience.Public @SuppressWarnings("deprecation") -- 2.17.1