diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/server/ThreadUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/ThreadUtils.java new file mode 100644 index 0000000..e3af819 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/ThreadUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.server; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.server.error.ErrorCheckable; + +/** + * Utility class for dealing with multiple-process computation, synchronization and error handling. + */ +public class ThreadUtils { + + private static final Log LOG = LogFactory.getLog(ThreadUtils.class); + + private ThreadUtils() { + // hidden ctor since its a utility class + } + + /** + * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to + * check for errors + * @param latch latch to wait on + * @param monitor monitor to check for errors while waiting + * @param wakeFrequency frequency to make up and check for + * @param latchType name of the latch, for logging + * @throws E type of error the monitor can throw, if the task fails + */ + public static void waitForLatch(CountDownLatch latch, + ErrorCheckable monitor, long wakeFrequency, String latchType) throws E { + boolean zero = false; + while (!zero) { + monitor.failOnError(); + try { + LOG.debug("Waiting for '" + latchType + "' latch. (sleep:" + wakeFrequency + " ms)"); + zero = latch.await(wakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.debug("Wait for latch interrupted, done:" + (latch.getCount() == 0)); + // reset the interrupt status on the thread + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/server/TwoPhaseCommit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/TwoPhaseCommit.java new file mode 100644 index 0000000..f15fdfd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/TwoPhaseCommit.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.server; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.server.error.ErrorMonitorable; + +/** + * General two-phase commit orchestrator. + *

+ * Designed to be be run asynchronously such you can have multiple parts of a two-phase commit all + * working together. For instance, a single server is writing multiple files (all part of the same + * operation) and these files need to be written in parallel, but they shouldn't be moved to the + * final directory if they all aren't written. In this case you could create multiple + * {@link TwoPhaseCommit}, one for each file, and elect one of the processes as the 'leader'. Each + * file is written on its own {@link Thread} in the prepare stage, counting down on the master's + * commit latch. When the leader finishes writing its file two possible situations can happen: + *

    + *
  1. All files have been written (commit latch count == 0)
  2. + *
  3. Not all files have been written (commit latch count > 0)
  4. + *
+ * In the first case, the leader is then free to perform a commit step and move the files to the + * commit location. In the latter case, the leader blocks, on its own thread, waiting for all the + * writers to finish, and only when they finish does the leader commit the files. + * @param Type of exception that any stage of the commit can throw + */ +public abstract class TwoPhaseCommit implements Callable, Runnable { + + private static final Log LOG = LogFactory.getLog(TwoPhaseCommit.class); + + private final CountDownLatch preparedLatch; + private final CountDownLatch allowCommitLatch; + private final CountDownLatch commitFinishLatch; + private final CountDownLatch completedLatch; + private final long wakeFrequency; + + private ErrorMonitorable errorMonitor; + + /** + * Constructor that has prepare, commit and finish latch counts of 1. + * @param monitor notified if there is an error in the commit + * @param wakeFrequency frequency to wake to check if there is an error between elements + */ + public TwoPhaseCommit(ErrorMonitorable monitor, long wakeFrequency) { + this(monitor, wakeFrequency, 1, 1, 1, 1); + } + + /** + * Create an operation with a specific size of the prepare, commit and finish latches. This is + * useful for more complex internal processing that performs multiple pieces of functionality (or + * coordinating with multiple external processes). + * @param monitor monitor if there is an external error causing the commit to fail or is notified + * if there is an error while running the operation. + * @param wakeFrequency frequency to check for operation errors in the handler while waiting on + * latches + * @param numPrepare number of counts on the prepare latch (obtained via + * {@link #getPreparedLatch()}) + * @param numAllowCommit number of counts on the allow commit latch (latch obtained via + * {@link #getAllowCommitLatch()}); + * @param numCommitted number of counts on the commit finished latch (latch obtained via + * {@link #getCommitFinishedLatch()}) + * @param numCompleted number of counts on the completed latch (latch obtained via + * {@link #getCompletedLatch()} + */ + public TwoPhaseCommit(ErrorMonitorable monitor, long wakeFrequency, int numPrepare, + int numAllowCommit, int numCommitted, int numCompleted) { + this.errorMonitor = monitor; + this.wakeFrequency = wakeFrequency; + this.preparedLatch = new CountDownLatch(numPrepare); + this.allowCommitLatch = new CountDownLatch(numAllowCommit); + this.commitFinishLatch = new CountDownLatch(numCommitted); + this.completedLatch = new CountDownLatch(numCompleted); + } + + /** + * When the prepare phase completes, the latch returned here is counted down. External/internal + * processes may also count-down this latch, so one must be aware of the count prepare latch size + * passed in the constructor + * @return latch counted down on complete of the prepare phase + */ + public CountDownLatch getPreparedLatch() { + return this.preparedLatch; + } + + /** + * The 'commit' phase waits for this latch to reach zero. Some process (internal or external) + * needs to count-down this latch to complete the operation. + * @return latch blocking the commit phase + */ + public CountDownLatch getAllowCommitLatch() { + return this.allowCommitLatch; + } + + public CountDownLatch getCommitFinishedLatch() { + return this.commitFinishLatch; + } + + public CountDownLatch getCompletedLatch() { + return this.completedLatch; + } + + /** + * Prepare a snapshot on the region. + * @throws SnapshotCreationException + */ + protected abstract void prepare() throws E; + + /** + * Commit the snapshot - indicator from master that the snapshot can complete locally. + * @throws SnapshotCreationException + */ + protected abstract void commit() throws E; + + /** + * Cleanup from a failure. + *

+ * This method is called only if one of the earlier phases threw an error or an error was found in + * the error monitor. + * @param e + */ + protected abstract void cleanup(Exception e); + + /** + * Cleanup any state that may have changed from {@link #prepare()} to {@link #commit()}. This is + * guaranteed to run under failure situations after {@link #prepare()} has been called. + */ + protected abstract void finish(); + + @Override + public Void call() throws E { + try { + // start by checking for error first + errorMonitor.failOnError(); + + // get ready for snapshot + LOG.debug("Starting 'prepare' stage of two phase commit"); + prepare(); + // notify that we are prepared to snapshot + LOG.debug("Prepare stage completed, counting down prepare."); + this.preparedLatch.countDown(); + + // wait for the indicator that we should commit + LOG.debug("Counted down prepare, waiting on commit latch to complete."); + waitForLatch(allowCommitLatch, "COMMIT"); + + // get the files for commit + errorMonitor.failOnError(); + LOG.debug("Commit latch released, running commit step."); + commit(); + + errorMonitor.failOnError(); + LOG.debug("Commit completed, counting down finsh latch."); + this.commitFinishLatch.countDown(); + } catch (Exception e) { + LOG.error("Two phase commit failed!", e); + this.cleanup(e); + } finally { + // reset the state of the region, as necessary + this.finish(); + this.completedLatch.countDown(); + } + return null; + } + + @SuppressWarnings("unchecked") + @Override + public void run() { + try { + this.call(); + } catch (Exception e) { + this.errorMonitor.receiveError("Failed because: " + e.getMessage(), (E) e); + } + } + + /** + * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to + * check for errors + * @param latch latch to wait on + * @param latchType String description of what the latch does + * @throws E if the task was failed while waiting + */ + public void waitForLatch(CountDownLatch latch, String latchType) throws E { + ThreadUtils.waitForLatch(latch, errorMonitor, wakeFrequency, latchType); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/server/TestTwoPhaseCommit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/server/TestTwoPhaseCommit.java new file mode 100644 index 0000000..8aae386 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/server/TestTwoPhaseCommit.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.server.error.ErrorMonitorable; +import org.apache.hadoop.hbase.server.error.impl.ErrorMonitor; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test TwoPhaseCommit behaves as expected + */ +@Category(SmallTests.class) +public class TestTwoPhaseCommit { + + private ErrorMonitorable monitor = new ErrorMonitor(); + private final long wakeFrequency = 10; + + @Test(timeout = 500) + public void testSingleLatchCount() throws Exception { + CheckableTwoPhaseCommit tpc = new CheckableTwoPhaseCommit(monitor, wakeFrequency); + // start the two phase commit + new Thread(tpc).start(); + + // wait for the commit phase + tpc.getPreparedLatch().await(); + assertCommitPhases(tpc, true, false, false, false); + + // count down the commit phase + tpc.getAllowCommitLatch().countDown(); + tpc.getCompletedLatch().await(); + assertCommitPhases(tpc, true, true, false, true); + } + + @Test + public void testMultipleLatchCounts() throws Exception{ + // now do a test with multiple counts for each latch + CheckableTwoPhaseCommit tpc = new CheckableTwoPhaseCommit(monitor, wakeFrequency, 2, 2, 2, 2); + // start the two phase commit + new Thread(tpc).start(); + + // wait for the prepare phase to finish + // this is fast, so shouldn't be too bad + while (!tpc.prepared) {} + assertCommitPhases(tpc, true, false, false, false); + + // count down the commit phase + tpc.getAllowCommitLatch().countDown(); + // shouldn't have committed yet + assertFalse("Commit latch shouldn't have completed yet", tpc.getAllowCommitLatch().await(0, TimeUnit.NANOSECONDS)); + assertCommitPhases(tpc, true, false, false, false); + + // preempt the finish & complete latch to we can wait on that + tpc.getCommitFinishedLatch().countDown(); + tpc.getCompletedLatch().countDown(); + assertEquals("Finsih latch counted down prematurely", 1, tpc.getCommitFinishedLatch().getCount()); + assertEquals("Completed latch counted down prematurely", 1, tpc.getCompletedLatch().getCount()); + + // this should cause commit + tpc.getAllowCommitLatch().countDown(); + assertTrue("Finished latch didn't count down correclty", tpc.getCommitFinishedLatch().getCount() >=1); + //wait for the commit phase to run + + tpc.getCommitFinishedLatch().await(); + // go back and check the prepared latch since this is the only time we can reason about it + assertEquals("Prepared latch didn't count down correctly", 1, tpc.getPreparedLatch().getCount()); + + //wait for the finish phase to run + tpc.getCompletedLatch().await(); + assertCommitPhases(tpc, true, true, false, true); + } + + @Test + public void testErrorPropagation() throws Exception { + // use own own monitor here to not munge the rest of the test + ErrorMonitorable monitor = new ErrorMonitor(); + CheckableTwoPhaseCommit tpc = new CheckableTwoPhaseCommit(monitor, wakeFrequency); + monitor.receiveError("test after prepare", new Exception()); + new Thread(tpc).start(); + + assertCommitPhases(tpc, false, false, false, false); + + // now test that we can put an error in before the commit phase runs + monitor = new ErrorMonitor(); + tpc = new CheckableTwoPhaseCommit(monitor, wakeFrequency); + new Thread(tpc).start(); + + tpc.getPreparedLatch().await(); + monitor.receiveError("test after prepare", new Exception()); + tpc.getCompletedLatch().await(); + assertCommitPhases(tpc, true, false, true, true); + } + + private void assertCommitPhases(CheckableTwoPhaseCommit commit, boolean prepared, + boolean committed, + boolean cleanup, boolean finish) { + assertEquals("Unexpected latch state", prepared, commit.prepared); + assertEquals("Unexpected latch state", committed, commit.commit); + assertEquals("Unexpected latch state", cleanup, commit.cleanup); + assertEquals("Unexpected latch state", finish, commit.finish); + } + + /** + * {@link TwoPhaseCommit} where the progress into each phase can be checked + */ + private class CheckableTwoPhaseCommit extends TwoPhaseCommit { + private boolean prepared = false; + private boolean commit = false; + private boolean cleanup = false; + private boolean finish = false; + + public CheckableTwoPhaseCommit(ErrorMonitorable monitor, long wakeFrequency) { + super(monitor, wakeFrequency); + } + + + public CheckableTwoPhaseCommit(ErrorMonitorable monitor, long wakeFrequency, int i, + int j, int k, int l) { + super(monitor, wakeFrequency, i, j, k, l); + } + + @Override + protected void prepare() throws Exception { + prepared = true; + } + + @Override + protected void commit() throws Exception { + commit = true; + } + + @Override + protected void cleanup(Exception e) { + cleanup = true; + } + + @Override + protected void finish() { + finish = true; + } + } +}