diff --git a/conf/log4j.properties b/conf/log4j.properties index a6a560ed9b..721b91d175 100644 --- a/conf/log4j.properties +++ b/conf/log4j.properties @@ -95,6 +95,7 @@ log4j.appender.asyncconsole.target=System.err log4j.logger.org.apache.zookeeper=INFO #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG log4j.logger.org.apache.hadoop.hbase=INFO +log4j.logger.org.apache.hadoop.hbase.procedure2=DEBUG log4j.logger.org.apache.hadoop.hbase.META=INFO # Make these two classes INFO-level. Make them DEBUG to see more zk debug. log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 0a05e6eb57..9fd8226b79 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -44,7 +44,7 @@ public class ProcedureWALFormatReader { private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class); // ============================================================================================== - // We read the WALs in reverse order. from the newest to the oldest. + // We read the WALs in reverse order from the newest to the oldest. // We have different entry types: // - INIT: Procedure submitted by the user (also known as 'root procedure') // - INSERT: Children added to the procedure :[, ...] @@ -53,7 +53,8 @@ public class ProcedureWALFormatReader { // // In the WAL we can find multiple times the same procedure as UPDATE or INSERT. // We read the WAL from top to bottom, so every time we find an entry of the - // same procedure, that will be the "latest" update. 
+ // same procedure, that will be the "latest" update (Caveat: with multiple threads writing + // the store, this assumption does not hold). // // We keep two in-memory maps: // - localProcedureMap: is the map containing the entries in the WAL we are processing @@ -65,7 +66,7 @@ public class ProcedureWALFormatReader { // // The WAL is append-only so the last procedure in the WAL is the one that // was in execution at the time we crashed/closed the server. - // given that, the procedure replay order can be inferred by the WAL order. + // Given that, the procedure replay order can be inferred by the WAL order. // // Example: // WAL-2: [A, B, A, C, D] @@ -78,7 +79,7 @@ public class ProcedureWALFormatReader { // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] // WAL-1 localProcedureMap.replayOrder is [F, G] // - // each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' + // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. // // Fast Start: INIT/INSERT record and StackIDs @@ -149,6 +150,8 @@ public class ProcedureWALFormatReader { LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); break; } + LOG.info("ENTRY " + entry); + LOG.info(entry); switch (entry.getType()) { case PROCEDURE_WAL_INIT: readInitEntry(entry); @@ -634,7 +637,7 @@ public class ProcedureWALFormatReader { } /* - * (see the comprehensive explaination in the beginning of the file) + * (see the comprehensive explanation in the beginning of the file) * A Procedure is ready when parent and children are ready. * "ready" means that we all the information that we need in-memory. * @@ -651,9 +654,9 @@ public class ProcedureWALFormatReader { * - easy case, the parent is missing from the global map * - more complex case we look at the Stack IDs. 
 * - * The Stack-IDs are added to the procedure order as incremental index + * The Stack-IDs are added to the procedure order as an incremental index * tracking how many times that procedure was executed, which is equivalent - * at the number of times we wrote the procedure to the WAL. + * to the number of times we wrote the procedure to the WAL. * In the example above: * wal-2: B has stackId = [1, 2] * wal-1: B has stackId = [1] @@ -663,7 +666,7 @@ * we notice that there is a gap in the stackIds of B, so something was * executed before. * To identify when a Procedure is ready we do the sum of the stackIds of - * the procedure and the parent. if the stackIdSum is equals to the + * the procedure and the parent. If the stackIdSum is equal to the * sum of {1..maxStackId} then everything we need is available. * * Example-2 @@ -695,6 +698,7 @@ int stackId = 1 + rootEntry.proto.getStackId(i); maxStackId = Math.max(maxStackId, stackId); stackIdSum += stackId; + LOG.info("stackId=" + stackId + " stackIdSum=" + stackIdSum + " maxStackid=" + maxStackId + " " + rootEntry); } for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { @@ -702,8 +706,11 @@ int stackId = 1 + p.proto.getStackId(i); maxStackId = Math.max(maxStackId, stackId); stackIdSum += stackId; + LOG.info("stackId=" + stackId + " stackIdSum=" + stackIdSum + " maxStackid=" + maxStackId + " " + p); } } + // cmpStackIdSum is the closed-form sum of the integer series 1..maxStackId, i.e. N * (N + 1) / 2: + // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); if (cmpStackIdSum == stackIdSum) { rootEntry.ready = true; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java index 289987be8b..b58546ce09 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -18,16 +18,21 @@ package org.apache.hadoop.hbase.procedure2; +import java.io.IOException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Threads; @@ -186,5 +191,24 @@ public class TestProcedureExecutor { } } - private class TestProcEnv { } -} + private static class TestProcEnv {} + + public static void main (String [] args) throws IOException { + HBaseCommonTestingUtility htu = new HBaseCommonTestingUtility(); + FileSystem fs = FileSystem.get(htu.getConfiguration()); + htu.getConfiguration(). 
+ setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, false); + Path logDir = new Path("/Users/stack/Downloads/archive2/"/*args[0]*/); + ProcedureStore procStore = + ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + ProcedureExecutor procExecutor = + new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); + try { + procStore.start(1); + procExecutor.start(1, true); + } finally { + procExecutor.stop(); + procStore.stop(false); + } + } +} \ No newline at end of file