diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5b03d51..0573bae 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -62,10 +62,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -335,13 +331,12 @@ public RegisterApplicationMasterResponse run() throws Exception { private void trackApp() { if (isTracked) { ((SchedulerWrapper) rm.getResourceScheduler()) - .addTrackedApp(appAttemptId, oldAppId); + .addTrackedApp(appId, oldAppId); } } public void untrackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .removeTrackedApp(appAttemptId, oldAppId); + ((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId); } } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java index 3b539fa..523fb86 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java @@ -20,7 +20,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair .FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; @@ -62,15 +63,21 @@ public FairSchedulerMetrics() { } @Override - public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) { - super.trackApp(appAttemptId, oldAppId); - FairScheduler fair = (FairScheduler) scheduler; - final FSAppAttempt app = fair.getSchedulerApp(appAttemptId); + public void trackApp(ApplicationId appId, String oldAppId) { + super.trackApp(appId, oldAppId); + SchedulerApplication app = (SchedulerApplication) + scheduler.getSchedulerApplications().get(appId); + metrics.register("variable.app." + oldAppId + ".demand.memory", new Gauge() { @Override public Long getValue() { - return app.getDemand().getMemorySize(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getDemand().getMemorySize(); + } else { + return 0L; + } } } ); @@ -78,7 +85,12 @@ public Long getValue() { new Gauge() { @Override public Integer getValue() { - return app.getDemand().getVirtualCores(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getDemand().getVirtualCores(); + } else { + return 0; + } } } ); @@ -86,7 +98,12 @@ public Integer getValue() { new Gauge() { @Override public Long getValue() { - return app.getResourceUsage().getMemorySize(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getResourceUsage().getMemorySize(); + } else { + return 0L; + } } } ); @@ -94,7 +111,12 @@ public Long getValue() { new Gauge() { @Override public Integer getValue() { - return app.getResourceUsage().getVirtualCores(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getResourceUsage().getVirtualCores(); + } else { + return 0; + } } } ); @@ -102,7 +124,12 @@ public Integer getValue() { new Gauge() { @Override public Long getValue() { - return app.getMinShare().getMemorySize(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getMinShare().getMemorySize(); + } else { + return 0L; + } } } ); @@ -110,7 +137,12 @@ public Long getValue() { new Gauge() { @Override public Long getValue() { - return app.getMinShare().getMemorySize(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getMinShare().getMemorySize(); + } else { + return 0L; + } } } ); @@ -118,7 +150,13 @@ public Long getValue() { new Gauge() { @Override public Long getValue() { - return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return Math + .min(appAttempt.getMaxShare().getMemorySize(), totalMemoryMB); + } else { + return 0L; + } } } ); @@ -126,7 +164,13 @@ public Long getValue() { new Gauge() { @Override public Integer getValue() { - return Math.min(app.getMaxShare().getVirtualCores(), totalVCores); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return Math + .min(appAttempt.getMaxShare().getVirtualCores(), totalVCores); + } else { + return 0; + } } } ); @@ -134,7 +178,12 @@ public Integer getValue() { new Gauge() { @Override public Integer getValue() { - return app.getFairShare().getVirtualCores(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getFairShare().getVirtualCores(); + } else { + return 0; + } } } ); @@ -142,7 +191,12 @@ public Integer getValue() { new Gauge() { @Override public Integer getValue() { - return app.getFairShare().getVirtualCores(); + FSAppAttempt appAttempt = (FSAppAttempt) app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getFairShare().getVirtualCores(); + } else { + return 0; + } } } ); diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index df8323a..7d3b64f 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -122,7 +121,7 @@ private Lock queueLock; private Configuration conf; - private ResourceScheduler scheduler; + private AbstractYarnScheduler scheduler; private Map appQueueMap = new ConcurrentHashMap(); private BufferedWriter jobRuntimeLogBW; @@ -169,8 +168,8 @@ public ResourceSchedulerWrapper() { public void setConf(Configuration conf) { this.conf = conf; // set scheduler - Class klass = conf.getClass( - SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class); + Class klass = conf.getClass( + SLSConfiguration.RM_SCHEDULER, null, AbstractYarnScheduler.class); scheduler = ReflectionUtils.newInstance(klass, conf); // start metrics @@ -793,17 +792,15 @@ public SchedulerMetrics getSchedulerMetrics() { } // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void addTrackedApp(ApplicationId appId, String oldAppId) { if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); + schedulerMetrics.trackApp(appId, oldAppId); } } - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void removeTrackedApp(String oldAppId) { if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); + schedulerMetrics.untrackApp(oldAppId); } } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index cd4377e..6ea2ab0 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -839,17 +839,16 @@ public SchedulerMetrics getSchedulerMetrics() { } // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, + public void addTrackedApp(ApplicationId appId, String oldAppId) { if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); + schedulerMetrics.trackApp(appId, oldAppId); } } - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + public void removeTrackedApp(String oldAppId) { if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); + schedulerMetrics.untrackApp(oldAppId); } } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index ecf516d..6680d33 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -23,61 +23,68 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerAppReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable public abstract class SchedulerMetrics { - protected ResourceScheduler scheduler; + protected AbstractYarnScheduler scheduler; protected Set trackedQueues; protected MetricRegistry metrics; protected Set appTrackedMetrics; protected Set queueTrackedMetrics; public SchedulerMetrics() { - appTrackedMetrics = new HashSet(); + appTrackedMetrics = new HashSet<>(); appTrackedMetrics.add("live.containers"); appTrackedMetrics.add("reserved.containers"); - queueTrackedMetrics = new HashSet(); + queueTrackedMetrics = new HashSet<>(); } - public void init(ResourceScheduler scheduler, MetricRegistry metrics) { + public void init(AbstractYarnScheduler scheduler, MetricRegistry metrics) { this.scheduler = scheduler; - this.trackedQueues = new HashSet(); + this.trackedQueues = new HashSet<>(); this.metrics = metrics; } - - public void trackApp(final ApplicationAttemptId appAttemptId, - String oldAppId) { + + public void trackApp(final ApplicationId appId, String oldAppId) { + SchedulerApplication app = (SchedulerApplication) + scheduler.getSchedulerApplications().get(appId); metrics.register("variable.app." + oldAppId + ".live.containers", - new Gauge() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getLiveContainers().size(); + new Gauge() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = app.getCurrentAppAttempt(); + if (appAttempt != null) { + return appAttempt.getLiveContainers().size(); + } else { + return 0; + } + } } - } ); metrics.register("variable.app." + oldAppId + ".reserved.containers", - new Gauge() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getReservedContainers().size(); + new Gauge() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = app.getCurrentAppAttempt(); + if (appAttempt != null) { + return app.getCurrentAppAttempt().getLiveContainers().size(); + } else { + return 0; + } + } } - } ); } - - public void untrackApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + + public void untrackApp(String oldAppId) { for (String m : appTrackedMetrics) { metrics.remove("variable.app." + oldAppId + "." + m); } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 524b8bf..235e3e4 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -21,7 +21,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import com.codahale.metrics.MetricRegistry; @@ -30,18 +29,16 @@ @Unstable public interface SchedulerWrapper { - public MetricRegistry getMetrics(); - public SchedulerMetrics getSchedulerMetrics(); - public Set getQueueSet(); - public void setQueueSet(Set queues); - public Set getTrackedAppSet(); - public void setTrackedAppSet(Set apps); - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS); + MetricRegistry getMetrics(); + SchedulerMetrics getSchedulerMetrics(); + Set getQueueSet(); + void setQueueSet(Set queues); + Set getTrackedAppSet(); + void setTrackedAppSet(Set apps); + void addTrackedApp(ApplicationId appId, String oldAppId); + void removeTrackedApp(String oldAppId); + void addAMRuntime(ApplicationId appId, + long traceStartTimeMS, long traceEndTimeMS, + long simulateStartTimeMS, long simulateEndTimeMS); }