diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 03e180d..3def27f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,9 +38,10 @@ private final SchedulingEditPolicy scheduleEditPolicy; private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class); - //thread which runs periodically to see the last time since a heartbeat is - //received. - private Thread checkerThread; + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; private volatile boolean stopped; private long monitorInterval; private RMContext rmContext; @@ -61,17 +68,25 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { assert !stopped : "starting when already stopped"; - checkerThread = new Thread(new PreemptionChecker()); - checkerThread.setName(getName()); - checkerThread.start(); + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + 0, monitorInterval, TimeUnit.MILLISECONDS); super.serviceStart(); } @Override public void serviceStop() throws Exception { stopped = true; - if (checkerThread != null) { - checkerThread.interrupt(); + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); } super.serviceStop(); } @@ -84,24 +99,12 @@ public void invokePolicy(){ private class PreemptionChecker implements Runnable { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - try { - //invoke the preemption policy at a regular pace - //the policy will generate preemption or kill events - //managed by the dispatcher - invokePolicy(); - } catch (YarnRuntimeException e) { - LOG.error("YarnRuntimeException raised while executing preemption" - + " checker, skip this run..., exception=", e); - } - - // Wait before next run - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info(getName() + " thread interrupted"); - break; - } + try { + //invoke the preemption policy + invokePolicy(); + } catch (YarnRuntimeException e) { + LOG.error("YarnRuntimeException raised while executing preemption" + + " checker, skip this run..., exception=", e); } } }