From 1dafcdf6de767acbd41da2921084893922efdb34 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 24 Jul 2018 20:54:41 -0700 Subject: [PATCH] TestWALProcedureExecutor order-checking test that doesn't find problems. --- .../hadoop/hbase/procedure2/ProcedureExecutor.java | 6 +- .../procedure2/store/wal/ProcedureWALFile.java | 1 + .../store/wal/ProcedureWALPrettyPrinter.java | 3 +- .../procedure2/store/wal/WALProcedureStore.java | 1 + .../store/wal/TestWALProcedureStore.java | 2 +- .../org/apache/hadoop/hbase/master/HMaster.java | 2 +- .../hbase/master/assignment/AssignProcedure.java | 2 + .../hbase/master/assignment/RegionStateStore.java | 3 +- .../assignment/RegionTransitionProcedure.java | 6 +- .../store/wal/TestWALProcedureExecutor.java | 207 +++++++++++++++++++++ 10 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureExecutor.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index ef1ce5953f..6bb43086f5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1456,7 +1456,7 @@ public class ProcedureExecutor { // to be working on the same procedure concurrently (locking in procedures is NOT about // concurrency but about tying an entity to a procedure; i.e. a region to a particular // procedure instance). This can make for issues if both threads are changing state. 
- // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); + // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()) // in RegionTransitionProcedure#reportTransition for example of Procedure putting // itself back on the scheduler making it possible for two threads running against // the one Procedure. Might be ok if they are both doing different, idempotent sections. @@ -1743,9 +1743,8 @@ public class ProcedureExecutor { @Override public void run() { - long lastUpdate = EnvironmentEdgeManager.currentTime(); try { - while (isRunning() && keepAlive(lastUpdate)) { + while (isRunning() && keepAlive(EnvironmentEdgeManager.currentTime())) { Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (proc == null) { continue; @@ -1767,7 +1766,6 @@ public class ProcedureExecutor { LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, activeCount); this.activeProcedure = null; - lastUpdate = EnvironmentEdgeManager.currentTime(); executionStartTime.set(Long.MAX_VALUE); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 6226350a47..78bf83202f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu /** * Describes a WAL File + * @see ProcedureWALPrettyPrinter */ @InterfaceAudience.Private @InterfaceStability.Evolving diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java index c692365b03..15ff26e6a4 
100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java @@ -46,7 +46,8 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser; /** - * ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file + * ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file. + * @see #main(String[]) */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @InterfaceStability.Evolving diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 99001f7ea9..06dbbe32e4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -671,6 +671,7 @@ public class WALProcedureStore extends ProcedureStoreBase { long logId = -1; lock.lock(); + LOG.info("hold={}, len={}", lock.getHoldCount(), lock.getQueueLength()); try { // Wait for the sync to be completed while (true) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 64cf211161..4af222f959 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -81,7 +81,7 @@ public class TestWALProcedureStore { private Path testDir; private Path logDir; - private void setupConfig(final Configuration conf) { 
+ static void setupConfig(final Configuration conf) { conf.setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, true); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7ad77652db..25d834a535 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1231,7 +1231,7 @@ public class HMaster extends HRegionServer implements MasterServices { int cpus = Runtime.getRuntime().availableProcessors(); final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max( - (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + (cpus > 4? cpus / 4: 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); final boolean abortOnCorruption = conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java index 86f0a3ff59..4bd6b51019 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java @@ -218,6 +218,8 @@ public class AssignProcedure extends RegionTransitionProcedure { LOG.info("Starting " + this + "; " + regionNode.toShortString() + "; forceNewPlan=" + this.forceNewPlan + ", retain=" + retain); + // Be aware that this next call suspends this Procedure! + // The RegionTransitionProcedure will throw the Suspended Exception! 
env.getAssignmentManager().queueAssign(regionNode); return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index d85fea757d..e33152e820 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -119,8 +119,7 @@ public class RegionStateStore { final long openSeqNum = hrl.getSeqNum(); // TODO: move under trace, now is visible for debugging - LOG.info( - "Load hbase:meta entry region={}, regionState={}, lastHost={}, " + + LOG.info("Load hbase:meta entry region={}, regionState={}, lastHost={}, " + "regionLocation={}, openSeqNum={}", regionInfo.getEncodedName(), state, lastHost, regionLocation, openSeqNum); visitor.visitRegionState(result, regionInfo, state, regionLocation, lastHost, openSeqNum); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 6eaf13ca4d..e095cd45ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -335,7 +335,11 @@ public abstract class RegionTransitionProcedure } transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH; if (regionNode.getProcedureEvent().suspendIfNotReady(this)) { - // Why this suspend? Because we want to ensure Store happens before proceed? + // Why this suspend? Because AssignProcedure at the end of the startTransition + // suspends itself after adding this assign to the dispatch assign queue. It + // does this so the AM/Balancer can drive the Assigns rather than this + // procedure. 
It will wake up the Procedure when it has enough to run + // an Assign. A background assign thread picks up the queued Assigns. throw new ProcedureSuspendedException(); } break; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureExecutor.java new file mode 100644 index 0000000000..a938770637 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureExecutor.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test ProcedureExecutor over WALProcedureStore. + * This suite was written to check that Procedure steps end up properly ordered + * inside the procedure store. 
+ */ +@Category({MasterTests.class, SmallTests.class}) +public class TestWALProcedureExecutor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALProcedureExecutor.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureExecutor.class); + + private TestStateMachineEnv procEnv; + private ProcedureExecutor procExecutor; + + private HBaseTestingUtility htu; + private FileSystem fs; + private WALProcedureStore procStore; + private static final int SLOTS = 16; + + @Before + public void before() throws Exception { + this.htu = new HBaseTestingUtility(); + this.htu.startMiniDFSCluster(3); + this.fs = this.htu.getTestFileSystem(); + TestWALProcedureStore.setupConfig(htu.getConfiguration()); + Path logDir = new Path(this.htu.getDataTestDirOnTestFS(), "proc-logs"); + this.procEnv = new TestStateMachineEnv(); + this.procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); + this.procStore.start(SLOTS); + this.procStore.recoverLease(); + this.procStore.load(new ProcedureTestingUtility.LoadCounter()); + } + + @After + public void after() throws IOException { + this.procStore.stop(false); + this.htu.shutdownMiniDFSCluster(); + } + + @Test + public void testOrdering() throws Exception { + final Configuration conf = new Configuration(htu.getConfiguration()); + // Do more workers than slots to bring on contention over slots. 
+ final int threadCount = SLOTS * 10; + this.procExecutor = createNewExecutor(conf, threadCount); + long procId = -1; + for (int i = 0; i < 1000; i++) { + procId = this.procExecutor.submitProcedure(new TestStateMachineProcedure()); + } + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + assertEquals(true, procExecutor.isFinished(procId)); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + List files = this.procStore.getActiveLogs(); + this.procStore.recoverLease(); + for (ProcedureWALFile file: files) { + ProcedureWALPrettyPrinter.main(new String[]{"-f", file.toString()}); + } + this.procStore.stop(false); + } + + private ProcedureExecutor createNewExecutor(final Configuration conf, final int numThreads) throws Exception { + ProcedureExecutor pe = new ProcedureExecutor(conf, procEnv, procStore); + pe.start(numThreads, true); + return pe; + } + + public static class TestStateMachineEnv { + AtomicInteger execCount = new AtomicInteger(0); + AtomicInteger rollbackCount = new AtomicInteger(0); + } + + public enum TestStateMachineState { + STEP_0, STEP_1, STEP_2, STEP_3, STEP_4, STEP_5, STEP_6, STEP_7 + }; + + /** + * Make a Procedure with lots of steps that yields at the end + * of each step so there is plenty of opportunity for ProcedureExecutor Worker threads + * to interweave on a single Procedure (yielding makes us skip out of + * the tight procedure executor loop to give other worker threads a chance + * to run). 
+ */ + public static class TestStateMachineProcedure extends StateMachineProcedure { + private final Object eventObject = new Object(); + private final ProcedureEvent event; + + + public TestStateMachineProcedure() { + this.event = new ProcedureEvent(this.eventObject); + setState(ProcedureProtos.ProcedureState.INITIALIZING); + } + + @Override + protected Flow executeFromState(TestStateMachineEnv env, + TestStateMachineState testStateMachineState) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + env.execCount.incrementAndGet(); + LOG.info("state={}, {}", testStateMachineState, this); + switch (testStateMachineState) { + case STEP_0: + setNextState(TestStateMachineState.STEP_1); + break; + case STEP_1: + setNextState(TestStateMachineState.STEP_2); + break; + case STEP_2: + setNextState(TestStateMachineState.STEP_3); + break; + case STEP_3: + setNextState(TestStateMachineState.STEP_4); + break; + case STEP_4: + setNextState(TestStateMachineState.STEP_5); + break; + case STEP_5: + setNextState(TestStateMachineState.STEP_6); + break; + case STEP_6: + setNextState(TestStateMachineState.STEP_7); + break; + case STEP_7: + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected boolean isYieldAfterExecutionStep(TestStateMachineEnv env) { + // Yield after each step so can get a mix of running threads. 
+ return true; + } + + @Override + protected void rollbackState(TestStateMachineEnv env, + TestStateMachineState testStateMachineState) throws IOException, InterruptedException { + LOG.info("ROLLBACK " + testStateMachineState + " " + this); + env.rollbackCount.incrementAndGet(); + } + + @Override + protected TestStateMachineState getState(int stateId) { + return TestStateMachineState.values()[stateId]; + } + + @Override + protected int getStateId(TestStateMachineState testStateMachineState) { + return testStateMachineState.ordinal(); + } + + @Override + protected TestStateMachineState getInitialState() { + return TestStateMachineState.STEP_0; + } + } +} -- 2.16.3