diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 7c19c5e..2211a41 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -519,6 +519,7 @@ + @@ -552,4 +553,14 @@ +   +     +     +       +       +       +       +     +     +   diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 64eb777..755defd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -97,6 +98,8 @@ private volatile Priority maxClusterLevelAppPriority; + protected ActivitiesManager activitiesManager; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -789,4 +792,9 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public ActivitiesManager getActivitiesManager() { + return this.activitiesManager; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java new file mode 100644 index 0000000..641d96d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * Utility for logging scheduler activities + */ +public class ActivitiesLogger { + + /** + * Methods for recording activities from an app + */ + public static class APP { + + /* + * Record skipped application activity when no container allocated / + * reserved / re-reserved. Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordSkippedAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + recordAppActivityWithoutAllocation(activitiesManager, node, application, + priority, diagnostic, ActivityState.SKIPPED); + } + + /* + * Record application activity when rejected because of queue maximum + * capacity or user limit. + */ + public static void recordRejectedAppActivityFromLeafQueue( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic); + finishSkippedAppAllocationRecording(activitiesManager, + application.getApplicationId(), ActivityState.REJECTED); + } + + /* + * Record application activity when no container allocated / + * reserved / re-reserved. Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic, ActivityState appState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), null, + priority.toString(), ActivityState.SKIPPED, diagnostic); + // Add queue-application activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + } + // Add application-container activity into specific application allocation + // Under this condition, it fails to allocate a container to this + // application, so containerId is null. 
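As a rough caller-side illustration (not part of this patch): the sketch below shows how the APP hooks in this class are intended to be driven from the scheduler code changed later in this patch. The variables node, application, container, priority and the allocated flag are assumed to exist in the enclosing scheduler scope.

    ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
        node.getNodeID(), SystemClock.getInstance().getTime(), application);
    if (allocated) {
      ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
          node, application, container, ActivityState.ALLOCATED);
      ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
          activitiesManager, application.getApplicationId(),
          container.getId(), ActivityState.ALLOCATED);
    } else {
      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
          activitiesManager, node, application, priority,
          ActivityDiagnosticConstant.PRIORITY_SKIPPED);
      ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
          activitiesManager, application.getApplicationId(),
          ActivityState.SKIPPED);
    }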
+ if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), null, priority.toString(), appState, + diagnostic); + } + } + + /* + * Record application activity when container allocated / reserved / + * re-reserved + */ + public static void recordAppActivityWithAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Container updatedContainer, + ActivityState activityState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), + updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY); + // Add queue-application activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + } + // Add application-container activity into specific application allocation + if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY); + } + } + + /* + * Invoked when scheduler starts to look at this application within one node + * update. + */ + public static void startAppAllocationRecording( + ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, + SchedulerApplicationAttempt application) { + if (activitiesManager == null) { + return; + } + activitiesManager.startAppAllocationRecording(nodeId, currentTime, + application); + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app has any container allocated/reserved during + * this allocation. + */ + public static void finishAllocatedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ContainerId containerId, ActivityState containerState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisApp(applicationId)) { + activitiesManager.finishAppAllocationRecording(applicationId, + containerId, containerState); + } + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app DOESN'T have any container allocated/reserved + * during this allocation. 
+ */ + public static void finishSkippedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ActivityState containerState) { + finishAllocatedAppAllocationRecording(activitiesManager, applicationId, + null, containerState); + } + } + + /** + * Methods for recording activities from a queue + */ + public static class QUEUE { + /* + * Record activities of a queue + */ + public static void recordQueueActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentQueueName, String queueName, + ActivityState state, String diagnostic) { + recordActivity(activitiesManager, node, parentQueueName, queueName, + Priority.UNDEFINED, state, diagnostic); + } + } + + /** + * Methods for recording overall activities from one node update + */ + public static class NODE { + + /* + * Invoked when node allocation finishes, and there's NO container + * allocated or reserved during the allocation + */ + public static void finishSkippedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node) { + finishAllocatedNodeAllocation(activitiesManager, node, null, + AllocationState.SKIPPED); + } + + /* + * Invoked when node allocation finishes, and there's any container + * allocated or reserved during the allocation + */ + public static void finishAllocatedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + ContainerId containerId, AllocationState containerState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.updateAllocationFinalState(node.getNodeID(), + containerId, containerState); + } + } + + /* + * Invoked when node heartbeat finishes + */ + public static void finishNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.finishNodeUpdateRecording(nodeID); + } + + /* + * Invoked when node heartbeat starts + */ + public static void startNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.startNodeUpdateRecording(nodeID); + } + } + + // Add queue, application or container activity into specific node allocation. + private static void recordActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentName, String childName, + Priority priority, ActivityState state, String diagnostic) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + parentName, childName, priority.toString(), state, diagnostic); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java new file mode 100644 index 0000000..d74f4e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A class to store node or application allocations.
+ * It mainly contains operations for allocation start, add, update and finish.
+ */
+public class ActivitiesManager extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
+  private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
+  private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+  private Set<NodeId> activeRecordedNodes;
+  private ConcurrentMap<ApplicationId, Long>
+      recordingAppActivitiesUntilSpecifiedTime;
+  private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
+  private ConcurrentMap<ApplicationId, List<AppAllocation>>
+      completedAppAllocations;
+  private boolean recordNextAvailableNode = false;
+  private List<NodeAllocation> lastAvailableNodeActivities = null;
+  private Thread t;
+  private int timeThreshold = 600 * 1000;
+
+  public ActivitiesManager() {
+    super(ActivitiesManager.class.getName());
+    recordingNodesAllocation = new ConcurrentHashMap<>();
+    completedNodeAllocations = new ConcurrentHashMap<>();
+    appsAllocation = new ConcurrentHashMap<>();
+    completedAppAllocations = new ConcurrentHashMap<>();
+    activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
+  }
+
+  public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
+    List<AppAllocation> allocations = completedAppAllocations.get(
+        applicationId);
+
+    return new AppActivitiesInfo(allocations, applicationId);
+  }
+
+  public ActivitiesInfo getActivitiesInfo(String nodeId) {
+    List<NodeAllocation> allocations;
+    if (nodeId == null) {
+      allocations = lastAvailableNodeActivities;
+    } else {
+      allocations = completedNodeAllocations.get(NodeId.fromString(nodeId));
+    }
+    return new ActivitiesInfo(allocations, nodeId);
+  }
+
+  public void recordNextNodeUpdateActivities(String nodeId) {
+    if (nodeId == null) {
+      recordNextAvailableNode = true;
+    } else {
+      activeRecordedNodes.add(NodeId.fromString(nodeId));
+    }
+  }
+
+  public void turnOnAppActivitiesRecording(ApplicationId applicationId,
+      double maxTime) {
+    long startTS = SystemClock.getInstance().getTime();
+    long endTS = startTS + (long) (maxTime * 1000);
+    recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Background cleaner: periodically drop completed node allocations
+    // that are older than timeThreshold.
+    t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
+              completedNodeAllocations.entrySet().iterator();
+          while (ite.hasNext()) {
+            Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation =
+                ite.next();
+            List<NodeAllocation> allocations = nodeAllocation.getValue();
+            long currTS = SystemClock.getInstance().getTime();
+            if (allocations.size() > 0
+                && currTS - allocations.get(0).getTimeStamp()
+                    > timeThreshold) {
+              ite.remove();
+            }
+          }
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            // interrupted by serviceStop(); exit the cleanup thread
+            return;
+          }
+        }
+      }
+    });
+
+    t.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    t.interrupt();
+    super.serviceStop();
+  }
+
+  void startNodeUpdateRecording(NodeId nodeID) {
+    if (recordNextAvailableNode) {
+      recordNextNodeUpdateActivities(nodeID.toString());
+    }
+    if (activeRecordedNodes.contains(nodeID)) {
+      List<NodeAllocation> nodeAllocation = new ArrayList<>();
+      recordingNodesAllocation.put(nodeID, nodeAllocation);
+    }
+  }
+
+  void startAppAllocationRecording(NodeId nodeID, long currTS,
+      SchedulerApplicationAttempt application) {
+    ApplicationId applicationId = application.getApplicationId();
+
+    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+            > currTS) {
+      appsAllocation.put(applicationId,
+          new AppAllocation(application.getPriority(), nodeID,
+              application.getQueueName()));
+    }
+
+    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+            <= currTS) {
+      turnOffActivityMonitoringForApp(applicationId);
+    }
+  }
+
+  // Add queue, application or container activity into specific node
+  // allocation.
+  void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+      String childName, String priority, ActivityState state,
+      String diagnostic) {
+    if (shouldRecordThisNode(nodeID)) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+      nodeAllocation.addAllocationActivity(parentName, childName, priority,
+          state, diagnostic);
+    }
+  }
+
+  // Add queue, application or container activity into specific application
+  // allocation.
+  void addSchedulingActivityForApp(ApplicationId applicationId,
+      String containerId, String priority, ActivityState state,
+      String diagnostic) {
+    if (shouldRecordThisApp(applicationId)) {
+      AppAllocation appAllocation = appsAllocation.get(applicationId);
+      appAllocation.addAppAllocationActivity(containerId, priority, state,
+          diagnostic);
+    }
+  }
+
+  // Update the container allocation meta status for this node allocation.
+  // It updates the overall allocation state only, not the per-activity
+  // state recorded through addSchedulingActivityForNode.
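A rough end-to-end sketch (not part of this patch) of how the recording methods above are expected to be used: the REST/query side calls the public methods, while the scheduler brackets each node heartbeat through ActivitiesLogger.NODE, as the CapacityScheduler changes later in this patch do. The activitiesManager, node, nodeId, appId and containerId variables are assumed to come from the surrounding ResourceManager code.

    // Query side: ask for the next heartbeat of any node to be recorded,
    // then fetch the result once a heartbeat has been processed.
    activitiesManager.recordNextNodeUpdateActivities(null);
    ActivitiesInfo nodeActivities = activitiesManager.getActivitiesInfo(null);

    // Per-application recording for roughly the next 3 seconds.
    activitiesManager.turnOnAppActivitiesRecording(appId, 3.0);
    AppActivitiesInfo appActivities =
        activitiesManager.getAppActivitiesInfo(appId);

    // Scheduler side, around one node heartbeat:
    ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, nodeId);
    // ... queue / application / container activities are added while the
    // scheduler walks the queue hierarchy ...
    ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
        node, containerId, AllocationState.ALLOCATED);
    ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, nodeId);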
+  void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
+      AllocationState containerState) {
+    if (shouldRecordThisNode(nodeID)) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+      nodeAllocation.updateContainerState(containerId, containerState);
+    }
+  }
+
+  void finishAppAllocationRecording(ApplicationId applicationId,
+      ContainerId containerId, ActivityState appState) {
+    if (shouldRecordThisApp(applicationId)) {
+      long currTS = SystemClock.getInstance().getTime();
+      AppAllocation appAllocation = appsAllocation.remove(applicationId);
+      appAllocation.updateAppContainerStateAndTime(containerId, appState,
+          currTS);
+
+      List<AppAllocation> appAllocations;
+      if (completedAppAllocations.containsKey(applicationId)) {
+        appAllocations = completedAppAllocations.get(applicationId);
+      } else {
+        appAllocations = new ArrayList<>();
+        completedAppAllocations.put(applicationId, appAllocations);
+      }
+      if (appAllocations.size() == 1000) {
+        appAllocations.remove(0);
+      }
+      appAllocations.add(appAllocation);
+
+      if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+          <= currTS) {
+        turnOffActivityMonitoringForApp(applicationId);
+      }
+    }
+  }
+
+  void finishNodeUpdateRecording(NodeId nodeID) {
+    List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
+    long timeStamp = SystemClock.getInstance().getTime();
+
+    if (value != null) {
+      if (value.size() > 0) {
+        lastAvailableNodeActivities = value;
+        for (NodeAllocation allocation : lastAvailableNodeActivities) {
+          allocation.transformToTree();
+          allocation.setTimeStamp(timeStamp);
+        }
+        if (recordNextAvailableNode) {
+          recordNextAvailableNode = false;
+        }
+      }
+
+      if (shouldRecordThisNode(nodeID)) {
+        recordingNodesAllocation.remove(nodeID);
+        completedNodeAllocations.put(nodeID, value);
+        stopRecordNodeUpdateActivities(nodeID);
+      }
+    }
+  }
+
+  boolean shouldRecordThisApp(ApplicationId applicationId) {
+    return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
+        && appsAllocation.containsKey(applicationId);
+  }
+
+  boolean shouldRecordThisNode(NodeId nodeID) {
+    return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
+        .containsKey(nodeID);
+  }
+
+  private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
+    List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
+    NodeAllocation nodeAllocation;
+    // When this node has already stored allocation activities, get the
+    // last allocation for this node.
+    if (nodeAllocations.size() != 0) {
+      nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1);
+      // When the final state of the last allocation is not DEFAULT, the last
+      // allocation has finished. Create a new allocation for this node,
+      // add it to the allocation list and return it.
+      //
+      // When the final state of the last allocation is DEFAULT, the last
+      // allocation has not finished yet. Just return the last allocation.
+      if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) {
+        nodeAllocation = new NodeAllocation(nodeID);
+        nodeAllocations.add(nodeAllocation);
+      }
+    }
+    // When this node has not stored any allocation activities yet, create a
+    // new allocation for this node, add it to the allocation list and
+    // return it.
+ else { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + return nodeAllocation; + } + + private void stopRecordNodeUpdateActivities(NodeId nodeId) { + activeRecordedNodes.remove(nodeId); + } + + private void turnOffActivityMonitoringForApp(ApplicationId applicationId) { + recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java new file mode 100644 index 0000000..62648dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of diagnostics. + */ +public class ActivityDiagnosticConstant { + // EMPTY means it does not have any diagnostic to display. + // In order not to show "diagnostic" line in frontend, + // we set the value to null. 
+ public final static String EMPTY = null; + public final static String NOT_ABLE_TO_ACCESS_PARTITION = + "not able to access partition"; + public final static String DO_NOT_NEED_MORE_RESOURCE = + "do not need more resource"; + public final static String QUEUE_MAX_CAPACITY_LIMIT = + "queue max-capacity limit"; + public final static String USER_CAPACITY_MAXIMUM_LIMIT = + "user capacity maximum limit"; + public final static String SKIP_BLACK_LISTED_NODE = + "skip am allocation in black listed node"; + public final static String PRIORITY_SKIPPED = "priority skipped"; + public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE = + "skipping assigning to Node in Ignore Exclusivity mode"; + public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS = + "do not need allocationAttemptInfos based on reservation algo"; + public final static String QUEUE_SKIPPED_HEADROOM = + "queue skipped because of headroom"; + public final static String NON_PARTITIONED_PARTITION_FIRST = + "non-partitioned resource request should be scheduled to " + + "non-partitioned partition first"; + public final static String SKIP_NODE_LOCAL_REQUEST = + "skip node-local request"; + public final static String SKIP_RACK_LOCAL_REQUEST = + "skip rack-local request"; + public final static String SKIP_AM_ALLOCATION = + "skip am allocation due to locality"; + public final static String REQUEST_CAN_NOT_ACCESS = + "resource request can not access the label"; + public final static String NOT_SUFFICIENT_RESOURCE = + "node does not have sufficient resource for request"; + public final static String LOCALITY_SKIPPED = "locality skipped"; + public final static String FAIL_TO_ALLOCATE = "fail to allocate"; + public final static String COULD_NOT_GET_CONTAINER = + "couldn't get container for allocation"; + public final static String DO_NOT_NEED_RESOURCE = "do not need more resource"; + public final static String SKIPPED_ALL_PRIORITIES = + "skipped all priorities of the app"; + public final static String RESPECT_FIFO = "respect FIFO of applications"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java new file mode 100644 index 0000000..5b98443 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/*
+ * It represents a tree node in the "NodeAllocation" tree structure.
+ * Each node may represent a queue, an application or a container in an
+ * allocation activity.
+ * A node has child nodes if the allocation was successfully handed down to
+ * the next level.
+ */
+public class ActivityNode {
+  private String activityNodeName;
+  private String parentName;
+  private String priority;
+  private ActivityState state;
+  private String diagnostic;
+
+  private List<ActivityNode> childNode;
+
+  public ActivityNode(String activityNodeName, String parentName,
+      String priority, ActivityState state, String diagnostic) {
+    this.activityNodeName = activityNodeName;
+    this.parentName = parentName;
+    this.priority = priority;
+    this.state = state;
+    this.diagnostic = diagnostic;
+    this.childNode = new LinkedList<>();
+  }
+
+  public String getName() {
+    return this.activityNodeName;
+  }
+
+  public String getParentName() {
+    return this.parentName;
+  }
+
+  public void addChild(ActivityNode node) {
+    childNode.add(0, node);
+  }
+
+  public List<ActivityNode> getChildren() {
+    return this.childNode;
+  }
+
+  public ActivityState getState() {
+    return this.state;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+
+  public String getPriority() {
+    return priority;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.activityNodeName + " ");
+    sb.append(this.priority + " ");
+    sb.append(this.state + " ");
+    if (this.diagnostic != null && !this.diagnostic.equals("")) {
+      sb.append(this.diagnostic + "\n");
+    }
+    sb.append("\n");
+    for (ActivityNode child : childNode) {
+      sb.append(child.toString() + "\n");
+    }
+    return sb.toString();
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
new file mode 100644
index 0000000..bce1fc9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/*
+ * Collection of activity operation states.
+ */
+public enum ActivityState {
+  // default state when adding a new activity in node allocation
+  DEFAULT,
+  // the container is accepted by sub-queues/applications or this
+  // queue/application
+  ACCEPTED,
+  // the queue or application voluntarily gives up the resource, or
+  // nothing was allocated
+  SKIPPED,
+  // the container could not be allocated to sub-queues or this application
+  REJECTED,
+  ALLOCATED,   // successfully allocate a new non-reserved container
+  RESERVED,    // successfully reserve a new container
+  RE_RESERVED  // successfully re-reserve an already reserved container
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
new file mode 100644
index 0000000..ffe7493
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * It records an activity operation during an allocation, which can be
+ * classified as a queue, application or container activity.
+ * Other information includes state, diagnostic and priority.
+ */ +public class AllocationActivity { + private String childName = null; + private String parentName = null; + private String priority = null; + private ActivityState state; + private String diagnostic; + + private static final Log LOG = LogFactory.getLog(AllocationActivity.class); + + public AllocationActivity(String parentName, String queueName, + String priority, ActivityState state, String diagnostic) { + this.childName = queueName; + this.parentName = parentName; + this.priority = priority; + this.state = state; + this.diagnostic = diagnostic; + } + + public ActivityNode createTreeNode() { + return new ActivityNode(this.childName, this.parentName, this.priority, + this.state, this.diagnostic); + } + + public String getName() { + return this.childName; + } + + public String getState() { + return this.state.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java new file mode 100644 index 0000000..e38cefc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of allocation final states. 
+ */ +public enum AllocationState { + DEFAULT, + // queue or application voluntarily give up to use the resource + // OR nothing allocated + SKIPPED, + // successfully allocate a new non-reserved container + ALLOCATED, + // successfully allocate a new container from an existing reserved container + ALLOCATED_FROM_RESERVED, + // successfully reserve a new container + RESERVED +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java new file mode 100644 index 0000000..fa0394c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; + +import java.util.ArrayList; +import java.util.List; + +/* + * It contains allocation information for one application within a period of + * time. + * Each application allocation may have several allocation attempts. 
+ */ +public class AppAllocation { + private Priority priority = null; + private NodeId nodeId; + private ContainerId containerId = null; + private ActivityState appState = null; + private String queueName = null; + private List allocationAttempts; + private long timestamp; + + public AppAllocation(Priority priority, NodeId nodeId, String queueName) { + this.priority = priority; + this.nodeId = nodeId; + this.allocationAttempts = new ArrayList<>(); + this.queueName = queueName; + } + + public void updateAppContainerStateAndTime(ContainerId containerId, + ActivityState appState, long ts) { + this.timestamp = ts; + this.containerId = containerId; + this.appState = appState; + } + + public void addAppAllocationActivity(String containerId, String priority, + ActivityState state, String diagnostic) { + ActivityNode container = new ActivityNode(containerId, null, priority, + state, diagnostic); + this.allocationAttempts.add(container); + if (state == ActivityState.REJECTED) { + this.appState = ActivityState.SKIPPED; + } else { + this.appState = state; + } + } + + public String getNodeId() { + return nodeId.toString(); + } + + public String getQueueName() { + return queueName; + } + + public ActivityState getAppState() { + return appState; + } + + public String getPriority() { + if (priority == null) { + return null; + } + return priority.toString(); + } + + public String getContainerId() { + if (containerId == null) { + return null; + } + return containerId.toString(); + } + + public long getTime() { + return this.timestamp; + } + + public List getAllocationAttempts() { + return allocationAttempts; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java new file mode 100644 index 0000000..af47be6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/* + * It contains allocation information for one allocation in a node heartbeat. + * Detailed allocation activities are first stored in "AllocationActivity" + * as operations, then transformed to a tree structure. + * Tree structure starts from root queue and ends in leaf queue, + * application or container allocation. + */ +public class NodeAllocation { + private NodeId nodeId; + private long timeStamp; + private ContainerId containerId = null; + private AllocationState containerState = AllocationState.DEFAULT; + private List allocationOperations; + + private ActivityNode root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocation.class); + + public NodeAllocation(NodeId nodeId) { + this.nodeId = nodeId; + this.allocationOperations = new ArrayList<>(); + } + + public void addAllocationActivity(String parentName, String childName, + String priority, ActivityState state, String diagnostic) { + AllocationActivity allocate = new AllocationActivity(parentName, childName, + priority, state, diagnostic); + this.allocationOperations.add(allocate); + } + + public void updateContainerState(ContainerId containerId, + AllocationState containerState) { + this.containerId = containerId; + this.containerState = containerState; + } + + // In node allocation, transform each activity to a tree-like structure + // for frontend activity display. 
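As a hedged illustration (not part of this patch) of the transformation described above and sketched in the "eg:" diagram just below: activities are recorded child-before-parent, and transformToTree() folds that flat list into a tree rooted at the root queue. The names used here ("root", "a", "app_1", "container_1") are made up for the example.

    NodeAllocation alloc = new NodeAllocation(NodeId.newInstance("host1", 8041));
    // leaf-most activities first, the root queue last
    alloc.addAllocationActivity("app_1", "container_1", "1",
        ActivityState.ALLOCATED, null);
    alloc.addAllocationActivity("a", "app_1", "1",
        ActivityState.ACCEPTED, null);
    alloc.addAllocationActivity("root", "a", null,
        ActivityState.ACCEPTED, null);
    alloc.addAllocationActivity(null, "root", null,
        ActivityState.ACCEPTED, null);
    alloc.transformToTree();
    ActivityNode root = alloc.getRoot();  // root -> a -> app_1 -> container_1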
+ // eg: root + // / \ + // a b + // / \ + // app1 app2 + // / \ + // CA1 CA2 + // CA means Container Attempt + public void transformToTree() { + List allocationTree = new ArrayList<>(); + + if (root == null) { + Set names = Collections.newSetFromMap(new ConcurrentHashMap<>()); + ListIterator ite = allocationOperations.listIterator( + allocationOperations.size()); + while (ite.hasPrevious()) { + String name = ite.previous().getName(); + if (name != null) { + if (!names.contains(name)) { + names.add(name); + } else { + ite.remove(); + } + } + } + + for (AllocationActivity allocationOperation : allocationOperations) { + ActivityNode node = allocationOperation.createTreeNode(); + String name = node.getName(); + for (int i = allocationTree.size() - 1; i > -1; i--) { + if (allocationTree.get(i).getParentName().equals(name)) { + node.addChild(allocationTree.get(i)); + allocationTree.remove(i); + } else { + break; + } + } + allocationTree.add(node); + } + root = allocationTree.get(0); + } + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public long getTimeStamp() { + return this.timeStamp; + } + + public AllocationState getFinalAllocationState() { + return containerState; + } + + public String getContainerId() { + if (containerId == null) + return null; + return containerId.toString(); + } + + public ActivityNode getRoot() { + return root; + } + + public String getNodeId() { + return nodeId.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c88154..1d8f929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -91,12 +92,15 @@ protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; - public AbstractCSQueue(CapacitySchedulerContext cs, + protected ActivitiesManager activitiesManager; + + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.resourceCalculator = cs.getResourceCalculator(); + this.activitiesManager = cs.getActivitiesManager(); // must be called after parent and queueName is set this.metrics = diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a70..22b0ee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -92,22 +92,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -307,6 +297,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.activitiesManager = new 
ActivitiesManager(); + activitiesManager.init(conf); initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); @@ -344,6 +336,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { startSchedulerThreads(); + activitiesManager.start(); super.serviceStart(); } @@ -523,7 +516,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) Map newQueues = new HashMap(); CSQueue newRoot = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); + newQueues, queues, noop); // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); @@ -650,7 +643,7 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = + ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests @@ -802,7 +795,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority(), isAttemptRecovering); + application.getPriority(), isAttemptRecovering, activitiesManager); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); @@ -1226,6 +1219,7 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(reservedContainer.getContainerId()); @@ -1255,6 +1249,19 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().incrAllocations(); updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } else { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), AllocationState.SKIPPED); } } @@ -1364,7 +1371,11 @@ public void handle(SchedulerEvent event) { setLastNodeUpdateTime(Time.now()); nodeUpdate(node); if (!scheduleAsynchronously) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + node.getNodeID()); allocateContainersToNode(getNode(node.getNodeID())); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + node.getNodeID()); } } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index b39b289..c41a7bf 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; @@ -80,4 +81,6 @@ * cluster. */ ResourceUsage getClusterResourceUsage(); + + ActivitiesManager getActivitiesManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..536b505 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,15 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -59,14 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -74,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -134,7 +124,7 @@ public LeafQueue(CapacitySchedulerContext cs, super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.activeUsersManager = new ActiveUsersManager(metrics); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -143,7 +133,7 @@ public LeafQueue(CapacitySchedulerContext cs, LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); } - + setupQueueConfigs(cs.getClusterResource()); } @@ -861,7 +851,7 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, @@ -880,6 +870,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (reservedContainer != null) { FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); + + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + synchronized (application) { CSAssignment assignment = application.assignContainers(clusterResource, node, @@ -894,6 +888,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); return CSAssignment.NULL_ASSIGNMENT; } @@ -906,17 +904,29 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_MORE_RESOURCE); return CSAssignment.NULL_ASSIGNMENT; } for (Iterator assignmentIterator = orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { FiCaSchedulerApp application = assignmentIterator.next(); + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit if 
(!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } @@ -929,6 +939,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, application, node.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); continue; } @@ -970,16 +984,32 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, incReservedResource(node.getPartition(), reservedRes); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + // Done return assignment; } else if (assignment.getSkipped()) { + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, + application.getApplicationId(), ActivityState.SKIPPED); application.updateNodeInfoForAMDiagnostics(node); } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, + application.getApplicationId(), ActivityState.SKIPPED); return CSAssignment.NULL_ASSIGNMENT; } } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..ab416f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,18 +37,16 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -82,7 +80,7 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public ParentQueue(CapacitySchedulerContext cs, + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; @@ -99,14 +97,14 @@ public ParentQueue(CapacitySchedulerContext cs, "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - + this.childQueues = new TreeSet(nonPartitionedQueueComparator); - + setupQueueConfigs(cs.getClusterResource()); - LOG.info("Initialized parent-queue " + queueName + - " name=" + queueName + - ", fullname=" + getQueuePath()); + LOG.info("Initialized parent-queue " + queueName + + " name=" + queueName + + ", fullname=" + getQueuePath()); } synchronized void setupQueueConfigs(Resource clusterResource) @@ -381,6 +379,10 @@ private synchronized void removeApplication(ApplicationId applicationId, " #applications: " + getNumApplications()); } + private String getParentName() { + return getParent() != null ? 
getParent().getQueueName() : ""; + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, @@ -393,6 +395,16 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it is not able to access partition=" + node .getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -405,6 +417,15 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_MORE_RESOURCE); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -424,9 +445,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + break; } - + // Schedule CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node, resourceLimits, @@ -437,6 +467,29 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (Resources.greaterThan( resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + if (node.getReservedContainer() == null) { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + } + } else { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); + } + } + // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); @@ -474,6 +527,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, " cluster=" + clusterResource); } else { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } break; } @@ -585,8 +645,8 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers 
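The queue-level recording above pairs with the node-level recording driven from the scheduler. A minimal sketch of one heartbeat's worth of that pairing, assuming a started ActivitiesManager as set up in the CapacityScheduler hunks of this patch; the root, node, clusterResource and resourceLimits variables are illustrative stand-ins, not code from the patch:

    ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
        node.getNodeID());
    // While the root queue assigns, ParentQueue/LeafQueue record ACCEPTED,
    // SKIPPED or REJECTED queue activities, and the rootQueue branches above
    // finish the node-level record (allocated, reserved or skipped).
    CSAssignment assignment = root.assignContainers(clusterResource, node,
        resourceLimits, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
        node.getNodeID());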
ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); - - assignment = childQueue.assignContainers(cluster, node, + + assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -613,7 +673,7 @@ private synchronized CSAssignment assignContainersToChildQueues( break; } } - + return assignment; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..7103e76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -24,6 +24,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -43,23 +46,31 @@ FiCaSchedulerApp application; final ResourceCalculator rc; final RMContext rmContext; - + ActivitiesManager activitiesManager; + public AbstractContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public AbstractContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { this.application = application; this.rc = rc; this.rmContext = rmContext; + this.activitiesManager = activitiesManager; } protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, - RMContainer rmContainer) { + RMContainer rmContainer, FiCaSchedulerNode node) { // Handle skipped boolean skipped = (result.getAllocationState() == AllocationState.APP_SKIPPED); CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); - + // Handle excess reservation assignment.setExcessReservation(result.getContainerToBeUnreserved()); @@ -83,6 +94,22 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), allocatedResource); + + if (rmContainer != null) { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + 
ActivityState.RE_RESERVED); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED); + } else { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + ActivityState.RESERVED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.RESERVED); + } } else if (result.getAllocationState() == AllocationState.ALLOCATED){ // This is a new container // Inform the ordering policy @@ -103,21 +130,28 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.getAssignmentInformation().incrAllocations(); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), allocatedResource); - + if (rmContainer != null) { assignment.setFulfilledReservation(true); } + + ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager, + node, application, updatedContainer, ActivityState.ALLOCATED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.ACCEPTED); + } assignment.setContainersToKill(result.getToKillContainers()); } - + return assignment; } - + /** * allocate needs to handle following stuffs: - * + * *
* <ul>
* <li>Select request: Select a request to allocate. E.g. select a resource
* request based on requirement/priority/locality.</li>
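For readability, the ActivityState values recorded by getCSAssignmentFromAllocateResult above can be summarized as a small helper. This is a sketch only, not code from the patch; the boolean parameter stands in for the rmContainer != null check, and the branch for AllocationState.RESERVED is inferred from the reservation-counting code shown above:

    static ActivityState recordedActivityState(AllocationState result,
        boolean fromReservedContainer) {
      if (result == AllocationState.RESERVED) {
        // re-reserving an existing reservation vs. creating a new one
        return fromReservedContainer
            ? ActivityState.RE_RESERVED : ActivityState.RESERVED;
      }
      if (result == AllocationState.ALLOCATED) {
        return ActivityState.ALLOCATED;
      }
      // non-allocating results (e.g. APP_SKIPPED) are recorded by the
      // allocators themselves, not by this method
      return null;
    }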
  • diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..4eaa24b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -36,12 +37,17 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, + RMContext rmContext, ActivitiesManager activitiesManager) { super(application, rc, rmContext); increaseContainerAllocator = new IncreaseContainerAllocator(application, rc, rmContext); - regularContainerAllocator = - new RegularContainerAllocator(application, rc, rmContext); + regularContainerAllocator = new RegularContainerAllocator(application, rc, + rmContext, activitiesManager); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index aae5292..6319be2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -53,10 +54,11 @@ private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { + super(application, rc, rmContext, activitiesManager); } private boolean checkHeadroom(Resource clusterResource, @@ -84,12 +86,18 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE); return ContainerAllocation.APP_SKIPPED; } ResourceRequest anyRequest = application.getResourceRequest(priority, ResourceRequest.ANY); if (null == anyRequest) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -98,6 +106,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // Do we need containers at this 'priority'? if (application.getTotalRequiredResources(priority) <= 0) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -112,6 +123,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, } application.updateAppSkipNodeDiagnostics( "Skipping assigning to Node in Ignore Exclusivity mode. 
"); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE); return ContainerAllocation.APP_SKIPPED; } } @@ -122,6 +136,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( anyRequest.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -130,6 +147,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS); return ContainerAllocation.PRIORITY_SKIPPED; } } @@ -139,6 +159,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM); return ContainerAllocation.QUEUE_SKIPPED; } @@ -170,7 +193,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, + missedNonPartitionedRequestSchedulingOpportunity + " required=" + rmContext.getScheduler().getNumClusterNodes()); } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST); return ContainerAllocation.APP_SKIPPED; } } @@ -294,6 +319,9 @@ private ContainerAllocation assignNodeLocalContainers( } // Skip node-local request, go to rack-local request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -308,6 +336,9 @@ private ContainerAllocation assignRackLocalContainers( } // Skip rack-local request, go to off-switch request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -323,6 +354,9 @@ private ContainerAllocation assignOffSwitchContainers( application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_AM_ALLOCATION); return ContainerAllocation.APP_SKIPPED; } @@ -354,6 +388,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, application.getResourceRequest(priority, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -377,6 +414,9 @@ private ContainerAllocation 
assignContainersOnNode(Resource clusterResource, application.getResourceRequest(priority, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } if (requestType != NodeType.NODE_LOCAL @@ -398,7 +438,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, return allocation; } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -421,6 +463,10 @@ private ContainerAllocation assignContainer(Resource clusterResource, // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, priority, + ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS, + ActivityState.REJECTED); return new ContainerAllocation(rmContainer, null, AllocationState.LOCALITY_SKIPPED); } @@ -435,6 +481,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); // Skip this locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -513,6 +562,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, // continue. if (null == unreservedContainer) { // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -537,6 +589,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -549,6 +604,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, return result; } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -623,6 +681,9 @@ private ContainerAllocation handleNewContainerAllocation( ContainerAllocation ret = new ContainerAllocation(allocationResult.containerToBeUnreserved, null, AllocationState.APP_SKIPPED); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, priority, + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED); return ret; } @@ -649,6 +710,10 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, application .updateAppSkipNodeDiagnostics("Scheduling of container failed. 
"); LOG.warn("Couldn't get container for allocation!"); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, priority, + ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER, + ActivityState.REJECTED); return ContainerAllocation.APP_SKIPPED; } @@ -721,6 +786,9 @@ public CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.DO_NOT_NEED_RESOURCE); return CSAssignment.SKIP_ASSIGNMENT; } @@ -735,18 +803,21 @@ public CSAssignment assignContainers(Resource clusterResource, continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + null, node); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, reservedContainer.getReservedPriority(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, - reservedContainer); + reservedContainer, node); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8009580..5e70b8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -48,13 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -107,8 +102,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { + this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + appPriority, isAttemptRecovering, null); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, + ActivitiesManager activitiesManager) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); - + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); Resource amResource; @@ -138,8 +141,9 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } - - containerAllocator = new ContainerAllocator(this, rc, rmContext); + + containerAllocator = new ContainerAllocator(this, rc, rmContext, + activitiesManager); if (scheduler instanceof CapacityScheduler) { capacitySchedulerContext = (CapacitySchedulerContext) scheduler; @@ -188,7 +192,7 @@ public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, return null; } - // Required sanity check - AM can call 'allocate' to update resource + // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check if (getTotalRequiredResources(priority) <= 0) { return null; @@ -489,7 +493,7 @@ public RMContainer findNodeToUnreserve(Resource clusterResource, public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 75bffc7..2e76d83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -130,9 +130,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; @@ -176,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; @@ -577,6 +582,112 @@ public AppsInfo getApps(@Context HttpServletRequest hsr, } @GET + @Path("/scheduler/activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, + @QueryParam("nodeId") String nodeId) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + String errMessage = ""; + + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == activitiesManager) { + errMessage = "Not Capacity Scheduler"; + return new ActivitiesInfo(errMessage, nodeId); + } + + List nodeList = + abstractYarnScheduler.getNodeTracker().getAllNodes(); + + boolean illegalInput = false; + + if (nodeList.size() == 0) { + illegalInput = true; + errMessage = "No node manager running in the cluster"; + } else { + if (nodeId != null) { + String hostName = nodeId; + String portName = ""; + if (nodeId.contains(":")) { + int index = nodeId.indexOf(":"); + hostName = nodeId.substring(0, index); + portName = nodeId.substring(index + 1); + } + + boolean correctNodeId = false; + for (FiCaSchedulerNode node : nodeList) { + if ((portName.equals("") && node.getRMNode().getHostName().equals( + hostName)) || (!portName.equals("") && node.getRMNode() + .getHostName().equals(hostName) && String.valueOf( + node.getRMNode().getCommandPort()).equals(portName))) { + correctNodeId = true; + nodeId = node.getNodeID().toString(); + break; + } + } + if (!correctNodeId) { + illegalInput = true; + errMessage = "Cannot find node manager with given node id"; + } + } + } + + if (!illegalInput) { + activitiesManager.recordNextNodeUpdateActivities(nodeId); + return activitiesManager.getActivitiesInfo(nodeId); + } + + // Return a activities info with error message + return new ActivitiesInfo(errMessage, nodeId); + } + + return null; + } + + @GET + @Path("/scheduler/app-activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, + @QueryParam("appId") String appId, @QueryParam("maxTime") String time) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == activitiesManager) { + String 
errMessage = "Not Capacity Scheduler"; + return new AppActivitiesInfo(errMessage, appId); + } + + double maxTime = 3.0; + + if (time != null) { + if (time.contains(".")) { + maxTime = Double.parseDouble(time); + } else { + maxTime = Double.parseDouble(time + ".0"); + } + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime); + AppActivitiesInfo appActivitiesInfo = + activitiesManager.getAppActivitiesInfo(applicationId); + + return appActivitiesInfo; + } + return null; + } + + @GET @Path("/appstatistics") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ApplicationStatisticsInfo getAppStatistics( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java new file mode 100644 index 0000000..ac0f37b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; +import java.util.ArrayList; + +/* + * DAO object to display node allocation activity. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivitiesInfo { + protected String nodeId; + protected String timeStamp; + protected String diagnostic = null; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class); + + public ActivitiesInfo() { + } + + public ActivitiesInfo(String errorMessage, String nodeId) { + this.diagnostic = errorMessage; + this.nodeId = nodeId; + } + + public ActivitiesInfo(List nodeAllocations, String nodeId) { + this.nodeId = nodeId; + this.allocations = new ArrayList<>(); + + if (nodeAllocations == null) { + diagnostic = (nodeId != null ? 
+ "waiting for display" : + "waiting for next allocation"); + } else { + if (nodeAllocations.size() == 0) { + diagnostic = "do not have available resources"; + } else { + this.nodeId = nodeAllocations.get(0).getNodeId(); + this.timeStamp = String.valueOf(nodeAllocations.get(0).getTimeStamp()); + for (int i = 0; i < nodeAllocations.size(); i++) { + NodeAllocation nodeAllocation = nodeAllocations.get(i); + NodeAllocationInfo allocationInfo = new NodeAllocationInfo( + nodeAllocation); + this.allocations.add(allocationInfo); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java new file mode 100644 index 0000000..7b8d00b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display node information in allocation tree. + * It corresponds to "ActivityNode" class. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivityNodeInfo { + protected String name; // The name for activity node + protected String priority; + protected String allocationState; + protected String diagnostic; + + protected List children; + + ActivityNodeInfo() { + } + + ActivityNodeInfo(ActivityNode node) { + this.name = node.getName(); + this.priority = node.getPriority(); + this.allocationState = node.getState().name(); + this.diagnostic = node.getDiagnostic(); + this.children = new ArrayList<>(); + + for (ActivityNode child : node.getChildren()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(child); + this.children.add(containerInfo); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java new file mode 100644 index 0000000..b359f1e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; +import org.apache.hadoop.yarn.util.SystemClock; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display application activity. 
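AppActivitiesInfo below is the payload of the new /ws/v1/cluster/scheduler/app-activities endpoint. A sketch of querying it in the same JerseyTest style as the new test class at the end of this patch (the lines would compile inside one of its test methods); the application id is a hypothetical placeholder, and maxTime is optional, with getAppActivities falling back to 3.0:

    WebResource r = resource();
    MultivaluedMapImpl params = new MultivaluedMapImpl();
    params.add("appId", "application_1478000000000_0001");  // placeholder app id
    params.add("maxTime", "3.0");                            // optional
    ClientResponse response = r.path("ws").path("v1").path("cluster")
        .path("scheduler/app-activities").queryParams(params)
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    // JSON keys mirror the DAO fields: applicationId, timeStamp, diagnostic and
    // allocations, each allocation carrying nodeId, queueName, priority,
    // allocatedContainerId, allocationState, timeStamp and allocationAttempt.
    JSONObject json = response.getEntity(JSONObject.class);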
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppActivitiesInfo { + protected String applicationId; + protected String diagnostic; + protected String timeStamp; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class); + + public AppActivitiesInfo() { + } + + public AppActivitiesInfo(String errorMessage, String applicationId) { + this.diagnostic = errorMessage; + this.applicationId = applicationId; + this.timeStamp = String.valueOf(SystemClock.getInstance().getTime()); + } + + public AppActivitiesInfo(List appAllocations, + ApplicationId applicationId) { + this.applicationId = applicationId.toString(); + this.allocations = new ArrayList<>(); + + if (appAllocations == null) { + diagnostic = "waiting for display"; + timeStamp = String.valueOf(SystemClock.getInstance().getTime()); + } else { + for (int i = 0; i < appAllocations.size(); i++) { + AppAllocation appAllocation = appAllocations.get(i); + AppAllocationInfo appAllocationInfo = new AppAllocationInfo( + appAllocation); + this.allocations.add(appAllocationInfo); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java new file mode 100644 index 0000000..a4a8d57 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display application allocation detailed information. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppAllocationInfo { + protected String nodeId; + protected String queueName; + protected String priority; + protected String allocatedContainerId; + protected String allocationState; + protected String timeStamp; + protected List allocationAttempt; + + private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class); + + AppAllocationInfo() { + } + + AppAllocationInfo(AppAllocation allocation) { + this.allocationAttempt = new ArrayList<>(); + + this.nodeId = allocation.getNodeId(); + this.queueName = allocation.getQueueName(); + this.priority = allocation.getPriority(); + this.allocatedContainerId = allocation.getContainerId(); + this.allocationState = allocation.getAppState().name(); + this.timeStamp = String.valueOf(allocation.getTime()); + + for (ActivityNode attempt : allocation.getAllocationAttempts()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt); + this.allocationAttempt.add(containerInfo); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java new file mode 100644 index 0000000..1350a76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/* + * DAO object to display each node allocation in node heartbeat. 
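NodeAllocationInfo below is the per-heartbeat element of the ActivitiesInfo returned by /ws/v1/cluster/scheduler/activities. A sketch of the query pattern that the test class at the end of this patch also follows; the node id must match a registered node manager, either as host or host:port:

    WebResource r = resource();
    MultivaluedMapImpl params = new MultivaluedMapImpl();
    params.add("nodeId", "127.0.0.1:1234");  // host or host:port of a registered NM
    // The first call arms recording for that node's next heartbeat.
    ClientResponse response = r.path("ws").path("v1").path("cluster")
        .path("scheduler/activities").queryParams(params)
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    // After a heartbeat has been processed, the same call returns the recorded
    // ActivitiesInfo: nodeId, timeStamp, diagnostic and allocations, each with an
    // allocatedContainerId, a finalAllocationState and a root ActivityNodeInfo tree.
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler/activities").queryParams(params)
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    JSONObject json = response.getEntity(JSONObject.class);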
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class NodeAllocationInfo { + protected String allocatedContainerId; + protected String finalAllocationState; + protected ActivityNodeInfo root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class); + + NodeAllocationInfo() { + } + + NodeAllocationInfo(NodeAllocation allocation) { + this.allocatedContainerId = allocation.getContainerId(); + this.finalAllocationState = allocation.getFinalAllocationState().name(); + + root = new ActivityNodeInfo(allocation.getRoot()); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 649d719..bbdfdd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -62,9 +62,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase { - private static MockRM rm; - private static CapacitySchedulerConfiguration csConf; - private static YarnConfiguration conf; + protected static MockRM rm; + protected static CapacitySchedulerConfiguration csConf; + protected static YarnConfiguration conf; private class QueueInfo { float capacity; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java new file mode 100644 index 0000000..c84526f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -0,0 +1,772 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class TestRMWebServicesSchedulerActivities + extends TestRMWebServicesCapacitySched { + + @Test + public void testAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 11); + + JSONArray allocations = json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + if (i != allocations.length() - 1) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "ALLOCATED"); + verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1"); + } else { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "SKIPPED"); + verifyQueueOrder(allocations.getJSONObject(i), "root-a-b"); + } + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts 
app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testWrongNodeId() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.0"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, 
response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1"); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED"); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED"); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + 
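+      // Completing app2's only container below frees all of nm2's memory, so
+      // the next heartbeat can turn app1's reservation into a real allocation
+      // (the test then expects finalAllocationState ALLOCATED_FROM_RESERVED).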
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", + "ALLOCATED_FROM_RESERVED"); + } + finally { + rm.stop(); + } + } + + @Test + public void testActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", + "ALLOCATED"); + + verifyNumberOfNodes(allocations, 6); + + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1"); + } + finally { + rm.stop(); + } + } + + private void verifyNumberOfNodes(JSONObject allocation, int realValue) + throws Exception { + if (allocation.isNull("root")) { + assertEquals("State of allocation is wrong", 0, realValue); + } else { + assertEquals("State of allocation is wrong", + 1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue); + } + } + + private int getNumberOfNodes(JSONObject allocation) throws Exception { + if (!allocation.isNull("children")) { + Object object = allocation.get("children"); + if (object.getClass() == JSONObject.class) { + return 1 + getNumberOfNodes((JSONObject) object); + } else { + int count = 0; + for (int i = 0; i < ((JSONArray) object).length(); i++) { + count += (1 + getNumberOfNodes( + ((JSONArray) object).getJSONObject(i))); + } + return count; + } + } else { + return 0; + } + } + + private void verifyStateOfAllocations(JSONObject allocation, + String nameToCheck, String 
realState) throws Exception { + assertEquals("State of allocation is wrong", allocation.get(nameToCheck), + realState); + } + + private void verifyNumberOfAllocations(JSONObject json, int realValue) + throws Exception { + if (json.isNull("allocations")) { + assertEquals("Number of allocations is wrong", 0, realValue); + } else { + Object object = json.get("allocations"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + private void verifyQueueOrder(JSONObject json, String realOrder) + throws Exception { + String order = ""; + if (!json.isNull("root")) { + JSONObject root = json.getJSONObject("root"); + order = root.getString("name") + "-" + getQueueOrder(root); + } + assertEquals("Order of queue is wrong", + order.substring(0, order.length() - 1), realOrder); + } + + private String getQueueOrder(JSONObject node) throws Exception { + if (!node.isNull("children")) { + Object children = node.get("children"); + if (children.getClass() == JSONObject.class) { + if (!((JSONObject) children).getString("priority").equals("-1")) { + return ""; + } + return ((JSONObject) children).getString("name") + "-" + getQueueOrder( + (JSONObject) children); + } else if (children.getClass() == JSONArray.class) { + String order = ""; + for (int i = 0; i < ((JSONArray) children).length(); i++) { + JSONObject child = (JSONObject) ((JSONArray) children).get(i); + if (!child.getString("priority").equals("-1")) { + return ""; + } + order += (child.getString("name") + "-" + getQueueOrder(child)); + } + return order; + } + } + return ""; + } + + private void verifyNumberOfAllocationAttempts(JSONObject allocation, + int realValue) throws Exception { + if (allocation.isNull("allocationAttempt")) { + assertEquals("Number of allocation attempts is wrong", 0, realValue); + } else { + Object object = allocation.get("allocationAttempt"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations attempts is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations attempts is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + @Test + public void testAppActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = 
json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED"); + + verifyNumberOfAllocationAttempts(allocations, 1); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 10); + + JSONArray allocations = json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "allocationState", "ACCEPTED"); + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, 
response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + 
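+      // app2 still holds its container on nm2, so this second heartbeat can
+      // only re-reserve for app1; the app-activities list below is therefore
+      // expected to grow to two recorded allocations.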
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 2); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 3); + } + finally { + rm.stop(); + } + } + +}
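Beyond the embedded Jersey test client used above, the same activities data can be fetched from a running ResourceManager with any HTTP client. The sketch below is illustrative only: it assumes the default RM web address localhost:8088, uses a placeholder application id, and the class name SchedulerActivitiesClient is not part of this patch. Note how the tests above query once, trigger a node heartbeat, and only then read back the recorded allocations; a manual check follows the same pattern.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SchedulerActivitiesClient {

  public static void main(String[] args) throws Exception {
    // Per-node activities recorded for the next heartbeat of the given node.
    String nodeUrl = "http://localhost:8088/ws/v1/cluster/scheduler/activities"
        + "?nodeId=127.0.0.1:1234";
    System.out.println(get(nodeUrl));

    // Per-application activities; replace the id with a real application id.
    String appUrl = "http://localhost:8088/ws/v1/cluster/scheduler/app-activities"
        + "?appId=application_1400000000000_0001";
    System.out.println(get(appUrl));
  }

  private static String get(String spec) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) new URL(spec).openConnection();
    conn.setRequestProperty("Accept", "application/json");
    StringBuilder body = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        body.append(line).append('\n');
      }
    } finally {
      conn.disconnect();
    }
    return body.toString();
  }
}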