diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index d4c129b3bf..76b97f7f8f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; @@ -31,9 +38,10 @@ private final SchedulingEditPolicy scheduleEditPolicy; private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class); - //thread which runs periodically to see the last time since a heartbeat is - //received. - private Thread checkerThread; + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; private volatile boolean stopped; private long monitorInterval; private RMContext rmContext; @@ -64,17 +72,25 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { assert !stopped : "starting when already stopped"; - checkerThread = new Thread(new PreemptionChecker()); - checkerThread.setName(getName()); - checkerThread.start(); + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + 0, monitorInterval, TimeUnit.MILLISECONDS); super.serviceStart(); } @Override public void serviceStop() throws Exception { stopped = true; - if (checkerThread != null) { - checkerThread.interrupt(); + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); } super.serviceStop(); } @@ -87,17 +103,12 @@ public void invokePolicy(){ private class PreemptionChecker implements Runnable { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - //invoke the preemption policy at a regular pace - //the policy will generate preemption or kill events - //managed by the dispatcher + try { + //invoke the preemption policy invokePolicy(); - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info(getName() + " thread interrupted"); - break; - } + } catch (YarnRuntimeException e) { + LOG.error("YarnRuntimeException raised while executing preemption" + + " checker, skip this run..., exception=", e); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java index 5bf61e347d..d53282ed67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java @@ -25,6 +25,7 @@ import org.junit.Test; import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; public class TestSchedulingMonitor { @@ -42,5 +43,23 @@ public void testRMStarts() { fail("ResourceManager does not start when " + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + " is set to true"); } + + SchedulingEditPolicy mPolicy = mock(SchedulingEditPolicy.class); + when(mPolicy.getMonitoringInterval()).thenReturn(1000L); + SchedulingMonitor monitor = new SchedulingMonitor(rm.getRMContext(), + mPolicy); + try { + monitor.serviceInit(conf); + monitor.serviceStart(); + } catch (Exception e) { + fail("SchedulingMonitor failes to start."); + } + verify(mPolicy, times(1)).editSchedule(); + try { + monitor.close(); + rm.close(); + } catch (Exception e) { + fail("Failed to close."); + } } }