diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 92dde949647..3c70c2da0bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; - import java.util.List; import java.util.Map; import java.util.Set; @@ -32,10 +31,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -104,6 +100,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") @@ -287,13 +288,8 @@ public SchedulingMonitorManager getSchedulingMonitorManager() { * @param app application attempt. */ public List getBlacklistedNodes(final SchedulerApplicationAttempt app) { - - NodeFilter nodeFilter = new NodeFilter() { - @Override - public boolean accept(SchedulerNode node) { - return SchedulerAppUtils.isPlaceBlacklisted(app, node, LOG); - } - }; + NodeFilter nodeFilter = + node -> SchedulerAppUtils.isPlaceBlacklisted(app, node, LOG); return nodeTracker.getNodes(nodeFilter); } @@ -484,8 +480,7 @@ public void recoverContainersOnNode(List containerReports, writeLock.lock(); try { if (!rmContext.isWorkPreservingRecoveryEnabled() - || containerReports == null || (containerReports != null - && containerReports.isEmpty())) { + || CollectionUtils.isEmpty(containerReports)) { return; } @@ -771,16 +766,9 @@ public void moveAllApps(String sourceQueue, String destQueue) LOG.warn(e.toString()); throw new YarnException(e); } - // check if source queue is a valid - List apps = getAppsInQueue(sourceQueue); - if (apps == null) { - String errMsg = - "The specified Queue: " + sourceQueue + " doesn't exist"; - LOG.warn(errMsg); - throw new YarnException(errMsg); - } + // generate move events for each pending/running app - for (ApplicationAttemptId appAttemptId : apps) { + for (ApplicationAttemptId appAttemptId : getValidQueues(sourceQueue)) { this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppManagerEvent(appAttemptId.getApplicationId(), destQueue, RMAppManagerEventType.APP_MOVE)); @@ -795,15 +783,8 @@ public void killAllAppsInQueue(String queueName) throws YarnException { writeLock.lock(); try { - // check if queue is a valid - List apps = getAppsInQueue(queueName); - if (apps == null) { - String errMsg = "The specified Queue: " + queueName + " doesn't exist"; - LOG.warn(errMsg); - throw new YarnException(errMsg); - } // generate kill events for each pending/running app - for (ApplicationAttemptId app : apps) { + for (ApplicationAttemptId app : getValidQueues(queueName)) { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, "Application killed due to expiry of reservation queue " @@ -1529,4 +1510,24 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, public void resetSchedulerMetrics() { // reset scheduler metrics } + + /** + * Get valid queues. Always need to check if queue is a valid, regardless + * of scheduler. So that it guarantee valid queues. + * @param queueName queue name + * @return apps which in specified queue, it maybe empty. + * @throws YarnException if {@link YarnScheduler#getAppsInQueue(String)} + * return null, will throw this exception. + */ + private List getValidQueues(String queueName) + throws YarnException { + List apps = getAppsInQueue(queueName); + if (apps == null) { + String errMsg = "The specified Queue: " + queueName + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + return apps; + } + }