NOTE(review): the line breaks and the leading whitespace of context lines in this patch were
reconstructed from a flattened copy; if a hunk is rejected, retry with whitespace-insensitive
matching. The post-image blob ids on the "index" lines predate the edits to the added lines.

diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java
new file mode 100644
index 0000000..002838f
--- /dev/null
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.text.MessageFormat;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A class that provides a standard waitFor pattern.
+ * See details at https://issues.apache.org/jira/browse/HBASE-7384
+ */
+@InterfaceAudience.Private
+public final class Waiter {
+
+  private static final Log LOG = LogFactory.getLog(Waiter.class);
+
+  /**
+   * System property name whose value is a scale factor to increase time out values dynamically used
+   * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
+   * {@link #waitFor(Configuration, long, long, Predicate)}, and
+   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods.
+   * <p>
+   * The actual time out value equals hbase.test.wait.for.ratio * passed-in timeout.
+   */
+  public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
+
+  private static final float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
+
+  private static float waitForRatio = -1;
+
+  private Waiter() {
+  }
+
+  /**
+   * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
+   * {@link #waitFor(Configuration, long, Predicate)},
+   * {@link #waitFor(Configuration, long, long, Predicate)} and
+   * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class.
+   * <p>
+   * This is useful to dynamically adjust max time out values when the same test cases run in
+   * different test machine settings without recompiling &amp; re-deploying code.
+   * <p>
+   * The value is obtained from the Java System property or configuration setting
+   * {@code hbase.test.wait.for.ratio} which defaults to {@code 1}. It is resolved on the first
+   * call and cached for all later calls.
+   * @param conf the configuration
+   * @return the 'wait for ratio' for the current test run.
+   */
+  public static float getWaitForRatio(Configuration conf) {
+    if (waitForRatio < 0) {
+      // System property takes precedence over configuration setting
+      if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
+        waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
+      } else {
+        waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
+      }
+    }
+    return waitForRatio;
+  }
+
+  /**
+   * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)},
+   * {@link Waiter#waitFor(Configuration, long, long, Predicate)} and
+   * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)} methods.
+   * @param <E> the type of exception the evaluation may throw
+   */
+  @InterfaceAudience.Private
+  public static interface Predicate<E extends Exception> {
+
+    /**
+     * Perform a predicate evaluation.
+     * @return the boolean result of the evaluation.
+     * @throws E thrown if the predicate evaluation could not evaluate.
+     */
+    public boolean evaluate() throws E;
+
+  }
+
+  /**
+   * Makes the current thread sleep for the duration equal to the specified time in milliseconds
+   * multiplied by the {@link #getWaitForRatio(Configuration)}. If the sleep is interrupted, a
+   * warning is logged, the thread's interrupt status is restored and the method returns early.
+   * @param conf the configuration
+   * @param time the number of milliseconds to sleep.
+   */
+  public static void sleep(Configuration conf, long time) {
+    try {
+      Thread.sleep((long) (getWaitForRatio(conf) * time));
+    } catch (InterruptedException ex) {
+      LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
+      // keep the interruption visible to the caller instead of swallowing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Waits up to the duration equal to the specified timeout multiplied by the
+   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
+   * {@code true}, failing the test if the timeout is reached and the Predicate is still
+   * {@code false}. The predicate is evaluated every 100 milliseconds.
+   * @param conf the configuration
+   * @param timeout the timeout in milliseconds to wait for the predicate.
+   * @param predicate the predicate to evaluate.
+   * @return the effective wait, in milli-seconds until the predicate becomes {@code true} or
+   *         wait is interrupted otherwise {@code -1} when times out
+   */
+  public static <E extends Exception> long waitFor(Configuration conf, long timeout,
+      Predicate<E> predicate) throws E {
+    return waitFor(conf, timeout, 100, true, predicate);
+  }
+
+  /**
+   * Waits up to the duration equal to the specified timeout multiplied by the
+   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
+   * {@code true}, failing the test if the timeout is reached and the Predicate is still
+   * {@code false}.
+   * @param conf the configuration
+   * @param timeout the max timeout in milliseconds to wait for the predicate.
+   * @param interval the interval in milliseconds to evaluate predicate.
+   * @param predicate the predicate to evaluate.
+   * @return the effective wait, in milli-seconds until the predicate becomes {@code true} or
+   *         wait is interrupted otherwise {@code -1} when times out
+   */
+  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
+      Predicate<E> predicate) throws E {
+    return waitFor(conf, timeout, interval, true, predicate);
+  }
+
+  /**
+   * Waits up to the duration equal to the specified timeout multiplied by the
+   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
+   * {@code true}, failing the test if the timeout is reached, the Predicate is still
+   * {@code false} and failIfTimeout is set as {@code true}.
+   * <p>
+   * If the wait is interrupted, the predicate is evaluated one last time, the thread's interrupt
+   * status is restored and the elapsed time is returned. Any {@link Exception} thrown while
+   * waiting, including one thrown by the predicate, is rethrown wrapped in a
+   * {@link RuntimeException}.
+   * @param conf the configuration
+   * @param timeout the timeout in milliseconds to wait for the predicate.
+   * @param interval the interval in milliseconds to evaluate predicate.
+   * @param failIfTimeout indicates if should fail current test case when times out.
+   * @param predicate the predicate to evaluate.
+   * @return the effective wait, in milli-seconds until the predicate becomes {@code true} or
+   *         wait is interrupted otherwise {@code -1} when times out
+   */
+  public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
+      boolean failIfTimeout, Predicate<E> predicate) throws E {
+    long started = System.currentTimeMillis();
+    long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
+    long mustEnd = started + adjustedTimeout;
+    long remainderWait = 0;
+    long sleepInterval = 0;
+    boolean eval = false;
+    boolean interrupted = false;
+
+    try {
+      LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
+        adjustedTimeout, getWaitForRatio(conf)));
+      while (!(eval = predicate.evaluate())
+              && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
+        try {
+          // handle tail case when remainder wait is less than one interval
+          sleepInterval = (remainderWait > interval) ? interval : remainderWait;
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e) {
+          eval = predicate.evaluate();
+          interrupted = true;
+          // restore the interrupt status so the caller can still observe the interruption
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      if (!eval) {
+        if (interrupted) {
+          LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
+            System.currentTimeMillis() - started));
+        } else if (failIfTimeout) {
+          Assert.fail(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout));
+        } else {
+          LOG.warn(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout));
+        }
+      }
+      return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 023ca3c..ee658a0 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -2334,4 +2335,28 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public void setFileSystemURI(String fsURI) {
     FS_URI = fsURI;
   }
+
+  /**
+   * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
+   */
+  public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
+      throws E {
+    return Waiter.waitFor(this.conf, timeout, predicate);
+  }
+
+  /**
+   * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, Predicate)}.
+   */
+  public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
+      throws E {
+    return Waiter.waitFor(this.conf, timeout, interval, predicate);
+  }
+
+  /**
+   * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}.
+   */
+  public <E extends Exception> long waitFor(long timeout, long interval,
+      boolean failIfTimeout, Predicate<E> predicate) throws E {
+    return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
+  }
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ccc0ab3..61920da 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener;
@@ -142,7 +143,8 @@ public class TestSplitLogManager {
     public long eval();
   }
 
-  private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) {
+  private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
+      throws Exception {
     Expr e = new Expr() {
       public long eval() {
         return ctr.get();
@@ -152,23 +154,17 @@ public class TestSplitLogManager {
     return;
   }
 
-  private void waitForCounter(Expr e, long oldval, long newval,
-      long timems) {
-    long curt = System.currentTimeMillis();
-    long endt = curt + timems;
-    while (curt < endt) {
-      if (e.eval() == oldval) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException eintr) {
+  private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
+      throws Exception {
+
+    TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+            return (e.eval() != oldval);
         }
-        curt = System.currentTimeMillis();
-      } else {
-        assertEquals(newval, e.eval());
-        return;
-      }
-    }
-    assertTrue(false);
+    });
+
+    assertEquals(newval, e.eval());
   }
 
   private String submitTaskAndWait(TaskBatch batch, String name)
@@ -550,4 +546,3 @@ public class TestSplitLogManager {
   }
 
 }
-
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 41a9168..216d6f5 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -56,29 +57,35 @@ public class TestSplitLogWorker {
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
 
-  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
-      long timems) {
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
+      throws Exception {
     assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
       waitForCounterBoolean(ctr, oldval, newval, timems));
   }
 
-  private boolean waitForCounterBoolean(AtomicLong ctr, long oldval, long newval,
-      long timems) {
-    long curt = System.currentTimeMillis();
-    long endt = curt + timems;
-    while (curt < endt) {
-      if (ctr.get() == oldval) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-        }
-        curt = System.currentTimeMillis();
-      } else {
-        assertEquals(newval, ctr.get());
-        return true;
+  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+      long timems) throws Exception {
+
+    return waitForCounterBoolean(ctr, oldval, newval, timems, true);
+  }
+
+  private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+      long timems, boolean failIfTimeout) throws Exception {
+
+    long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
+      new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (ctr.get() != oldval);
       }
+    });
+
+    if (timeWaited < 0) {
+      // waitFor returns -1 only on a timeout, reachable when failIfTimeout is false
+      return false;
     }
-    return false;
+    assertEquals(newval, ctr.get());
+    return true;
   }
 
   @Before
@@ -173,7 +180,7 @@ public class TestSplitLogWorker {
     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
     // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
     // not it, that we fell through to the next counter in line and it was set.
-    assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) ||
+    assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000, false) ||
       SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
     byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
     SplitLogTask slt = SplitLogTask.parseFrom(bytes);