diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java new file mode 100644 index 0000000..cdbcbbb --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java @@ -0,0 +1,173 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +import java.text.MessageFormat; + +/** + * A class that provides a standard waitFor implementation pattern + * See details at https://issues.apache.org/jira/browse/HBASE-7384 + */ +public abstract class Waiter { + + private static final Log LOG = LogFactory.getLog(Bytes.class); + + public static final String TEST_WAITFOR_RATIO_PROP = "test.waitfor.ratio"; + + private static float WAITFOR_RATIO_DEFAULT = Float.parseFloat(System.getProperty( + TEST_WAITFOR_RATIO_PROP, "1")); + + private static float waitForRatio = WAITFOR_RATIO_DEFAULT; + + /** + * Sets the 'wait for ratio' used in the {@link #sleep(long)}, + * {@link #waitFor(long, long, Predicate)} and {@link #waitFor(long, long, boolean, Predicate)} method + * for the current test class. + *

+ * This is useful when running tests in slow machines for tests that are time sensitive. + * @param ratio the 'wait for ratio' to set. + */ + public static void setWaitForRatio(float ratio) { + waitForRatio = ratio; + } + + /* + * Returns the 'wait for ratio' used in the {@link #sleep(long)}, {@link #waitFor(int, Predicate)} + * and {@link #waitFor(int, boolean, Predicate)} methods for the current test class.

This is + * useful when running tests in slow machines for tests that are time sensitive.

The default + * value is obtained from the Java System property test.wait.for.ratio which defaults + * to 1. + * @return the 'wait for ratio' for the current test class. + */ + public static float getWaitForRatio() { + return waitForRatio; + } + + /** + * A predicate 'closure' used by the {@link #waitFor(long, Predicate)} and + * {@link #waitFor(long, long, Predicate)} and {@link #waitFor(long, long, boolean, Predicate)} + * methods. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public static interface Predicate { + + /** + * Perform a predicate evaluation. + * @return the boolean result of the evaluation. + * @throws Exception thrown if the predicate evaluation could not evaluate. + */ + public boolean evaluate() throws Exception; + + } + + /** + * Makes the current thread sleep for the specified number of milliseconds. + *

+ * The sleep time is multiplied by the {@link #getWaitForRatio()}. + * @param time the number of milliseconds to sleep. + */ + public static void sleep(long time) { + try { + Thread.sleep((long) (getWaitForRatio() * time)); + } catch (InterruptedException ex) { + LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString())); + } + } + + /** + * Waits up to the specified timeout for the given {@link Predicate} to become true, + * failing the test if the timeout is reached and the Predicate is still false. + *

+ * The timeout time is multiplied by the {@link #getWaitForRatio()}. + * @param timeout the timeout in milliseconds to wait for the predicate. + * @param predicate the predicate ot evaluate. + * @return the effective wait, in milli-seconds until the predicate become true. + */ + public static long waitFor(long timeout, Predicate predicate) { + return waitFor(timeout, 100, true, predicate); + } + + /** + * Waits up to the specified timeout for the given {@link Predicate} to become true, + * failing the test if the timeout is reached and the Predicate is still false. + *

+ * The timeout time is multiplied by the {@link #getWaitForRatio()}. + * @param timeout the max timeout in milliseconds to wait for the predicate. + * @param interval the time interval in milliseconds to evaluate predicate. + * @param predicate the predicate ot evaluate. + * @return the effective wait, in milli-seconds until the predicate become true. + */ + public static long waitFor(long timeout, long interval, Predicate predicate) { + return waitFor(timeout, interval, true, predicate); + } + + /** + * Waits up to the specified timeout for the given {@link Predicate} to become true. + *

+ * The timeout time is multiplied by the {@link #getWaitForRatio()}. + * @param timeout the timeout in milliseconds to wait for the predicate. + * @param interval the time interval in milliseconds to evaluate predicate. + * @param failIfTimeout indicates if the test should be failed if the predicate times out. + * @param predicate the predicate ot evaluate. + * @return the effective wait, in milli-seconds until the predicate become true or + * -1 if the predicate did not evaluate to true. + */ + public static long waitFor(long timeout, long interval, boolean failIfTimeout, Predicate predicate) { + long started = System.currentTimeMillis(); + long adjustedTimeout = (long)(getWaitForRatio() * timeout); + long mustEnd = started + adjustedTimeout; + Boolean eval = false; + Boolean interrupted = false; + try { + LOG.info(MessageFormat.format("Waiting up to [{0}] msec", adjustedTimeout)); + while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + eval = predicate.evaluate(); + interrupted = true; + break; + } + } + if (!eval) { + if (interrupted) { + LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec", + System.currentTimeMillis() - started)); + } else if (failIfTimeout) { + Assert.fail(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout)); + } else { + LOG.warn(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout)); + } + } + return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 7f6b50c..8c52082 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -18,39 +18,12 @@ */ package org.apache.hadoop.hbase.master; -import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.SplitLogTask; -import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener; @@ -69,6 +42,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hbase.SplitLogCounters.*; +import static org.junit.Assert.*; + @Category(MediumTests.class) public class TestSplitLogManager { private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); @@ -152,23 +132,17 @@ public class TestSplitLogManager { return; } - private void waitForCounter(Expr e, long oldval, long newval, + private void waitForCounter(final Expr e, final long oldval, long newval, long timems) { - long curt = System.currentTimeMillis(); - long endt = curt + timems; - while (curt < endt) { - if (e.eval() == oldval) { - try { - Thread.sleep(10); - } catch (InterruptedException eintr) { + + Waiter.waitFor(timems, 10, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (e.eval() != oldval); } - curt = System.currentTimeMillis(); - } else { - assertEquals(newval, e.eval()); - return; - } - } - assertTrue(false); + }); + + assertEquals(newval, e.eval()); } private String submitTaskAndWait(TaskBatch batch, String name) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 41a9168..fdb3010 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -18,19 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.SplitLogCounters; -import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -44,6 +34,12 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + @Category(MediumTests.class) public class TestSplitLogWorker { private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class); @@ -62,23 +58,27 @@ public class TestSplitLogWorker { waitForCounterBoolean(ctr, oldval, newval, timems)); } - private boolean waitForCounterBoolean(AtomicLong ctr, long oldval, long newval, + private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval, long timems) { - long curt = System.currentTimeMillis(); - long endt = curt + timems; - while (curt < endt) { - if (ctr.get() == oldval) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - curt = System.currentTimeMillis(); - } else { - assertEquals(newval, ctr.get()); - return true; + + return waitForCounterBoolean(ctr, oldval, newval, timems, true); + } + + private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval, + long timems, boolean failIfTimeout) { + + long waitedTime = Waiter.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (ctr.get() != oldval); } + }); + + if( waitedTime > 0) { + // when not timed out + assertEquals(newval, ctr.get()); } - return false; + return true; } @Before @@ -173,7 +173,7 @@ public class TestSplitLogWorker { waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000); // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if // not it, that we fell through to the next counter in line and it was set. - assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) || + assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000, false) || SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); SplitLogTask slt = SplitLogTask.parseFrom(bytes);