diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e79e7b3..803b2bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -28,6 +28,7 @@ // and similar grotesqueries private volatile boolean drained = false; private final BlockingQueue queue; + final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -36,6 +37,7 @@ public DrainDispatcher() { private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); this.queue = eventQueue; + this.mutex = this; } /** @@ -53,8 +55,10 @@ Runnable createThread() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } Event event; try { event = queue.take(); @@ -75,8 +79,10 @@ public EventHandler getEventHandler() { return new EventHandler() { @Override public void handle(Event event) { - drained = false; - actual.handle(event); + synchronized (mutex) { + actual.handle(event); + drained = false; + } } }; }