diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 28be6ac..d36d841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -34,6 +34,8 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * Dispatches {@link Event}s in a separate thread. Currently only single thread * does that. Potentially there could be multiple channels for each event type @@ -282,4 +284,9 @@ public void run() { } }; } + + @VisibleForTesting + protected boolean isDrained() { + return this.drained; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 803b2bb..da5ae44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -23,68 +23,20 @@ @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { -// flagrant initialize abuse throughout, but safe per -// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595 -// and similar grotesqueries - private volatile boolean drained = false; - private final BlockingQueue queue; - final Object mutex; - public DrainDispatcher() { this(new LinkedBlockingQueue()); } private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); - this.queue = eventQueue; - this.mutex = this; } /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (!drained) { + while (!isDrained()) { Thread.yield(); } } - - @Override - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - synchronized (mutex) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); - } - Event event; - try { - event = queue.take(); - } catch(InterruptedException ie) { - return; - } - if (event != null) { - dispatch(event); - } - } - } - }; - } - - @Override - public EventHandler getEventHandler() { - final EventHandler actual = super.getEventHandler(); - return new EventHandler() { - @Override - public void handle(Event event) { - synchronized (mutex) { - actual.handle(event); - drained = false; - } - } - }; - } - } -- 1.8.4