diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java new file mode 100644 index 0000000..03b0357 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.PrintWriter; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Thread Utility + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class Threads { + protected static final Log LOG = LogFactory.getLog(Threads.class); + private static final AtomicInteger poolNumber = new AtomicInteger(1); + + /** + * Utility method that sets name, daemon status and starts passed thread. + * @param t thread to run + * @return Returns the passed Thread t. + */ + public static Thread setDaemonThreadRunning(final Thread t) { + return setDaemonThreadRunning(t, t.getName()); + } + + /** + * Utility method that sets name, daemon status and starts passed thread. + * @param t thread to frob + * @param name new name + * @return Returns the passed Thread t. + */ + public static Thread setDaemonThreadRunning(final Thread t, + final String name) { + return setDaemonThreadRunning(t, name, null); + } + + /** + * Utility method that sets name, daemon status and starts passed thread. + * @param t thread to frob + * @param name new name + * @param handler A handler to set on the thread. Pass null if want to + * use default handler. + * @return Returns the passed Thread t. + */ + public static Thread setDaemonThreadRunning(final Thread t, + final String name, final UncaughtExceptionHandler handler) { + t.setName(name); + if (handler != null) { + t.setUncaughtExceptionHandler(handler); + } + t.setDaemon(true); + t.start(); + return t; + } + + /** + * Shutdown passed thread using isAlive and join. + * @param t Thread to shutdown + */ + public static void shutdown(final Thread t) { + shutdown(t, 0); + } + + /** + * Shutdown passed thread using isAlive and join. + * @param joinwait Pass 0 if we're to wait forever. + * @param t Thread to shutdown + */ + public static void shutdown(final Thread t, final long joinwait) { + if (t == null) return; + while (t.isAlive()) { + try { + t.join(joinwait); + } catch (InterruptedException e) { + LOG.warn(t.getName() + "; joinwait=" + joinwait, e); + } + } + } + + + /** + * @param t Waits on the passed thread to die dumping a threaddump every + * minute while its up. + * @throws InterruptedException + */ + public static void threadDumpingIsAlive(final Thread t) + throws InterruptedException { + if (t == null) { + return; + } + + while (t.isAlive()) { + t.join(60 * 1000); + if (t.isAlive()) { + ReflectionUtils.printThreadInfo(new PrintWriter(System.out), + "Automatic Stack Trace every 60 seconds waiting on " + + t.getName()); + } + } + } + + /** + * @param millis How long to sleep for in milliseconds. + */ + public static void sleep(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * Sleeps for the given amount of time even if interrupted. Preserves + * the interrupt status. + * @param msToWait the amount of time to sleep in milliseconds + */ + public static void sleepWithoutInterrupt(final long msToWait) { + long timeMillis = System.currentTimeMillis(); + long endTime = timeMillis + msToWait; + boolean interrupted = false; + while (timeMillis < endTime) { + try { + Thread.sleep(endTime - timeMillis); + } catch (InterruptedException ex) { + interrupted = true; + } + timeMillis = System.currentTimeMillis(); + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + /** + * Create a new CachedThreadPool with a bounded number as the maximum thread size in the pool. + * @param maxCachedThread the maximum thread could be created in the pool + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @param threadFactory the factory to use when creating new threads + * @return threadPoolExecutor the cachedThreadPool with a bounded number as the maximum thread + * size in the pool. + */ + public static ThreadPoolExecutor getBoundedCachedThreadPool( + int maxCachedThread, long timeout, TimeUnit unit, + ThreadFactory threadFactory) { + ThreadPoolExecutor boundedCachedThreadPool = + new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, + TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); + // allow the core pool threads timeout and terminate + boundedCachedThreadPool.allowCoreThreadTimeOut(true); + return boundedCachedThreadPool; + } + + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, + * with a common prefix. + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + public static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNumber = Threads.poolNumber.getAndIncrement(); + final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = prefix + "pool-" + poolNumber + "-thread-" + + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads + */ + public static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + + }; + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java new file mode 100644 index 0000000..f70aef6 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java @@ -0,0 +1,75 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertTrue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +public class TestThreads { + private static final Log LOG = LogFactory.getLog(TestThreads.class); + + private static final int SLEEP_TIME_MS = 5000; + private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS); + + private volatile boolean wasInterrupted; + + @Test(timeout=6000) + public void testSleepWithoutInterrupt() throws InterruptedException { + Thread sleeper = new Thread(new Runnable() { + @Override + public void run() { + LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS); + Threads.sleepWithoutInterrupt(SLEEP_TIME_MS); + LOG.debug("Sleeper thread: finished sleeping"); + wasInterrupted = Thread.currentThread().isInterrupted(); + } + }); + LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)"); + sleeper.start(); + long startTime = System.currentTimeMillis(); + LOG.debug("Main thread: sleeping for 500 ms"); + Threads.sleep(500); + + LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms"); + sleeper.interrupt(); + Threads.sleep(2000); + + LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms"); + sleeper.interrupt(); + Threads.sleep(1000); + + LOG.debug("Interrupting the sleeper thread again"); + sleeper.interrupt(); + sleeper.join(); + + assertTrue("sleepWithoutInterrupt did not preserve the thread's " + + "interrupted status", wasInterrupted); + + long timeElapsed = System.currentTimeMillis() - startTime; + assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " + + "range of the sleep time " + SLEEP_TIME_MS, + Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS); + LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " + + timeElapsed); + } +} + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/DaemonThreadFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/DaemonThreadFactory.java deleted file mode 100644 index 113bbcd..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/DaemonThreadFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Thread factory that creates daemon threads - */ -public class DaemonThreadFactory implements ThreadFactory { - static final AtomicInteger poolNumber = new AtomicInteger(1); - final ThreadGroup group; - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - public DaemonThreadFactory(String name) { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = name + poolNumber.getAndIncrement() + "-thread-"; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d8d5c04..84c64a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import com.google.protobuf.ServiceException; @@ -183,7 +183,7 @@ public class HTable implements HTableInterface { // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, - new SynchronousQueue(), new DaemonThreadFactory("hbase-table-pool")); + new SynchronousQueue(), Threads.newDaemonThreadFactory("hbase-table")); ((ThreadPoolExecutor) this.pool).allowCoreThreadTimeOut(true); this.finishSetup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java deleted file mode 100644 index f23ada7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import java.io.PrintWriter; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Thread Utility - */ -@InterfaceAudience.Public -@InterfaceStability.Stable -public class Threads { - protected static final Log LOG = LogFactory.getLog(Threads.class); - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to run - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t) { - return setDaemonThreadRunning(t, t.getName()); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name) { - return setDaemonThreadRunning(t, name, null); - } - - /** - * Utility method that sets name, daemon status and starts passed thread. - * @param t thread to frob - * @param name new name - * @param handler A handler to set on the thread. Pass null if want to - * use default handler. - * @return Returns the passed Thread t. - */ - public static Thread setDaemonThreadRunning(final Thread t, - final String name, final UncaughtExceptionHandler handler) { - t.setName(name); - if (handler != null) { - t.setUncaughtExceptionHandler(handler); - } - t.setDaemon(true); - t.start(); - return t; - } - - /** - * Shutdown passed thread using isAlive and join. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t) { - shutdown(t, 0); - } - - /** - * Shutdown passed thread using isAlive and join. - * @param joinwait Pass 0 if we're to wait forever. - * @param t Thread to shutdown - */ - public static void shutdown(final Thread t, final long joinwait) { - if (t == null) return; - while (t.isAlive()) { - try { - t.join(joinwait); - } catch (InterruptedException e) { - LOG.warn(t.getName() + "; joinwait=" + joinwait, e); - } - } - } - - - /** - * @param t Waits on the passed thread to die dumping a threaddump every - * minute while its up. - * @throws InterruptedException - */ - public static void threadDumpingIsAlive(final Thread t) - throws InterruptedException { - if (t == null) { - return; - } - - while (t.isAlive()) { - t.join(60 * 1000); - if (t.isAlive()) { - ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - "Automatic Stack Trace every 60 seconds waiting on " + - t.getName()); - } - } - } - - /** - * @param millis How long to sleep for in milliseconds. - */ - public static void sleep(int millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - /** - * Sleeps for the given amount of time even if interrupted. Preserves - * the interrupt status. - * @param msToWait the amount of time to sleep in milliseconds - */ - public static void sleepWithoutInterrupt(final long msToWait) { - long timeMillis = System.currentTimeMillis(); - long endTime = timeMillis + msToWait; - boolean interrupted = false; - while (timeMillis < endTime) { - try { - Thread.sleep(endTime - timeMillis); - } catch (InterruptedException ex) { - interrupted = true; - } - timeMillis = System.currentTimeMillis(); - } - - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - - /** - * Create a new CachedThreadPool with a bounded number as the maximum - * thread size in the pool. - * - * @param maxCachedThread the maximum thread could be created in the pool - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @param threadFactory the factory to use when creating new threads - * @return threadPoolExecutor the cachedThreadPool with a bounded number - * as the maximum thread size in the pool. - */ - public static ThreadPoolExecutor getBoundedCachedThreadPool( - int maxCachedThread, long timeout, TimeUnit unit, - ThreadFactory threadFactory) { - ThreadPoolExecutor boundedCachedThreadPool = - new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout, - TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); - // allow the core pool threads timeout and terminate - boundedCachedThreadPool.allowCoreThreadTimeOut(true); - return boundedCachedThreadPool; - } - - - /** - * Returns a {@link java.util.concurrent.ThreadFactory} that names each - * created thread uniquely, with a common prefix. - * - * @param prefix The prefix of every created Thread's name - * @return a {@link java.util.concurrent.ThreadFactory} that names threads - */ - public static ThreadFactory getNamedThreadFactory(final String prefix) { - return new ThreadFactory() { - - private final AtomicInteger threadNumber = new AtomicInteger(1); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, prefix + threadNumber.getAndIncrement()); - } - }; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 96d3c04..1762293 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.DataInputBuffer; import org.junit.After; @@ -3952,7 +3953,7 @@ public class TestFromClientSide { ExecutorService pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), - new DaemonThreadFactory("test-from-client-pool")); + Threads.newDaemonThreadFactory("test-from-client")); ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true); HTable t = new HTable(tableName, conn, pool); HBaseAdmin ha = new HBaseAdmin(conn); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 86187c3..6f8ddb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -288,7 +289,7 @@ public class TestHCM { ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new SynchronousQueue(), - new DaemonThreadFactory("test-hcm-pool")); + Threads.newDaemonThreadFactory("test-hcm")); HTable table = new HTable(TABLE_NAME1, conn, pool); table.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java deleted file mode 100644 index 3bd39af..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.util; - -import static org.junit.Assert.assertTrue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.SmallTests; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(SmallTests.class) -public class TestThreads { - private static final Log LOG = LogFactory.getLog(TestThreads.class); - - private static final int SLEEP_TIME_MS = 5000; - private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS); - - private volatile boolean wasInterrupted; - - @Test(timeout=6000) - public void testSleepWithoutInterrupt() throws InterruptedException { - Thread sleeper = new Thread(new Runnable() { - @Override - public void run() { - LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS); - Threads.sleepWithoutInterrupt(SLEEP_TIME_MS); - LOG.debug("Sleeper thread: finished sleeping"); - wasInterrupted = Thread.currentThread().isInterrupted(); - } - }); - LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)"); - sleeper.start(); - long startTime = System.currentTimeMillis(); - LOG.debug("Main thread: sleeping for 500 ms"); - Threads.sleep(500); - - LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms"); - sleeper.interrupt(); - Threads.sleep(2000); - - LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms"); - sleeper.interrupt(); - Threads.sleep(1000); - - LOG.debug("Interrupting the sleeper thread again"); - sleeper.interrupt(); - sleeper.join(); - - assertTrue("sleepWithoutInterrupt did not preserve the thread's " + - "interrupted status", wasInterrupted); - - long timeElapsed = System.currentTimeMillis() - startTime; - assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " + - "range of the sleep time " + SLEEP_TIME_MS, - Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS); - LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " + - timeElapsed); - } - - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} -