diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 338fcad..6abf2c5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -519,6 +519,20 @@ public abstract class Procedure implements Comparable { return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } + /** + * Get a hashcode for the specified Procedure ID (multiply/xor-shift mix of the id bits). + * @return the hashcode for procId; never negative (the last signed xor-shift clears the sign bit) + */ + public static long getProcIdHashCode(final long procId) { + long h = procId; + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; + } + /* * Helper to lookup the root Procedure ID given a specified procedure. */ diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2982058..a51edb3 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.HashSet; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import 
org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; @@ -260,43 +260,62 @@ public class ProcedureExecutor { this.conf = conf; } - private List> load() throws IOException { + private void load() throws IOException { Preconditions.checkArgument(completed.isEmpty()); Preconditions.checkArgument(rollbackStack.isEmpty()); Preconditions.checkArgument(procedures.isEmpty()); Preconditions.checkArgument(waitingTimeout.isEmpty()); Preconditions.checkArgument(runnables.size() == 0); - // 1. Load the procedures - Iterator loader = store.load(); - if (loader == null) { - lastProcId.set(0); - return null; - } + store.load(new ProcedureStore.ProcedureLoader() { + @Override + public void setMaxProcId(long maxProcId) { + assert lastProcId.get() < 0; + lastProcId.set(maxProcId); + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + loadProcedures(procIter); + } - long logMaxProcId = 0; + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + LOG.error("corrupted procedure: " + proc); + } + throw new IOException("has corrupted procedures"); + } + }); + } + + private void loadProcedures(final ProcedureIterator procIter) throws IOException { + // 1. 
Build the rollback stack int runnablesCount = 0; - while (loader.hasNext()) { - Procedure proc = loader.next(); - proc.beforeReplay(getEnvironment()); - procedures.put(proc.getProcId(), proc); - logMaxProcId = Math.max(logMaxProcId, proc.getProcId()); - LOG.debug("Loading procedure state=" + proc.getState() + - " isFailed=" + proc.hasException() + ": " + proc); + while (procIter.hasNext()) { + Procedure proc = procIter.next(); if (!proc.hasParent() && !proc.isFinished()) { rollbackStack.put(proc.getProcId(), new RootProcedureState()); } + // add the procedure to the map + proc.beforeReplay(getEnvironment()); + procedures.put(proc.getProcId(), proc); + if (proc.getState() == ProcedureState.RUNNABLE) { runnablesCount++; } } - assert lastProcId.get() < 0; - lastProcId.set(logMaxProcId); // 2. Initialize the stacks - TreeSet runnableSet = null; + ArrayList runnableSet = new ArrayList(runnablesCount); HashSet waitingSet = null; - for (final Procedure proc: procedures.values()) { + procIter.reset(); + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + LOG.debug("Loading procedure state=" + proc.getState() + + " isFailed=" + proc.hasException() + ": " + proc); + Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback? @@ -325,9 +344,6 @@ public class ProcedureExecutor { switch (proc.getState()) { case RUNNABLE: - if (runnableSet == null) { - runnableSet = new TreeSet(); - } runnableSet.add(proc); break; case WAITING_TIMEOUT: @@ -353,7 +369,6 @@ public class ProcedureExecutor { } // 3. 
Validate the stacks - List> corrupted = null; Iterator> itStack = rollbackStack.entrySet().iterator(); while (itStack.hasNext()) { Map.Entry entry = itStack.next(); @@ -361,29 +376,26 @@ public class ProcedureExecutor { if (procStack.isValid()) continue; for (Procedure proc: procStack.getSubprocedures()) { + LOG.error("corrupted procedure: " + proc); procedures.remove(proc.getProcId()); if (runnableSet != null) runnableSet.remove(proc); if (waitingSet != null) waitingSet.remove(proc); } itStack.remove(); - if (corrupted == null) { - corrupted = new ArrayList>(); - } - corrupted.add(entry); } // 4. Push the runnables - if (runnableSet != null) { + if (runnableSet != null && !runnableSet.isEmpty()) { // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure // may be started way before this stuff. - for (Procedure proc: runnableSet) { + for (int i = 0; i < runnableSet.size(); ++i) { + Procedure proc = runnableSet.get(i); if (!proc.hasParent()) { sendProcedureLoadedNotification(proc.getProcId()); } runnables.addBack(proc); } } - return corrupted; } public void start(int numThreads) throws IOException { @@ -419,7 +431,7 @@ public class ProcedureExecutor { store.recoverLease(); // TODO: Split in two steps. - // TODO: Handle corrupted procedure returned (probably just a WARN) + // TODO: Handle corrupted procedures (currently just a warn) // The first one will make sure that we have the latest id, // so we can start the threads and accept new procedures. // The second step will do the actual load of old procedures. 
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 06bfa44..a05c115 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -46,6 +45,57 @@ public interface ProcedureStore { } /** + * An Iterator over a collection of Procedures that can be rewound with reset(). + */ + public interface ProcedureIterator { + /** + * Reset the Iterator by seeking to the beginning of the list. + */ + void reset(); + + /** + * Returns true if the iterator has more elements. + * (In other words, returns true if next() would return a Procedure + * rather than throwing an exception.) + * @return true if the iterator has more procedures + */ + boolean hasNext(); + + /** + * Returns the next procedure in the iteration. + * @throws IOException if there was an error fetching/deserializing the procedure + * @throws NoSuchElementException if the iteration has no more elements + * @return the next procedure in the iteration. + */ + Procedure next() throws IOException; + } + + /** + * Interface passed to the ProcedureStore.load() method to handle the store-load events. + */ + public interface ProcedureLoader { + /** + * Called by ProcedureStore.load() to notify about the maximum proc-id in the store. + * @param maxProcId the highest proc-id in the store + */ + void setMaxProcId(long maxProcId); + + /** + * Called by the ProcedureStore.load() every time a set of procedures is ready to be executed. + * The ProcedureIterator passed to the method has the procedures sorted in replay-order. 
+ * @param procIter iterator over the procedures ready to be added to the executor. + */ + void load(ProcedureIterator procIter) throws IOException; + + /** + * Called by the ProcedureStore.load() in case we have procedures that are not ready to be added to + * the executor, which probably means they are corrupted since some information/link is missing. + * @param procIter iterator over the (probably corrupted) procedures that could not be loaded + */ + void handleCorrupted(ProcedureIterator procIter) throws IOException; + } + + /** * Add the listener to the notification list. * @param listener The AssignmentListener to register */ @@ -87,9 +137,9 @@ public interface ProcedureStore { /** * Load the Procedures in the store. - * @return the set of procedures present in the store + * @param loader the ProcedureLoader that will handle the store-load events */ - Iterator load() throws IOException; + void load(ProcedureLoader loader) throws IOException; /** * When a procedure is submitted to the executor insert(proc, null) will be called. 
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index 17432ac..abd1b9c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; @@ -63,14 +64,14 @@ public final class ProcedureWALFormat { } } - interface Loader { - void removeLog(ProcedureWALFile log); - void markCorruptedWAL(ProcedureWALFile log, IOException e); + interface Loader extends ProcedureLoader { + public abstract void removeLog(ProcedureWALFile log); + public abstract void markCorruptedWAL(ProcedureWALFile log, IOException e); } private ProcedureWALFormat() {} - public static Iterator load(final Iterator logs, + public static void load(final Iterator logs, final ProcedureStoreTracker tracker, final Loader loader) throws IOException { ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker); tracker.setKeepDeletes(true); @@ -84,14 +85,13 @@ public final class ProcedureWALFormat { log.close(); } } + reader.finalize(loader); // The tracker is now updated with all the procedures read from the logs tracker.setPartialFlag(false); tracker.resetUpdates(); } finally { tracker.setKeepDeletes(false); } - // TODO: Write compacted version? 
- return reader.getProcedures(); } public static void writeHeader(OutputStream stream, ProcedureWALHeader header) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index a60b8f5..c85b3eb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; @@ -41,17 +39,73 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn public class ProcedureWALFormatReader { private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class); - private final ProcedureStoreTracker tracker; - //private final long compactionLogId; - - private final Map procedures = new HashMap(); - private final Map localProcedures = - new HashMap(); + // ============================================================================================== + // We read the WALs in reverse order. from the newest to the oldest. 
+ // We have different entry types: + // - INIT: Procedure submitted by the user (also known as 'root procedure') + // - INSERT: Children added to the procedure <parentId>:[<childId>, ...] + // - UPDATE: The specified procedure was updated + // - DELETE: The procedure was removed (completed/rolledback and result TTL expired) + // + // In the WAL we can find the same procedure multiple times, as UPDATE or INSERT. + // We read each WAL from top to bottom, so every time we find an entry of the + // same procedure, that will be the "latest" update. + // + // We keep two in-memory maps: + // - localProcedureMap: is the map containing the entries in the WAL we are processing + // - procedureMap: is the map containing all the procedures we found up to the WAL in process. + // localProcedureMap is merged with the procedureMap once we reach the WAL EOF. + // + // Since we are reading the WALs in reverse order (newest to oldest), + // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it. + // + // The WAL is append-only so the last procedure in the WAL is the one that + // was in execution at the time we crashed/closed the server. + // Given that, the procedure replay order can be inferred from the WAL order. + // + // Example: + // WAL-2: [A, B, A, C, D] + // WAL-1: [F, G, A, F, B] + // Replay-Order: [D, C, A, B, F, G] + // + // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the + // record to the map that record is moved to the head of the "replayOrder" list. + // Using the example above: + // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] + // WAL-1 localProcedureMap.replayOrder is [F, G] + // + // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' + // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. 
+ // + // Fast Start: INIT/INSERT record and StackIDs + // --------------------------------------------- + // We have two special records, INIT and INSERT, that track the first time + // the procedure was added to the WAL. We can use that information to be able + // to start procedures before reaching the end of the WAL, or before reading all the WALs, + // but in some cases the WAL with that record can be already gone. + // As an alternative we can use the stackIds on each procedure, + // to identify when a procedure is ready to start. + // If there are gaps in the sum of the stackIds we need to read more WALs. + // + // Example (all procs are children of A): + // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5] + // WAL-1: [A, B, C, D] + // + // In the case above we need to read one more WAL to be able to consider + // the root procedure A and all children as ready. + // ============================================================================================== + private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); + private final WalProcedureMap procedureMap = new WalProcedureMap(1024); + //private long compactionLogId; private long maxProcId = 0; + private final ProcedureStoreTracker tracker; + private final boolean hasFastStartSupport; + public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) { this.tracker = tracker; + this.hasFastStartSupport = !tracker.isEmpty(); } public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { @@ -91,28 +145,35 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localProcedures.isEmpty()) { + if (localProcedureMap.isEmpty()) { LOG.info("No active entry found in state log " + log + ". 
removing it"); loader.removeLog(log); } else { - Iterator> itd = - localProcedures.entrySet().iterator(); - while (itd.hasNext()) { - Map.Entry entry = itd.next(); - itd.remove(); - - // Deserialize the procedure - Procedure proc = Procedure.convert(entry.getValue()); - procedures.put(entry.getKey(), proc); - } + procedureMap.mergeTail(localProcedureMap); - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) + //if (hasFastStartSupport) { + // TODO: Some procedure may be already runnables (see readInitEntry()) + // (we can also check the "update map" in the log trackers) + // -------------------------------------------------- + //EntryIterator iter = procedureMap.fetchReady(); + //if (iter != null) //load(iter); + // -------------------------------------------------- + //} } } - public Iterator getProcedures() { - return procedures.values().iterator(); + public void finalize(ProcedureWALFormat.Loader loader) throws IOException { + // notify the loader about the max proc ID + loader.setMaxProcId(maxProcId); + + // fetch the procedure ready to run. + ProcedureIterator procIter = procedureMap.fetchReady(); + if (procIter != null) loader.load(procIter); + + // remaining procedures have missing link or dependencies + // consider them as corrupted, manual fix is probably required. 
+ procIter = procedureMap.fetchAll(); + if (procIter != null) loader.handleCorrupted(procIter); } private void loadEntries(final ProcedureWALEntry entry) { @@ -122,7 +183,7 @@ public class ProcedureWALFormatReader { if (LOG.isTraceEnabled()) { LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); } - localProcedures.put(proc.getProcId(), proc); + localProcedureMap.add(proc); tracker.setDeleted(proc.getProcId(), false); } } @@ -152,7 +213,7 @@ public class ProcedureWALFormatReader { LOG.trace("read delete entry " + entry.getProcId()); } maxProcId = Math.max(maxProcId, entry.getProcId()); - localProcedures.remove(entry.getProcId()); + localProcedureMap.remove(entry.getProcId()); tracker.setDeleted(entry.getProcId(), true); } @@ -161,6 +222,272 @@ public class ProcedureWALFormatReader { } private boolean isRequired(final long procId) { - return !isDeleted(procId) && !procedures.containsKey(procId); + return !isDeleted(procId) && !procedureMap.contains(procId); + } + + // ========================================================================== + // We keep an in-memory map of the procedures sorted by replay order + // (see the details in the beginning of the file) + // ========================================================================== + private static class Entry { + // hash-table next + private Entry hashNext; + // replay double-linked-list + private Entry replayNext; + private Entry replayPrev; + // procedure-infos + private Procedure procedure; + private ProcedureProtos.Procedure proto; + + public Entry(Entry hashNext) { this.hashNext = hashNext; } + + public long getProcId() { return proto.getProcId(); } + public long getParentId() { return proto.getParentId(); } + public boolean hasParent() { return proto.hasParentId(); } + + public Procedure convert() throws IOException { + if (procedure == null) { + procedure = Procedure.convert(proto); + } + return procedure; + } + } + + private static class EntryIterator implements ProcedureIterator { 
+ private final Entry replayHead; + private Entry current; + + public EntryIterator(Entry replayHead) { + this.replayHead = replayHead; + this.current = replayHead; + } + + @Override + public void reset() { + this.current = replayHead; + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public Procedure next() throws IOException { + if (current == null) throw new java.util.NoSuchElementException("no more procedures"); + // advance first, so the cursor moves even when convert() throws (as the old finally did) + final Entry entry = current; + current = entry.replayNext; + return entry.convert(); + } + } + + private static class WalProcedureMap { + // procedure hash table + private Entry[] procedureMap; + + // replay-order double-linked-list + private Entry replayOrderHead; + private Entry replayOrderTail; + + public WalProcedureMap(int size) { + procedureMap = new Entry[size]; + replayOrderHead = null; + replayOrderTail = null; + } + + public void add(ProcedureProtos.Procedure procProto) { + Entry entry = addToMap(procProto.getProcId()); + entry.proto = procProto; + addToReplayList(entry); + } + + public boolean remove(long procId) { + Entry entry = removeFromMap(procId); + if (entry != null) { + unlinkFromReplyList(entry); + return true; + } + return false; + } + + public boolean contains(long procId) { + return getProcedure(procId) != null; + } + + public boolean isEmpty() { + return replayOrderHead == null; + } + + public void clear() { + for (int i = 0; i < procedureMap.length; ++i) { + procedureMap[i] = null; + } + replayOrderHead = null; + replayOrderTail = null; + } + + public void mergeTail(WalProcedureMap other) { + for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { + int slotIndex = getMapSlot(p.getProcId()); + p.hashNext = procedureMap[slotIndex]; + procedureMap[slotIndex] = p; + } + + if (replayOrderHead == null) { + replayOrderHead = other.replayOrderHead; + replayOrderTail = other.replayOrderTail; + } else { + assert replayOrderTail.replayNext == null; + assert other.replayOrderHead.replayPrev == null; + replayOrderTail.replayNext = other.replayOrderHead; + 
other.replayOrderHead.replayPrev = replayOrderTail; + replayOrderTail = other.replayOrderTail; + } + + other.clear(); + } + + public EntryIterator fetchReady() { + Entry readyHead = null; + Entry readyTail = null; + Entry p = replayOrderHead; + while (p != null) { + Entry next = p.replayNext; + if (isReadyToRun(p)) { + unlinkFromReplyList(p); + if (readyTail != null) { + readyTail.replayNext = p; + p.replayPrev = readyTail; + } else { + p.replayPrev = null; + readyHead = p; + } + readyTail = p; + p.replayNext = null; + } + p = next; + } + // we need the hash-table lookups for parents, so this must be done + // out of the loop where we check isReadyToRun() + for (p = readyHead; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + } + return readyHead != null ? new EntryIterator(readyHead) : null; + } + + public EntryIterator fetchAll() { + Entry head = replayOrderHead; + for (Entry p = head; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + } + for (int i = 0; i < procedureMap.length; ++i) { + assert procedureMap[i] == null : "map not empty i=" + i; + } + replayOrderHead = null; + replayOrderTail = null; + return head != null ? 
new EntryIterator(head) : null; + } + + private boolean isReadyToRun(Entry entry) { + assert entry != null; + int stackIdSum = 0; + int maxStackId = 0; + while (true) { + for (int i = 0; i < entry.proto.getStackIdCount(); ++i) { + int stackId = 1 + entry.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + } + if (!entry.hasParent()) break; + entry = getProcedure(entry.getParentId()); + if (entry == null) { + return false; + } + } + int cmpStackIdSum = 0; + for (int i = 1; i <= maxStackId; ++i) { + cmpStackIdSum += i; + } + return (cmpStackIdSum == stackIdSum); + } + + private void unlinkFromReplyList(Entry entry) { + if (replayOrderHead == entry) { + replayOrderHead = entry.replayNext; + } + if (replayOrderTail == entry) { + replayOrderTail = entry.replayPrev; + } + if (entry.replayPrev != null) { + entry.replayPrev.replayNext = entry.replayNext; + } + if (entry.replayNext != null) { + entry.replayNext.replayPrev = entry.replayPrev; + } + } + + private void addToReplayList(final Entry entry) { + unlinkFromReplyList(entry); + if (replayOrderHead != null) { + entry.replayNext = replayOrderHead; + entry.replayPrev = null; + replayOrderHead.replayPrev = entry; + } else { + entry.replayNext = null; + entry.replayPrev = null; + replayOrderTail = entry; + } + replayOrderHead = entry; + } + + private Entry addToMap(final long procId) { + int slotIndex = getMapSlot(procId); + Entry entry = getProcedure(slotIndex, procId); + if (entry == null) { + entry = new Entry(procedureMap[slotIndex]); + procedureMap[slotIndex] = entry; + } + return entry; + } + + private Entry removeFromMap(final long procId) { + int slotIndex = getMapSlot(procId); + Entry prev = null; + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + if (prev != null) { + prev.hashNext = entry.hashNext; + } else { + procedureMap[slotIndex] = entry.hashNext; + } + entry.hashNext = null; + return entry; + } + prev = entry; 
+ entry = entry.hashNext; + } + return null; + } + + private Entry getProcedure(final long procId) { + return getProcedure(getMapSlot(procId), procId); + } + + private Entry getProcedure(final int slotIndex, final long procId) { + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + return entry; + } + entry = entry.hashNext; + } + return null; + } + + private int getMapSlot(final long procId) { + return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length); + } } } \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 0bda0d1..23af433 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -239,7 +239,7 @@ public class WALProcedureStore implements ProcedureStore { } @Override - public Iterator load() throws IOException { + public void load(final ProcedureLoader loader) throws IOException { if (logs.isEmpty()) { throw new RuntimeException("recoverLease() must be called before loading data"); } @@ -247,7 +247,8 @@ public class WALProcedureStore implements ProcedureStore { // Nothing to do, If we have only the current log. 
if (logs.size() == 1) { LOG.debug("No state logs to replay"); - return null; + loader.setMaxProcId(0); + return; } // Load the old logs @@ -255,7 +256,22 @@ public class WALProcedureStore implements ProcedureStore { Iterator it = logs.descendingIterator(); it.next(); // Skip the current log try { - return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + @Override + public void setMaxProcId(long maxProcId) { + loader.setMaxProcId(maxProcId); + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + loader.load(procIter); + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + loader.handleCorrupted(procIter); + } + @Override public void removeLog(ProcedureWALFile log) { toRemove.add(log); @@ -297,7 +313,7 @@ public class WALProcedureStore implements ProcedureStore { } // Push the transaction data and wait until it is persisted - logId = pushData(slot); + pushData(slot); } catch (IOException e) { // We are not able to serialize the procedure. // this is a code error, and we are not able to go on. 
@@ -603,7 +619,7 @@ public class WALProcedureStore implements ProcedureStore { } private void removeAllLogs(long lastLogId) { - LOG.info("Remove all state logs with ID less then " + lastLogId); + LOG.debug("Remove all state logs with ID less then " + lastLogId); while (!logs.isEmpty()) { ProcedureWALFile log = logs.getFirst(); if (lastLogId < log.getLogId()) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 88645ed..49ba4e4 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -47,6 +51,7 @@ public class TestProcedureReplayOrder { private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class); private static final Procedure NULL_PROC = null; + private static final int NUM_THREADS = 16; private ProcedureExecutor procExecutor; private TestProcedureEnv procEnv; @@ -60,7 +65,7 @@ public class TestProcedureReplayOrder { @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); - 
htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10); + htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25); testDir = htu.getDataTestDir(); fs = testDir.getFileSystem(htu.getConfiguration()); @@ -70,7 +75,7 @@ public class TestProcedureReplayOrder { procEnv = new TestProcedureEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); - procStore.start(24); + procStore.start(NUM_THREADS); procExecutor.start(1); } @@ -83,46 +88,44 @@ public class TestProcedureReplayOrder { @Test(timeout=90000) public void testSingleStepReplyOrder() throws Exception { - // avoid the procedure to be runnable - procEnv.setAcquireLock(false); + final int NUM_PROC_XTHREAD = 32; + final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD; // submit the procedures - submitProcedures(16, 25, TestSingleStepProcedure.class); + submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class); + + while (procEnv.getExecId() < NUM_PROCS) { + Thread.sleep(100); + } // restart the executor and allow the procedures to run - ProcedureTestingUtility.restart(procExecutor, new Runnable() { - @Override - public void run() { - procEnv.setAcquireLock(true); - } - }); + ProcedureTestingUtility.restart(procExecutor); // wait the execution of all the procedures and // assert that the execution order was sorted by procId ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); - procEnv.assertSortedExecList(); - - // TODO: FIXME: This should be revisited + procEnv.assertSortedExecList(NUM_PROCS); } - @Ignore @Test(timeout=90000) public void testMultiStepReplyOrder() throws Exception { - // avoid the procedure to be runnable - procEnv.setAcquireLock(false); + final int NUM_PROC_XTHREAD = 24; + final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2); // submit the procedures - submitProcedures(16, 10, 
TestTwoStepProcedure.class); + submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class); + + while (procEnv.getExecId() < NUM_PROCS) { + Thread.sleep(100); + } // restart the executor and allow the procedures to run - ProcedureTestingUtility.restart(procExecutor, new Runnable() { - @Override - public void run() { - procEnv.setAcquireLock(true); - } - }); + ProcedureTestingUtility.restart(procExecutor); - fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER"); + // wait the execution of all the procedures and + // assert that the execution order was sorted by procId + ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); + procEnv.assertSortedExecList(NUM_PROCS); } private void submitProcedures(final int nthreads, final int nprocPerThread, @@ -154,46 +157,38 @@ public class TestProcedureReplayOrder { } private static class TestProcedureEnv { - private ArrayList execList = new ArrayList(); - private boolean acquireLock = true; - - public void setAcquireLock(boolean acquireLock) { - this.acquireLock = acquireLock; - } + private ArrayList execList = new ArrayList(); + private AtomicLong execTimestamp = new AtomicLong(0); - public boolean canAcquireLock() { - return acquireLock; + public long getExecId() { + return execTimestamp.get(); } - public void addToExecList(final Procedure proc) { - execList.add(proc.getProcId()); + public long nextExecId() { + return execTimestamp.incrementAndGet(); } - public ArrayList getExecList() { - return execList; + public void addToExecList(final TestProcedure proc) { + execList.add(proc); } - public void assertSortedExecList() { + public void assertSortedExecList(int numProcs) { + assertEquals(numProcs, execList.size()); LOG.debug("EXEC LIST: " + execList); - for (int i = 1; i < execList.size(); ++i) { - assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i), - execList.get(i-1) < execList.get(i)); + for (int i = 0; i < execList.size() - 1; ++i) { + TestProcedure a = execList.get(i); + 
TestProcedure b = execList.get(i + 1); + assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId()); } } } - public static class TestSingleStepProcedure extends SequentialProcedure { - public TestSingleStepProcedure() { } - - @Override - protected Procedure[] execute(TestProcedureEnv env) { - LOG.debug("execute procedure " + this); - env.addToExecList(this); - return null; - } + public static abstract class TestProcedure extends Procedure { + protected long execId = 0; + protected int step = 0; - protected boolean acquireLock(final TestProcedureEnv env) { - return env.canAcquireLock(); + public long getExecId() { + return execId; } @Override @@ -201,26 +196,62 @@ public class TestProcedureReplayOrder { @Override protected boolean abort(TestProcedureEnv env) { return true; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + StreamUtils.writeLong(stream, execId); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + execId = StreamUtils.readLong(stream); + step = 2; + } } - public static class TestTwoStepProcedure extends SequentialProcedure { - public TestTwoStepProcedure() { } + public static class TestSingleStepProcedure extends TestProcedure { + public TestSingleStepProcedure() { } @Override - protected Procedure[] execute(TestProcedureEnv env) { - LOG.debug("execute procedure " + this); - env.addToExecList(this); - return new Procedure[] { new TestSingleStepProcedure() }; + protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { + LOG.trace("execute procedure step=" + step + ": " + this); + if (step == 0) { + step = 1; + execId = env.nextExecId(); + return new Procedure[] { this }; + } else if (step == 2) { + env.addToExecList(this); + return null; + } + throw new ProcedureYieldException(); } - protected boolean acquireLock(final TestProcedureEnv env) { - return true; + @Override + public String toString() { + 
return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")"; } + } + + public static class TestTwoStepProcedure extends TestProcedure { + public TestTwoStepProcedure() { } @Override - protected void rollback(TestProcedureEnv env) { } + protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { + LOG.trace("execute procedure step=" + step + ": " + this); + if (step == 0) { + step = 1; + execId = env.nextExecId(); + return new Procedure[] { new TestSingleStepProcedure() }; + } else if (step == 2) { + env.addToExecList(this); + return null; + } + throw new ProcedureYieldException(); + } @Override - protected boolean abort(TestProcedureEnv env) { return true; } + public String toString() { + return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")"; + } } } \ No newline at end of file diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 344b28b..09ee64c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; import java.util.Iterator; import java.util.HashSet; import java.util.Set; @@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.SequentialProcedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import 
org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; @@ -84,17 +87,20 @@ public class TestWALProcedureStore { fs.delete(logDir, true); } - private Iterator storeRestart() throws Exception { + private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception { procStore.stop(false); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); - return procStore.load(); + procStore.load(loader); } @Test public void testEmptyLogLoad() throws Exception { - Iterator loader = storeRestart(); - assertEquals(0, countProcedures(loader)); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(0, loader.getMaxProcId()); + assertEquals(0, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } @Test @@ -153,8 +159,10 @@ public class TestWALProcedureStore { assertEquals(1, logs.length); corruptLog(logs[0], 4); - int count = countProcedures(storeRestart()); - assertEquals(100, count); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(100, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } @Test @@ -173,10 +181,12 @@ public class TestWALProcedureStore { assertEquals(1, logs.length); corruptLog(logs[0], 1823); - int count = countProcedures(storeRestart()); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); assertTrue(procStore.getCorruptedLogs() != null); assertEquals(1, procStore.getCorruptedLogs().size()); - assertEquals(85, count); + assertEquals(85, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } private void corruptLog(final FileStatus logFile, final long dropBytes) @@ -192,29 +202,11 @@ public class TestWALProcedureStore { } private void verifyProcIdsOnRestart(final Set procIds) throws Exception { - int count = 0; - Iterator loader = storeRestart(); - while (loader.hasNext()) { - Procedure proc = 
loader.next(); - LOG.debug("loading procId=" + proc.getProcId()); - assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId())); - count++; - } - assertEquals(procIds.size(), count); - } - - private void assertIsEmpty(Iterator iterator) { - assertEquals(0, countProcedures(iterator)); - } - - private int countProcedures(Iterator iterator) { - int count = 0; - while (iterator.hasNext()) { - Procedure proc = iterator.next(); - LOG.trace("loading procId=" + proc.getProcId()); - count++; - } - return count; + LOG.debug("expected: " + procIds); + LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(procIds.size(), loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); } private void assertEmptyLogDir() { @@ -264,4 +256,59 @@ public class TestWALProcedureStore { } } } + + private class LoadCounter implements ProcedureStore.ProcedureLoader { + private final Set procIds; + + private int corruptedCount = 0; + private int loadedCount = 0; + private long maxProcId = 0; + + public LoadCounter() { + this(null); + } + + public LoadCounter(final Set procIds) { + this.procIds = procIds; + } + + public long getMaxProcId() { + return maxProcId; + } + + public int getLoadedCount() { + return loadedCount; + } + + public int getCorruptedCount() { + return corruptedCount; + } + + @Override + public void setMaxProcId(long maxProcId) { + maxProcId = maxProcId; + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + LOG.trace("loading procId=" + proc.getProcId() + ": " + proc); + if (procIds != null) { + assertTrue("procId=" + proc.getProcId() + " unexpected", + procIds.contains(proc.getProcId())); + } + loadedCount++; + } + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + while (procIter.hasNext()) { + Procedure proc = procIter.next(); + LOG.trace("corrupted procId=" + 
proc.getProcId() + ": " + proc); + corruptedCount++; + } + } + } } \ No newline at end of file