diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f1b0c794031..e2fbb5b76da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -604,4 +604,18 @@ public Long getTokenSequenceNo() { public void incrTokenSequenceNo() { this.tokenSequenceNo.incrementAndGet(); } + + /** + * + * @param appId + * @return + */ + public boolean isAppOnUAM(ApplicationId appId) { + RMApp rmApp = getRMApps().get(appId); + boolean isUAM = false; + if (rmApp != null && rmApp.getApplicationSubmissionContext() != null) { + isUAM = rmApp.getApplicationSubmissionContext().getUnmanagedAM(); + } + return isUAM; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 55420bd9270..ad0c47d9e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -203,4 +203,6 @@ void setMultiNodeSortingManager( long getTokenSequenceNo(); void incrTokenSequenceNo(); + + boolean isAppOnUAM(ApplicationId appId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 7f10138494e..8fa6206fd6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -678,4 +678,9 @@ public long getTokenSequenceNo() { public void incrTokenSequenceNo() { this.activeServiceContext.incrTokenSequenceNo(); } + + @Override + public boolean isAppOnUAM(ApplicationId appId) { + return this.activeServiceContext.isAppOnUAM(appId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 8e65e6a42e3..de9b33a3ae7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -617,8 +617,14 @@ public void move(Queue newQueue) { ap.getPrimaryRequestedNodePartition(), delta); } } - oldMetrics.moveAppFrom(this); - newMetrics.moveAppTo(this); + + if(rmContext.isAppOnUAM(applicationId)) { + oldMetrics.moveUnmanagedAppFrom(this); + newMetrics.moveUnmanagedAppTo(this); + } else { + oldMetrics.moveAppFrom(this); + newMetrics.moveAppTo(this); + } abstractUsersManager.deactivateApplication(user, applicationId); abstractUsersManager = newQueue.getAbstractUsersManager(); if (!schedulerKeys.isEmpty()) { @@ -649,7 +655,12 @@ public void stop() { ask.getCount())); } } - metrics.finishAppAttempt(applicationId, pending, user); + + if (rmContext.isAppOnUAM(applicationId)) { + metrics.finishUnmanagedAppAttempt(applicationId, pending, user); + } else { + metrics.finishAppAttempt(applicationId, pending, user); + } // Clear requests themselves clearRequests(); @@ -695,7 +706,12 @@ public void recoverContainer(RMContainer rmContainer, String partition) { // If there was any container to recover, the application was // running from scheduler's POV. pending = false; - metrics.runAppAttempt(applicationId, user); + + if (rmContext.isAppOnUAM(applicationId)) { + metrics.runUnmanagedAppAttempt(applicationId, user); + } else { + metrics.runAppAttempt(applicationId, user); + } } // Container is completed. Skip recovering resources. @@ -736,7 +752,12 @@ private void updateMetricsForAllocatedContainer(NodeType type, // once an allocation is done we assume the application is // running from scheduler's POV. pending = false; - metrics.runAppAttempt(applicationId, user); + + if (rmContext.isAppOnUAM(applicationId)) { + metrics.runUnmanagedAppAttempt(applicationId, user); + } else { + metrics.runAppAttempt(applicationId, user); + } } updateMetrics(applicationId, type, node, containerAllocated, user, queue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index c3d466a8354..3b2c85d8509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -63,6 +63,20 @@ @Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps killed") MutableCounterInt appsKilled; @Metric("# of apps failed") MutableCounterInt appsFailed; + + @Metric("# of Unmanaged apps submitted") + MutableCounterInt unmanagedAppsSubmitted; + @Metric("# of Unmanaged running apps") + MutableGaugeInt unmanagedAppsRunning; + @Metric("# of Unmanaged pending apps") + MutableGaugeInt unmanagedAppsPending; + @Metric("# of Unmanaged apps completed") + MutableCounterInt unmanagedAppsCompleted; + @Metric("# of Unmanaged apps killed") + MutableCounterInt unmanagedAppsKilled; + @Metric("# of Unmanaged apps failed") + MutableCounterInt unmanagedAppsFailed; + @Metric("Aggregate # of allocated node-local containers") MutableCounterLong aggregateNodeLocalContainersAllocated; @Metric("Aggregate # of allocated rack-local containers") @@ -413,6 +427,17 @@ public void submitApp(String user) { } } + public void submitUnmanagedApp(String user) { + unmanagedAppsSubmitted.incr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.submitUnmanagedApp(user); + } + if (parent != null) { + parent.submitUnmanagedApp(user); + } + } + public void submitAppAttempt(String user) { appsPending.incr(); QueueMetrics userMetrics = getUserMetrics(user); @@ -424,6 +449,17 @@ public void submitAppAttempt(String user) { } } + public void submitUnmanagedAppAttempt(String user) { + unmanagedAppsPending.incr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.submitUnmanagedAppAttempt(user); + } + if (parent != null) { + parent.submitUnmanagedAppAttempt(user); + } + } + public void runAppAttempt(ApplicationId appId, String user) { runBuckets.add(appId, System.currentTimeMillis()); appsRunning.incr(); @@ -437,6 +473,19 @@ public void runAppAttempt(ApplicationId appId, String user) { } } + public void runUnmanagedAppAttempt(ApplicationId appId, String user) { + runBuckets.add(appId, System.currentTimeMillis()); + unmanagedAppsRunning.incr(); + unmanagedAppsPending.decr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.runUnmanagedAppAttempt(appId, user); + } + if (parent != null) { + parent.runUnmanagedAppAttempt(appId, user); + } + } + public void finishAppAttempt( ApplicationId appId, boolean isPending, String user) { runBuckets.remove(appId); @@ -454,6 +503,23 @@ public void finishAppAttempt( } } + public void finishUnmanagedAppAttempt( + ApplicationId appId, boolean isPending, String user) { + runBuckets.remove(appId); + if (isPending) { + unmanagedAppsPending.decr(); + } else { + unmanagedAppsRunning.decr(); + } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.finishUnmanagedAppAttempt(appId, isPending, user); + } + if (parent != null) { + parent.finishUnmanagedAppAttempt(appId, isPending, user); + } + } + public void finishApp(String user, RMAppState rmAppFinalState) { switch (rmAppFinalState) { case KILLED: appsKilled.incr(); break; @@ -468,6 +534,21 @@ public void finishApp(String user, RMAppState rmAppFinalState) { parent.finishApp(user, rmAppFinalState); } } + + public void finishUnmanagedApp(String user, RMAppState rmAppFinalState) { + switch (rmAppFinalState) { + case KILLED: unmanagedAppsKilled.incr(); break; + case FAILED: unmanagedAppsFailed.incr(); break; + default: unmanagedAppsCompleted.incr(); break; + } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.finishUnmanagedApp(user, rmAppFinalState); + } + if (parent != null) { + parent.finishUnmanagedApp(user, rmAppFinalState); + } + } public void moveAppFrom(AppSchedulingInfo app) { if (app.isPending()) { @@ -483,6 +564,21 @@ public void moveAppFrom(AppSchedulingInfo app) { parent.moveAppFrom(app); } } + + public void moveUnmanagedAppFrom(AppSchedulingInfo app) { + if (app.isPending()) { + unmanagedAppsPending.decr(); + } else { + unmanagedAppsRunning.decr(); + } + QueueMetrics userMetrics = getUserMetrics(app.getUser()); + if (userMetrics != null) { + userMetrics.moveUnmanagedAppFrom(app); + } + if (parent != null) { + parent.moveUnmanagedAppFrom(app); + } + } public void moveAppTo(AppSchedulingInfo app) { if (app.isPending()) { @@ -499,6 +595,21 @@ public void moveAppTo(AppSchedulingInfo app) { } } + public void moveUnmanagedAppTo(AppSchedulingInfo app) { + if (app.isPending()) { + unmanagedAppsPending.incr(); + } else { + unmanagedAppsRunning.incr(); + } + QueueMetrics userMetrics = getUserMetrics(app.getUser()); + if (userMetrics != null) { + userMetrics.moveUnmanagedAppTo(app); + } + if (parent != null) { + parent.moveUnmanagedAppTo(app); + } + } + /** * Set available resources. To be called by scheduler periodically as * resources become available. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 96288f8b9f3..d958f0d7a56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -30,6 +30,7 @@ private final String user; private volatile T currentAttempt; private volatile Priority priority; + private boolean isAppOnUAM = false; public SchedulerApplication(Queue queue, String user) { this.queue = queue; @@ -63,8 +64,16 @@ public void setCurrentAppAttempt(T currentAttempt) { this.currentAttempt = currentAttempt; } + public void setAppOnUAM(boolean isUAM) { + this.isAppOnUAM = isUAM; + } + public void stop(RMAppState rmAppFinalState) { - queue.getMetrics().finishApp(user, rmAppFinalState); + if(isAppOnUAM) { + queue.getMetrics().finishUnmanagedApp(user, rmAppFinalState); + } else { + queue.getMetrics().finishApp(user, rmAppFinalState); + } } public Priority getPriority() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a632bfa7651..6b36149f58d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -883,9 +883,16 @@ private void addApplicationOnRecovery(ApplicationId applicationId, // Ignore the exception for recovered app as the app was previously // accepted. } - queue.getMetrics().submitApp(user); + boolean isAppOnUAM = rmContext.isAppOnUAM(applicationId); + if (isAppOnUAM) { + queue.getMetrics().submitUnmanagedApp(user); + } else { + queue.getMetrics().submitApp(user); + } + SchedulerApplication application = new SchedulerApplication(queue, user, priority); + application.setAppOnUAM(isAppOnUAM); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); @@ -1047,9 +1054,17 @@ private void addApplication(ApplicationId applicationId, String queueName, return; } // update the metrics - queue.getMetrics().submitApp(user); + boolean isAppOnUAM = rmContext.isAppOnUAM(applicationId); + if (isAppOnUAM) { + queue.getMetrics().submitUnmanagedApp(user); + } else { + queue.getMetrics().submitApp(user); + } + SchedulerApplication application = new SchedulerApplication(queue, user, priority); + + application.setAppOnUAM(isAppOnUAM); applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 96d309c547e..027dcb4bc47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -602,7 +602,11 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, // We don't want to update metrics for move app if (!isMoveApp) { - metrics.submitAppAttempt(userName); + if(csContext.getRMContext().isAppOnUAM(application.getApplicationId())) { + metrics.submitUnmanagedAppAttempt(userName); + } else { + metrics.submitAppAttempt(userName); + } } getParent().submitApplicationAttempt(application, userName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 813f87bd779..1b0519ca4e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -553,10 +553,16 @@ protected void addApplication(ApplicationId applicationId, } } + boolean isAppOnUAM = rmContext.isAppOnUAM(applicationId); SchedulerApplication application = new SchedulerApplication<>(queue, user); + application.setAppOnUAM(isAppOnUAM); applications.put(applicationId, application); - queue.getMetrics().submitApp(user); + if(isAppOnUAM) { + queue.getMetrics().submitUnmanagedApp(user); + } else { + queue.getMetrics().submitApp(user); + } LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName @@ -610,7 +616,11 @@ protected void addApplicationAttempt( maxRunningEnforcer.trackNonRunnableApp(attempt); } - queue.getMetrics().submitAppAttempt(user); + if(rmContext.isAppOnUAM(applicationAttemptId.getApplicationId())) { + queue.getMetrics().submitUnmanagedAppAttempt(user); + } else { + queue.getMetrics().submitAppAttempt(user); + } LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 9ec4822e513..2f9f8b2b712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -390,10 +390,16 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { + boolean isAppOnUAM = rmContext.isAppOnUAM(applicationId); SchedulerApplication application = new SchedulerApplication<>(DEFAULT_QUEUE, user); + application.setAppOnUAM(isAppOnUAM); applications.put(applicationId, application); - metrics.submitApp(user); + if(isAppOnUAM) { + metrics.submitUnmanagedApp(user); + } else { + metrics.submitApp(user); + } LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); if (isAppRecovering) { @@ -424,7 +430,12 @@ public synchronized void addApplication(ApplicationId applicationId, } application.setCurrentAppAttempt(schedulerApp); - metrics.submitAppAttempt(user); + if(rmContext.isAppOnUAM(appAttemptId.getApplicationId())) { + metrics.submitUnmanagedAppAttempt(user); + } else { + metrics.submitAppAttempt(user); + } + LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); if (isAttemptRecovering) {