diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f5361c8..7478946 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -59,6 +59,9 @@ // Indicates all the remaining dispatcher's events on stop have been drained // and processed. + // FIXME data race happens if dispatcher thread set drained to true between + // handler setting drained to false and enqueueing event. Ignored for now + // because of its tiny impact. See YARN-5436. private volatile boolean drained = true; private final Object waitForDrained = new Object(); @@ -300,9 +303,4 @@ public void run() { protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } - - @VisibleForTesting - protected boolean isDrained() { - return this.drained; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e4a5a82..5d02fa0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -22,6 +22,10 @@ @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { + private volatile boolean drained = false; + private volatile boolean stopped = false; + private final BlockingQueue queue; + private final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -29,6 +33,8 @@ public DrainDispatcher() { public DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); + this.queue = eventQueue; + this.mutex = this; } /** @@ -44,8 +50,50 @@ public void waitForEventThreadToWait() { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + while (!drained) { Thread.yield(); } } + + @Override + Runnable createThread() { + return () -> { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } + Event event; + try { + event = queue.take(); + } catch(InterruptedException ie) { + return; + } + if (event != null) { + dispatch(event); + } + } + }; + } + + @SuppressWarnings("unchecked") + @Override + public EventHandler getEventHandler() { + final EventHandler actual = super.getEventHandler(); + return new EventHandler() { + @Override + public void handle(Event event) { + synchronized (mutex) { + actual.handle(event); + drained = false; + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + super.serviceStop(); + } }