diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java new file mode 100644 index 0000000..cdbcbbb --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java @@ -0,0 +1,173 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +import java.text.MessageFormat; + +/** + * A class that provides a standard waitFor implementation pattern + * See details at https://issues.apache.org/jira/browse/HBASE-7384 + */ +public abstract class Waiter { + + private static final Log LOG = LogFactory.getLog(Bytes.class); + + public static final String TEST_WAITFOR_RATIO_PROP = "test.waitfor.ratio"; + + private static float WAITFOR_RATIO_DEFAULT = Float.parseFloat(System.getProperty( + TEST_WAITFOR_RATIO_PROP, "1")); + + private static float waitForRatio = WAITFOR_RATIO_DEFAULT; + + /** + * Sets the 'wait for ratio' used in the {@link #sleep(long)}, + * {@link #waitFor(long, long, Predicate)} and {@link #waitFor(long, long, boolean, Predicate)} method + * for the current test class. + *
+ * This is useful when running tests in slow machines for tests that are time sensitive. + * @param ratio the 'wait for ratio' to set. + */ + public static void setWaitForRatio(float ratio) { + waitForRatio = ratio; + } + + /* + * Returns the 'wait for ratio' used in the {@link #sleep(long)}, {@link #waitFor(int, Predicate)} + * and {@link #waitFor(int, boolean, Predicate)} methods for the current test class. This is + * useful when running tests in slow machines for tests that are time sensitive. The default + * value is obtained from the Java System propertytest.wait.for.ratio which defaults
+ * to 1.
+ * @return the 'wait for ratio' for the current test class.
+ */
+ public static float getWaitForRatio() {
+ return waitForRatio;
+ }
+
+ /**
+ * A predicate 'closure' used by the {@link #waitFor(long, Predicate)} and
+ * {@link #waitFor(long, long, Predicate)} and {@link #waitFor(long, long, boolean, Predicate)}
+ * methods.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static interface Predicate {
+
+ /**
+ * Perform a predicate evaluation.
+ * @return the boolean result of the evaluation.
+ * @throws Exception thrown if the predicate evaluation could not evaluate.
+ */
+ public boolean evaluate() throws Exception;
+
+ }
+
+ /**
+ * Makes the current thread sleep for the specified number of milliseconds.
+ *
+ * The sleep time is multiplied by the {@link #getWaitForRatio()}.
+ * @param time the number of milliseconds to sleep.
+ */
+ public static void sleep(long time) {
+ try {
+ Thread.sleep((long) (getWaitForRatio() * time));
+ } catch (InterruptedException ex) {
+ LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
+ }
+ }
+
+ /**
+ * Waits up to the specified timeout for the given {@link Predicate} to become true,
+ * failing the test if the timeout is reached and the Predicate is still false.
+ *
+ * The timeout time is multiplied by the {@link #getWaitForRatio()}.
+ * @param timeout the timeout in milliseconds to wait for the predicate.
+ * @param predicate the predicate ot evaluate.
+ * @return the effective wait, in milli-seconds until the predicate become true.
+ */
+ public static long waitFor(long timeout, Predicate predicate) {
+ return waitFor(timeout, 100, true, predicate);
+ }
+
+ /**
+ * Waits up to the specified timeout for the given {@link Predicate} to become true,
+ * failing the test if the timeout is reached and the Predicate is still false.
+ *
+ * The timeout time is multiplied by the {@link #getWaitForRatio()}.
+ * @param timeout the max timeout in milliseconds to wait for the predicate.
+ * @param interval the time interval in milliseconds to evaluate predicate.
+ * @param predicate the predicate ot evaluate.
+ * @return the effective wait, in milli-seconds until the predicate become true.
+ */
+ public static long waitFor(long timeout, long interval, Predicate predicate) {
+ return waitFor(timeout, interval, true, predicate);
+ }
+
+ /**
+ * Waits up to the specified timeout for the given {@link Predicate} to become true.
+ *
+ * The timeout time is multiplied by the {@link #getWaitForRatio()}.
+ * @param timeout the timeout in milliseconds to wait for the predicate.
+ * @param interval the time interval in milliseconds to evaluate predicate.
+ * @param failIfTimeout indicates if the test should be failed if the predicate times out.
+ * @param predicate the predicate ot evaluate.
+ * @return the effective wait, in milli-seconds until the predicate become true or
+ * -1 if the predicate did not evaluate to true.
+ */
+ public static long waitFor(long timeout, long interval, boolean failIfTimeout, Predicate predicate) {
+ long started = System.currentTimeMillis();
+ long adjustedTimeout = (long)(getWaitForRatio() * timeout);
+ long mustEnd = started + adjustedTimeout;
+ Boolean eval = false;
+ Boolean interrupted = false;
+ try {
+ LOG.info(MessageFormat.format("Waiting up to [{0}] msec", adjustedTimeout));
+ while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ eval = predicate.evaluate();
+ interrupted = true;
+ break;
+ }
+ }
+ if (!eval) {
+ if (interrupted) {
+ LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
+ System.currentTimeMillis() - started));
+ } else if (failIfTimeout) {
+ Assert.fail(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout));
+ } else {
+ LOG.warn(MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout));
+ }
+ }
+ return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index 7f6b50c..8c52082 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -18,39 +18,12 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener;
@@ -69,6 +42,13 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hbase.SplitLogCounters.*;
+import static org.junit.Assert.*;
+
@Category(MediumTests.class)
public class TestSplitLogManager {
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
@@ -152,23 +132,17 @@ public class TestSplitLogManager {
return;
}
- private void waitForCounter(Expr e, long oldval, long newval,
+ private void waitForCounter(final Expr e, final long oldval, long newval,
long timems) {
- long curt = System.currentTimeMillis();
- long endt = curt + timems;
- while (curt < endt) {
- if (e.eval() == oldval) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException eintr) {
+
+ Waiter.waitFor(timems, 10, new Waiter.Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (e.eval() != oldval);
}
- curt = System.currentTimeMillis();
- } else {
- assertEquals(newval, e.eval());
- return;
- }
- }
- assertTrue(false);
+ });
+
+ assertEquals(newval, e.eval());
}
private String submitTaskAndWait(TaskBatch batch, String name)
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 41a9168..fdb3010 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -18,19 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -44,6 +34,12 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
@Category(MediumTests.class)
public class TestSplitLogWorker {
private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
@@ -62,23 +58,27 @@ public class TestSplitLogWorker {
waitForCounterBoolean(ctr, oldval, newval, timems));
}
- private boolean waitForCounterBoolean(AtomicLong ctr, long oldval, long newval,
+ private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
long timems) {
- long curt = System.currentTimeMillis();
- long endt = curt + timems;
- while (curt < endt) {
- if (ctr.get() == oldval) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- }
- curt = System.currentTimeMillis();
- } else {
- assertEquals(newval, ctr.get());
- return true;
+
+ return waitForCounterBoolean(ctr, oldval, newval, timems, true);
+ }
+
+ private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
+ long timems, boolean failIfTimeout) {
+
+ long waitedTime = Waiter.waitFor(timems, 10, failIfTimeout, new Waiter.Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return (ctr.get() != oldval);
}
+ });
+
+ if( waitedTime > 0) {
+ // when not timed out
+ assertEquals(newval, ctr.get());
}
- return false;
+ return true;
}
@Before
@@ -173,7 +173,7 @@ public class TestSplitLogWorker {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
// not it, that we fell through to the next counter in line and it was set.
- assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000) ||
+ assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000, false) ||
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
SplitLogTask slt = SplitLogTask.parseFrom(bytes);