diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6998d75..68d3662 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -502,6 +502,7 @@ + @@ -525,4 +526,15 @@ +   +     +     +       +       +       +       + +     +     +   diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b8032ac..45415de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -97,6 +98,8 @@ private volatile Priority maxClusterLevelAppPriority; + protected ActivitiesManager activitiesManager; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -790,4 +793,9 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public ActivitiesManager getActivitiesManager() { + return this.activitiesManager; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java new file mode 100644 index 0000000..8fa1bb5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * Utility for logging scheduler activities + */ +public class ActivitiesLogger { + private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class); + + /** + * Methods for recording activities from an app + */ + public static class APP { + + /* + * Record skipped application activity when no container allocated / + * reserved / re-reserved. Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordSkippedAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + recordAppActivityWithoutAllocation(activitiesManager, node, application, + priority, diagnostic, ActivityState.SKIPPED); + } + + /* + * Record application activity when rejected because of queue maximum + * capacity or user limit. + */ + public static void recordRejectedAppActivityFromLeafQueue( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + String type = "app"; + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic, type); + finishSkippedAppAllocationRecording(activitiesManager, + application.getApplicationId(), ActivityState.REJECTED, diagnostic); + } + + /* + * Record application activity when no container allocated / + * reserved / re-reserved. Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic, ActivityState appState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + String type = "container"; + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), null, + priority.toString(), ActivityState.SKIPPED, diagnostic, type); + type = "app"; + // Add queue-application activity into specific node allocation. 
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY, type); + } + // Add application-container activity into specific application allocation + // Under this condition, it fails to allocate a container to this + // application, so containerId is null. + if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + String type = "container"; + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), null, priority.toString(), appState, + diagnostic, type); + } + } + + /* + * Record application activity when container allocated / reserved / + * re-reserved + */ + public static void recordAppActivityWithAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Container updatedContainer, + ActivityState activityState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + String type = "container"; + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), + updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY, type); + type = "app"; + // Add queue-application activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY, type); + } + // Add application-container activity into specific application allocation + if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + String type = "container"; + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY, type); + } + } + + /* + * Invoked when scheduler starts to look at this application within one node + * update. + */ + public static void startAppAllocationRecording( + ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, + SchedulerApplicationAttempt application) { + if (activitiesManager == null) { + return; + } + activitiesManager.startAppAllocationRecording(nodeId, currentTime, + application); + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app has any container allocated/reserved during + * this allocation. + */ + public static void finishAllocatedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ContainerId containerId, ActivityState containerState, + String diagnostic) { + if (activitiesManager == null) { + return; + } + + if (activitiesManager.shouldRecordThisApp(applicationId)) { + activitiesManager.finishAppAllocationRecording(applicationId, + containerId, containerState, diagnostic); + } + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app DOESN'T have any container allocated/reserved + * during this allocation. 
+ */ + public static void finishSkippedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ActivityState containerState, String diagnostic) { + finishAllocatedAppAllocationRecording(activitiesManager, applicationId, + null, containerState, diagnostic); + } + } + + /** + * Methods for recording activities from a queue + */ + public static class QUEUE { + /* + * Record activities of a queue + */ + public static void recordQueueActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentQueueName, String queueName, + ActivityState state, String diagnostic) { + recordActivity(activitiesManager, node, parentQueueName, queueName, null, + state, diagnostic, null); + } + } + + /** + * Methods for recording overall activities from one node update + */ + public static class NODE { + + /* + * Invoked when node allocation finishes, and there's NO container + * allocated or reserved during the allocation + */ + public static void finishSkippedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node) { + finishAllocatedNodeAllocation(activitiesManager, node, null, + AllocationState.SKIPPED); + } + + /* + * Invoked when node allocation finishes, and there's any container + * allocated or reserved during the allocation + */ + public static void finishAllocatedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + ContainerId containerId, AllocationState containerState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.updateAllocationFinalState(node.getNodeID(), + containerId, containerState); + } + } + + /* + * Invoked when node heartbeat finishes + */ + public static void finishNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.finishNodeUpdateRecording(nodeID); + } + + /* + * Invoked when node heartbeat starts + */ + public static void startNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.startNodeUpdateRecording(nodeID); + } + } + + // Add queue, application or container activity into specific node allocation. + private static void recordActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentName, String childName, + Priority priority, ActivityState state, String diagnostic, String type) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + parentName, childName, priority != null ? 
priority.toString() : null, + state, diagnostic, type); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java new file mode 100644 index 0000000..bfab770 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; +import java.util.Set; +import java.util.*; +import java.util.ArrayList; + +/** + * A class to store node or application allocations. + * It mainly contains operations for allocation start, add, update and finish. 
+ */ +public class ActivitiesManager extends AbstractService { + private static final Log LOG = LogFactory.getLog(ActivitiesManager.class); + private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation; + private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations; + private Set<NodeId> activeRecordedNodes; + private ConcurrentMap<ApplicationId, Long> + recordingAppActivitiesUntilSpecifiedTime; + private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation; + private ConcurrentMap<ApplicationId, List<AppAllocation>> + completedAppAllocations; + private boolean recordNextAvailableNode = false; + private List<NodeAllocation> lastAvailableNodeActivities = null; + private Thread cleanUpThread; + private int timeThreshold = 600 * 1000; + private final RMContext rmContext; + + public ActivitiesManager(RMContext rmContext) { + super(ActivitiesManager.class.getName()); + recordingNodesAllocation = new ConcurrentHashMap<>(); + completedNodeAllocations = new ConcurrentHashMap<>(); + appsAllocation = new ConcurrentHashMap<>(); + completedAppAllocations = new ConcurrentHashMap<>(); + activeRecordedNodes = Collections.newSetFromMap( + new ConcurrentHashMap<NodeId, Boolean>()); + recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>(); + this.rmContext = rmContext; + } + + public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) { + if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus() + == FinalApplicationStatus.UNDEFINED) { + List<AppAllocation> allocations = completedAppAllocations.get( + applicationId); + + return new AppActivitiesInfo(allocations, applicationId); + } else { + return new AppActivitiesInfo( + "fail to get application activities after finished", + applicationId.toString()); + } + } + + public ActivitiesInfo getActivitiesInfo(String nodeId) { + List<NodeAllocation> allocations; + if (nodeId == null) { + allocations = lastAvailableNodeActivities; + } else { + allocations = completedNodeAllocations.get(NodeId.fromString(nodeId)); + } + return new ActivitiesInfo(allocations, nodeId); + } + + public void recordNextNodeUpdateActivities(String nodeId) { + if (nodeId == null) { + recordNextAvailableNode = true; + } else { + activeRecordedNodes.add(NodeId.fromString(nodeId)); + } + } + + public void turnOnAppActivitiesRecording(ApplicationId applicationId, + double maxTime) { + long startTS = SystemClock.getInstance().getTime(); + long endTS = startTS + (long) (maxTime * 1000); + recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS); + } + + @Override + protected void serviceStart() throws Exception { + cleanUpThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite = + completedNodeAllocations.entrySet().iterator(); + while (ite.hasNext()) { + Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next(); + List<NodeAllocation> allocations = nodeAllocation.getValue(); + long currTS = SystemClock.getInstance().getTime(); + if (allocations.size() > 0 && allocations.get(0).getTimeStamp() + - currTS > timeThreshold) { + ite.remove(); + } + } + + Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp = + completedAppAllocations.entrySet().iterator(); + while (iteApp.hasNext()) { + Map.Entry<ApplicationId, List<AppAllocation>> appAllocation = + iteApp.next(); + if (rmContext.getRMApps().get(appAllocation.getKey()) + .getFinalApplicationStatus() + != FinalApplicationStatus.UNDEFINED) { + iteApp.remove(); + } + } + + try { + Thread.sleep(5000); + } catch (Exception e) { + // ignore + } + } + } + }); + + cleanUpThread.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + cleanUpThread.interrupt(); + super.serviceStop(); + } + + void startNodeUpdateRecording(NodeId nodeID) { + if (recordNextAvailableNode) { +
recordNextNodeUpdateActivities(nodeID.toString()); + } + if (activeRecordedNodes.contains(nodeID)) { + List<NodeAllocation> nodeAllocation = new ArrayList<>(); + recordingNodesAllocation.put(nodeID, nodeAllocation); + } + } + + void startAppAllocationRecording(NodeId nodeID, long currTS, + SchedulerApplicationAttempt application) { + ApplicationId applicationId = application.getApplicationId(); + + if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + > currTS) { + appsAllocation.put(applicationId, + new AppAllocation(application.getPriority(), nodeID, + application.getQueueName())); + } + + if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + <= currTS) { + turnOffActivityMonitoringForApp(applicationId); + } + } + + // Add queue, application or container activity into specific node allocation. + void addSchedulingActivityForNode(NodeId nodeID, String parentName, + String childName, String priority, ActivityState state, String diagnostic, + String type) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.addAllocationActivity(parentName, childName, priority, + state, diagnostic, type); + } + } + + // Add queue, application or container activity into specific application + // allocation. + void addSchedulingActivityForApp(ApplicationId applicationId, + String containerId, String priority, ActivityState state, + String diagnostic, String type) { + if (shouldRecordThisApp(applicationId)) { + AppAllocation appAllocation = appsAllocation.get(applicationId); + appAllocation.addAppAllocationActivity(containerId, priority, state, + diagnostic, type); + } + } + + // Update container allocation meta status for this node allocation. + // It updates general container status but not the detailed activity state + // in updateActivityState.
+ void updateAllocationFinalState(NodeId nodeID, ContainerId containerId, + AllocationState containerState) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.updateContainerState(containerId, containerState); + } + } + + void finishAppAllocationRecording(ApplicationId applicationId, + ContainerId containerId, ActivityState appState, String diagnostic) { + if (shouldRecordThisApp(applicationId)) { + long currTS = SystemClock.getInstance().getTime(); + AppAllocation appAllocation = appsAllocation.remove(applicationId); + appAllocation.updateAppContainerStateAndTime(containerId, appState, + currTS, diagnostic); + + List<AppAllocation> appAllocations; + if (completedAppAllocations.containsKey(applicationId)) { + appAllocations = completedAppAllocations.get(applicationId); + } else { + appAllocations = new ArrayList<>(); + completedAppAllocations.put(applicationId, appAllocations); + } + if (appAllocations.size() == 1000) { + appAllocations.remove(0); + } + appAllocations.add(appAllocation); + + if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + <= currTS) { + turnOffActivityMonitoringForApp(applicationId); + } + } + } + + void finishNodeUpdateRecording(NodeId nodeID) { + List<NodeAllocation> value = recordingNodesAllocation.get(nodeID); + long timeStamp = SystemClock.getInstance().getTime(); + + if (value != null) { + if (value.size() > 0) { + lastAvailableNodeActivities = value; + for (NodeAllocation allocation : lastAvailableNodeActivities) { + allocation.transformToTree(); + allocation.setTimeStamp(timeStamp); + } + if (recordNextAvailableNode) { + recordNextAvailableNode = false; + } + } + + if (shouldRecordThisNode(nodeID)) { + recordingNodesAllocation.remove(nodeID); + completedNodeAllocations.put(nodeID, value); + stopRecordNodeUpdateActivities(nodeID); + } + } + } + + boolean shouldRecordThisApp(ApplicationId applicationId) { + return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && appsAllocation.containsKey(applicationId); + } + + boolean shouldRecordThisNode(NodeId nodeID) { + return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation + .containsKey(nodeID); + } + + private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { + List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID); + NodeAllocation nodeAllocation; + // When this node has already stored allocation activities, get the + // last allocation for this node. + if (nodeAllocations.size() != 0) { + nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1); + // When final state in last allocation is not DEFAULT, it means + // last allocation has finished. Create a new allocation for this node, + // and add it to the allocation list. Return this new allocation. + // + // When final state in last allocation is DEFAULT, + // it means last allocation has not finished. Just get last allocation. + if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + } + // When this node has not stored allocation activities, + // create a new allocation for this node, and add it to the allocation list. + // Return this new allocation.
+ else { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + return nodeAllocation; + } + + private void stopRecordNodeUpdateActivities(NodeId nodeId) { + activeRecordedNodes.remove(nodeId); + } + + private void turnOffActivityMonitoringForApp(ApplicationId applicationId) { + recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java new file mode 100644 index 0000000..fc4738e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of diagnostics. + */ +public class ActivityDiagnosticConstant { + // EMPTY means it does not have any diagnostic to display. + // In order not to show "diagnostic" line in frontend, + // we set the value to null. 
+ public final static String EMPTY = null; + public final static String NOT_ABLE_TO_ACCESS_PARTITION = + "Not able to access partition"; + public final static String QUEUE_DO_NOT_NEED_MORE_RESOURCE = + "Queue does not need more resource"; + public final static String QUEUE_MAX_CAPACITY_LIMIT = + "Hit queue max-capacity limit"; + public final static String USER_CAPACITY_MAXIMUM_LIMIT = + "Hit user capacity maximum limit"; + public final static String SKIP_BLACK_LISTED_NODE = "Skip black listed node"; + public final static String PRIORITY_SKIPPED = "Priority skipped"; + public final static String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST = + "Priority skipped because off-switch request is null"; + public final static String + PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST = + "Priority skipped because partition of node doesn't match request"; + public final static String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY = + "Priority skipped because of relax locality is not allowed"; + public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE = + "Skipping assigning to Node in Ignore Exclusivity mode"; + public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS = + "Doesn't need containers based on reservation algo!"; + public final static String QUEUE_SKIPPED_HEADROOM = + "Queue skipped because of headroom"; + public final static String NON_PARTITIONED_PARTITION_FIRST = + "Non-partitioned resource request should be scheduled to " + + "non-partitioned partition first"; + public final static String SKIP_NODE_LOCAL_REQUEST = + "Skip node-local request"; + public final static String SKIP_RACK_LOCAL_REQUEST = + "Skip rack-local request"; + public final static String SKIP_OFF_SWITCH_REQUEST = + "Skip offswitch request"; + public final static String REQUEST_CAN_NOT_ACCESS_NODE_LABEL = + "Resource request can not access the label"; + public final static String NOT_SUFFICIENT_RESOURCE = + "Node does not have sufficient resource for request"; + public final static String LOCALITY_SKIPPED = "Locality skipped"; + public final static String FAIL_TO_ALLOCATE = "Fail to allocate"; + public final static String COULD_NOT_GET_CONTAINER = + "Couldn't get container for allocation"; + public final static String APPLICATION_DO_NOT_NEED_RESOURCE = + "Application does not need more resource"; + public final static String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE = + "Application priority does not need more resource"; + public final static String SKIPPED_ALL_PRIORITIES = + "All priorities are skipped of the app"; + public final static String RESPECT_FIFO = "To respect FIFO of applications, " + + "skipped following applications in the queue"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java new file mode 100644 index 0000000..a03814c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import java.util.LinkedList; +import java.util.List; + +/* + * It represents tree node in "NodeAllocation" tree structure. + * Each node may represent queue, application or container in allocation activity. + * Node may have children node if successfully allocated to next level. + */ +public class ActivityNode { + private String activityNodeName; + private String parentName; + private String appPriority; + private String requestPriority; + private ActivityState state; + private String diagnostic; + + private List<ActivityNode> childNode; + + public ActivityNode(String activityNodeName, String parentName, + String priority, ActivityState state, String diagnostic, String type) { + this.activityNodeName = activityNodeName; + this.parentName = parentName; + if (type != null) { + if (type.equals("app")) { + this.appPriority = priority; + } else if (type.equals("container")) { + this.requestPriority = priority; + } + } + this.state = state; + this.diagnostic = diagnostic; + this.childNode = new LinkedList<>(); + } + + public String getName() { + return this.activityNodeName; + } + + public String getParentName() { + return this.parentName; + } + + public void addChild(ActivityNode node) { + childNode.add(0, node); + } + + public List<ActivityNode> getChildren() { + return this.childNode; + } + + public ActivityState getState() { + return this.state; + } + + public String getDiagnostic() { + return this.diagnostic; + } + + public String getAppPriority() { + return appPriority; + } + + public String getRequestPriority() { + return requestPriority; + } + + public boolean getType() { + if (appPriority != null) { + return true; + } else { + return false; + } + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.activityNodeName + " "); + sb.append(this.appPriority + " "); + sb.append(this.state + " "); + if (!this.diagnostic.equals("")) { + sb.append(this.diagnostic + "\n"); + } + sb.append("\n"); + for (ActivityNode child : childNode) { + sb.append(child.toString() + "\n"); + } + return sb.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java new file mode 100644 index 0000000..bce1fc9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of activity operation states. + */ +public enum ActivityState { + // default state when adding a new activity in node allocation + DEFAULT, + // container is allocated to sub-queues/applications or this queue/application + ACCEPTED, + // queue or application voluntarily give up to use the resource OR + // nothing allocated + SKIPPED, + // container could not be allocated to sub-queues or this application + REJECTED, + ALLOCATED, // successfully allocate a new non-reserved container + RESERVED, // successfully reserve a new container + RE_RESERVED // successfully reserve a new container +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java new file mode 100644 index 0000000..da768e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * It records an activity operation in allocation, + * which can be classified as queue, application or container activity. + * Other information include state, diagnostic, priority. 
+ */ +public class AllocationActivity { + private String childName = null; + private String parentName = null; + private String appPriority = null; + private String requestPriority = null; + private ActivityState state; + private String diagnostic = null; + + private static final Log LOG = LogFactory.getLog(AllocationActivity.class); + + public AllocationActivity(String parentName, String queueName, + String priority, ActivityState state, String diagnostic, String type) { + this.childName = queueName; + this.parentName = parentName; + if (type != null) { + if (type.equals("app")) { + this.appPriority = priority; + } else if (type.equals("container")) { + this.requestPriority = priority; + } + } + this.state = state; + this.diagnostic = diagnostic; + } + + public ActivityNode createTreeNode() { + if (appPriority != null) { + return new ActivityNode(this.childName, this.parentName, this.appPriority, + this.state, this.diagnostic, "app"); + } else if (requestPriority != null) { + return new ActivityNode(this.childName, this.parentName, + this.requestPriority, this.state, this.diagnostic, "container"); + } else { + return new ActivityNode(this.childName, this.parentName, null, this.state, + this.diagnostic, null); + } + } + + public String getName() { + return this.childName; + } + + public String getState() { + return this.state.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java new file mode 100644 index 0000000..e38cefc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of allocation final states. 
+ */ +public enum AllocationState { + DEFAULT, + // queue or application voluntarily give up to use the resource + // OR nothing allocated + SKIPPED, + // successfully allocate a new non-reserved container + ALLOCATED, + // successfully allocate a new container from an existing reserved container + ALLOCATED_FROM_RESERVED, + // successfully reserve a new container + RESERVED +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java new file mode 100644 index 0000000..15850c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; + +import java.util.ArrayList; +import java.util.List; + +/* + * It contains allocation information for one application within a period of + * time. + * Each application allocation may have several allocation attempts. 
+ */ +public class AppAllocation { + private Priority priority = null; + private NodeId nodeId; + private ContainerId containerId = null; + private ActivityState appState = null; + private String diagnostic = null; + private String queueName = null; + private List<ActivityNode> allocationAttempts; + private long timestamp; + + public AppAllocation(Priority priority, NodeId nodeId, String queueName) { + this.priority = priority; + this.nodeId = nodeId; + this.allocationAttempts = new ArrayList<>(); + this.queueName = queueName; + } + + public void updateAppContainerStateAndTime(ContainerId containerId, + ActivityState appState, long ts, String diagnostic) { + this.timestamp = ts; + this.containerId = containerId; + this.appState = appState; + this.diagnostic = diagnostic; + } + + public void addAppAllocationActivity(String containerId, String priority, + ActivityState state, String diagnostic, String type) { + ActivityNode container = new ActivityNode(containerId, null, priority, + state, diagnostic, type); + this.allocationAttempts.add(container); + if (state == ActivityState.REJECTED) { + this.appState = ActivityState.SKIPPED; + } else { + this.appState = state; + } + } + + public String getNodeId() { + return nodeId.toString(); + } + + public String getQueueName() { + return queueName; + } + + public ActivityState getAppState() { + return appState; + } + + public String getPriority() { + if (priority == null) { + return null; + } + return priority.toString(); + } + + public String getContainerId() { + if (containerId == null) { + return null; + } + return containerId.toString(); + } + + public String getDiagnostic() { + return diagnostic; + } + + public long getTime() { + return this.timestamp; + } + + public List<ActivityNode> getAllocationAttempts() { + return allocationAttempts; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java new file mode 100644 index 0000000..5a8f955 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/* + * It contains allocation information for one allocation in a node heartbeat. + * Detailed allocation activities are first stored in "AllocationActivity" + * as operations, then transformed to a tree structure. + * Tree structure starts from root queue and ends in leaf queue, + * application or container allocation. + */ +public class NodeAllocation { + private NodeId nodeId; + private long timeStamp; + private ContainerId containerId = null; + private AllocationState containerState = AllocationState.DEFAULT; + private List<AllocationActivity> allocationOperations; + + private ActivityNode root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocation.class); + + public NodeAllocation(NodeId nodeId) { + this.nodeId = nodeId; + this.allocationOperations = new ArrayList<>(); + } + + public void addAllocationActivity(String parentName, String childName, + String priority, ActivityState state, String diagnostic, String type) { + AllocationActivity allocate = new AllocationActivity(parentName, childName, + priority, state, diagnostic, type); + this.allocationOperations.add(allocate); + } + + public void updateContainerState(ContainerId containerId, + AllocationState containerState) { + this.containerId = containerId; + this.containerState = containerState; + } + + // In node allocation, transform each activity to a tree-like structure + // for frontend activity display.
+ // eg: root + // / \ + // a b + // / \ + // app1 app2 + // / \ + // CA1 CA2 + // CA means Container Attempt + public void transformToTree() { + List<ActivityNode> allocationTree = new ArrayList<>(); + + if (root == null) { + Set<String> names = Collections.newSetFromMap( + new ConcurrentHashMap<String, Boolean>()); + ListIterator<AllocationActivity> ite = allocationOperations.listIterator( + allocationOperations.size()); + while (ite.hasPrevious()) { + String name = ite.previous().getName(); + if (name != null) { + if (!names.contains(name)) { + names.add(name); + } else { + ite.remove(); + } + } + } + + for (AllocationActivity allocationOperation : allocationOperations) { + ActivityNode node = allocationOperation.createTreeNode(); + String name = node.getName(); + for (int i = allocationTree.size() - 1; i > -1; i--) { + if (allocationTree.get(i).getParentName().equals(name)) { + node.addChild(allocationTree.get(i)); + allocationTree.remove(i); + } else { + break; + } + } + allocationTree.add(node); + } + root = allocationTree.get(0); + } + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public long getTimeStamp() { + return this.timeStamp; + } + + public AllocationState getFinalAllocationState() { + return containerState; + } + + public String getContainerId() { + if (containerId == null) + return null; + return containerId.toString(); + } + + public ActivityNode getRoot() { + return root; + } + + public String getNodeId() { + return nodeId.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c88154..1d8f929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -91,12 +92,15 @@ protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; - public AbstractCSQueue(CapacitySchedulerContext cs, + protected ActivitiesManager activitiesManager; + + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.resourceCalculator = cs.getResourceCalculator(); + this.activitiesManager = cs.getActivitiesManager(); // must be called after parent and queueName is set this.metrics = diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 54dcfdf..5696c71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -109,6 +109,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -308,6 +314,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.activitiesManager = new ActivitiesManager(rmContext); + activitiesManager.init(conf); initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); @@ -345,6 +353,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { startSchedulerThreads(); + activitiesManager.start(); super.serviceStart(); } @@ -524,7 +533,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) Map newQueues = new HashMap(); CSQueue newRoot = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); + newQueues, queues, noop); // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); @@ -651,7 +660,7 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = + ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests @@ -803,7 +812,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority(), isAttemptRecovering); + application.getPriority(), isAttemptRecovering, activitiesManager); if 
(transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); @@ -1234,6 +1243,7 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(reservedContainer.getContainerId()); @@ -1263,6 +1273,19 @@ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().incrAllocations(); updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } else { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), AllocationState.SKIPPED); } } @@ -1372,7 +1395,11 @@ public void handle(SchedulerEvent event) { setLastNodeUpdateTime(Time.now()); nodeUpdate(node); if (!scheduleAsynchronously) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + node.getNodeID()); allocateContainersToNode(getNode(node.getNodeID())); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + node.getNodeID()); } } break; @@ -1946,13 +1973,9 @@ public synchronized String moveApplication(ApplicationId appId, return targetQueueName; } - /** + /* * Check application can be moved to queue with labels enabled. 
All labels in * application life time will be checked - * - * @param appId - * @param dest - * @throws YarnException */ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index b39b289..c41a7bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; @@ -80,4 +81,6 @@ * cluster. */ ResourceUsage getClusterResourceUsage(); + + ActivitiesManager getActivitiesManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3594bb0..109df3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -65,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; 
+import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -142,7 +146,7 @@ public LeafQueue(CapacitySchedulerContext cs, super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.activeUsersManager = new ActiveUsersManager(metrics); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -153,7 +157,7 @@ public LeafQueue(CapacitySchedulerContext cs, LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); } - + setupQueueConfigs(cs.getClusterResource()); } @@ -872,7 +876,7 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, @@ -891,6 +895,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (reservedContainer != null) { FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); + + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + synchronized (application) { CSAssignment assignment = application.assignContainers(clusterResource, node, @@ -905,6 +913,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); return CSAssignment.NULL_ASSIGNMENT; } @@ -917,6 +929,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); return CSAssignment.NULL_ASSIGNMENT; } @@ -924,13 +939,23 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { FiCaSchedulerApp application = assignmentIterator.next(); + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + // Check queue max-capacity limit if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return 
CSAssignment.NULL_ASSIGNMENT; } - + Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, node.getPartition(), schedulingMode); @@ -940,6 +965,10 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, application, node.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); continue; } @@ -981,10 +1010,17 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, incReservedResource(node.getPartition(), reservedRes); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + // Done return assignment; } else if (assignment.getSkippedType() == CSAssignment.SkippedType.OTHER) { + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); application.updateNodeInfoForAMDiagnostics(node); } else if(assignment.getSkippedType() == CSAssignment.SkippedType.QUEUE_LIMIT) { @@ -992,9 +1028,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 9ae35ee..a245e3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -42,12 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -81,7 +80,7 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public ParentQueue(CapacitySchedulerContext cs, + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; @@ -98,14 +97,14 @@ public ParentQueue(CapacitySchedulerContext cs, "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - + this.childQueues = new TreeSet(nonPartitionedQueueComparator); - + setupQueueConfigs(cs.getClusterResource()); - LOG.info("Initialized parent-queue " + queueName + - " name=" + queueName + - ", fullname=" + getQueuePath()); + LOG.info("Initialized parent-queue " + queueName + + " name=" + queueName + + ", fullname=" + getQueuePath()); } synchronized void setupQueueConfigs(Resource clusterResource) @@ -380,6 +379,10 @@ private synchronized void removeApplication(ApplicationId applicationId, " #applications: " + getNumApplications()); } + private String getParentName() { + return getParent() != null ? 
getParent().getQueueName() : ""; + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, @@ -392,6 +395,16 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it is not able to access partition=" + node .getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -404,6 +417,15 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -423,9 +445,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + break; } - + // Schedule CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node, resourceLimits, @@ -436,6 +467,29 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (Resources.greaterThan( resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + if (node.getReservedContainer() == null) { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + } + } else { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); + } + } + // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); @@ -474,6 +528,15 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } else { assignment.setSkippedType(assignedToChild.getSkippedType()); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + break; } @@ -631,7 +694,7 @@ private synchronized CSAssignment assignContainersToChildQueues( resourceToSubtract); } 
} - + return assignment; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index 4d5a7dc..fa13df4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -43,17 +47,25 @@ FiCaSchedulerApp application; final ResourceCalculator rc; final RMContext rmContext; - + ActivitiesManager activitiesManager; + public AbstractContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public AbstractContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { this.application = application; this.rc = rc; this.rmContext = rmContext; + this.activitiesManager = activitiesManager; } protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, - RMContainer rmContainer) { + RMContainer rmContainer, FiCaSchedulerNode node) { // Handle skipped CSAssignment.SkippedType skipped = (result.getAllocationState() == AllocationState.APP_SKIPPED) ? 
@@ -61,7 +73,7 @@ protected CSAssignment getCSAssignmentFromAllocateResult( CSAssignment.SkippedType.NONE; CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); - + // Handle excess reservation assignment.setExcessReservation(result.getContainerToBeUnreserved()); @@ -85,6 +97,23 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), allocatedResource); + + if (rmContainer != null) { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + ActivityState.RE_RESERVED); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + } else { + ActivitiesLogger.APP.recordAppActivityWithAllocation( + activitiesManager, node, application, updatedContainer, + ActivityState.RESERVED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.RESERVED, + ActivityDiagnosticConstant.EMPTY); + } } else if (result.getAllocationState() == AllocationState.ALLOCATED){ // This is a new container // Inform the ordering policy @@ -105,10 +134,18 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.getAssignmentInformation().incrAllocations(); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), allocatedResource); - + if (rmContainer != null) { assignment.setFulfilledReservation(true); } + + ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager, + node, application, updatedContainer, ActivityState.ALLOCATED); + ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + updatedContainer.getId(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + } assignment.setContainersToKill(result.getToKillContainers()); @@ -118,13 +155,13 @@ protected CSAssignment getCSAssignmentFromAllocateResult( CSAssignment.SkippedType.QUEUE_LIMIT); } } - + return assignment; } - + /** * allocate needs to handle following stuffs: - * + * *
 * <ul>
 * <li>Select request: Select a request to allocate. E.g. select a resource
 * request based on requirement/priority/locality.
  • diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..4eaa24b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -36,12 +37,17 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, + RMContext rmContext, ActivitiesManager activitiesManager) { super(application, rc, rmContext); increaseContainerAllocator = new IncreaseContainerAllocator(application, rc, rmContext); - regularContainerAllocator = - new RegularContainerAllocator(application, rc, rmContext); + regularContainerAllocator = new RegularContainerAllocator(application, rc, + rmContext, activitiesManager); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 111a1fe..8d4042c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -27,16 +27,24 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -54,10 +62,11 @@ private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, + ActivitiesManager activitiesManager) { + super(application, rc, rmContext, activitiesManager); } private boolean checkHeadroom(Resource clusterResource, @@ -82,15 +91,23 @@ private boolean checkHeadroom(Resource clusterResource, private ContainerAllocation preCheckForNewContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { + Priority priority = schedulerKey.getPriority(); + if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE); return ContainerAllocation.APP_SKIPPED; } ResourceRequest anyRequest = application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (null == anyRequest) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -99,6 +116,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // Do we need containers at this 'priority'? if (application.getTotalRequiredResources(schedulerKey) <= 0) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -113,6 +133,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, } application.updateAppSkipNodeDiagnostics( "Skipping assigning to Node in Ignore Exclusivity mode. 
"); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE); return ContainerAllocation.APP_SKIPPED; } } @@ -123,6 +146,10 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( anyRequest.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant. + PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -131,6 +158,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS); return ContainerAllocation.PRIORITY_SKIPPED; } } @@ -140,6 +170,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM); return ContainerAllocation.QUEUE_SKIPPED; } @@ -171,7 +204,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, + missedNonPartitionedRequestSchedulingOpportunity + " required=" + rmContext.getScheduler().getNumClusterNodes()); } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST); return ContainerAllocation.APP_SKIPPED; } } @@ -298,6 +333,9 @@ private ContainerAllocation assignNodeLocalContainers( } // Skip node-local request, go to rack-local request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -313,6 +351,9 @@ private ContainerAllocation assignRackLocalContainers( } // Skip rack-local request, go to off-switch request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -329,6 +370,9 @@ private ContainerAllocation assignOffSwitchContainers( application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST); return ContainerAllocation.APP_SKIPPED; } @@ -336,6 +380,7 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + Priority priority = schedulerKey.getPriority(); ContainerAllocation allocation; @@ -361,6 +406,9 @@ private ContainerAllocation 
assignContainersOnNode(Resource clusterResource, application.getResourceRequest(schedulerKey, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -384,6 +432,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, application.getResourceRequest(schedulerKey, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); return ContainerAllocation.PRIORITY_SKIPPED; } if (requestType != NodeType.NODE_LOCAL @@ -405,7 +456,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, return allocation; } - + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -413,6 +466,7 @@ private ContainerAllocation assignContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest request, NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { + Priority priority = schedulerKey.getPriority(); lastResourceRequest = request; if (LOG.isDebugEnabled()) { @@ -429,6 +483,10 @@ private ContainerAllocation assignContainer(Resource clusterResource, // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, priority, + ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL, + ActivityState.REJECTED); return new ContainerAllocation(rmContainer, null, AllocationState.LOCALITY_SKIPPED); } @@ -443,6 +501,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); // Skip this locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -521,6 +582,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, // continue. 
if (null == unreservedContainer) { // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -545,6 +609,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -557,6 +624,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, return result; } // Skip the locality request + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.LOCALITY_SKIPPED); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -635,6 +705,9 @@ private ContainerAllocation handleNewContainerAllocation( ContainerAllocation ret = new ContainerAllocation(allocationResult.containerToBeUnreserved, null, AllocationState.APP_SKIPPED); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED); return ret; } @@ -661,6 +734,10 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, application .updateAppSkipNodeDiagnostics("Scheduling of container failed. "); LOG.warn("Couldn't get container for allocation!"); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, + node, application, schedulerKey.getPriority(), + ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER, + ActivityState.REJECTED); return ContainerAllocation.APP_SKIPPED; } @@ -740,6 +817,9 @@ public CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE); return CSAssignment.SKIP_ASSIGNMENT; } @@ -754,18 +834,21 @@ public CSAssignment assignContainers(Resource clusterResource, continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + null, node); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. 
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, - reservedContainer); + reservedContainer, node); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 490e545..947f293 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,12 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -57,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; @@ -72,7 +68,11 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -110,8 +110,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { + this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + appPriority, isAttemptRecovering, null); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, + ActivitiesManager activitiesManager) { super(applicationAttemptId, user, queue, 
activeUsersManager, rmContext); - + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); Resource amResource; @@ -141,8 +149,9 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } - - containerAllocator = new ContainerAllocator(this, rc, rmContext); + + containerAllocator = new ContainerAllocator(this, rc, rmContext, + activitiesManager); if (scheduler instanceof CapacityScheduler) { capacitySchedulerContext = (CapacitySchedulerContext) scheduler; @@ -191,7 +200,7 @@ public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, return null; } - // Required sanity check - AM can call 'allocate' to update resource + // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check if (getTotalRequiredResources(schedulerKey) <= 0) { return null; @@ -495,7 +504,7 @@ public RMContainer findNodeToUnreserve(Resource clusterResource, public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 75bffc7..4305fd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -130,9 +130,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; @@ -176,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import 
org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; @@ -577,6 +582,124 @@ public AppsInfo getApps(@Context HttpServletRequest hsr, } @GET + @Path("/scheduler/activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, + @QueryParam("nodeId") String nodeId) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + String errMessage = ""; + + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == activitiesManager) { + errMessage = "Not Capacity Scheduler"; + return new ActivitiesInfo(errMessage, nodeId); + } + + List nodeList = + abstractYarnScheduler.getNodeTracker().getAllNodes(); + + boolean illegalInput = false; + + if (nodeList.size() == 0) { + illegalInput = true; + errMessage = "No node manager running in the cluster"; + } else { + if (nodeId != null) { + String hostName = nodeId; + String portName = ""; + if (nodeId.contains(":")) { + int index = nodeId.indexOf(":"); + hostName = nodeId.substring(0, index); + portName = nodeId.substring(index + 1); + } + + boolean correctNodeId = false; + for (FiCaSchedulerNode node : nodeList) { + if ((portName.equals("") && node.getRMNode().getHostName().equals( + hostName)) || (!portName.equals("") && node.getRMNode() + .getHostName().equals(hostName) && String.valueOf( + node.getRMNode().getCommandPort()).equals(portName))) { + correctNodeId = true; + nodeId = node.getNodeID().toString(); + break; + } + } + if (!correctNodeId) { + illegalInput = true; + errMessage = "Cannot find node manager with given node id"; + } + } + } + + if (!illegalInput) { + activitiesManager.recordNextNodeUpdateActivities(nodeId); + return activitiesManager.getActivitiesInfo(nodeId); + } + + // Return a activities info with error message + return new ActivitiesInfo(errMessage, nodeId); + } + + return null; + } + + @GET + @Path("/scheduler/app-activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, + @QueryParam("appId") String appId, @QueryParam("maxTime") String time) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivitiesManager activitiesManager = + abstractYarnScheduler.getActivitiesManager(); + if (null == activitiesManager) { + String errMessage = "Not Capacity Scheduler"; + return new AppActivitiesInfo(errMessage, appId); + } + + if(appId == null) { + String errMessage = "Must provide an application Id"; + return new AppActivitiesInfo(errMessage, null); + } + + double maxTime = 3.0; + + if (time != null) { + if (time.contains(".")) { + maxTime = Double.parseDouble(time); + } else { + maxTime = Double.parseDouble(time + ".0"); + } + } + + ApplicationId applicationId; + try { + applicationId = ApplicationId.fromString(appId); + activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime); + AppActivitiesInfo appActivitiesInfo = + activitiesManager.getAppActivitiesInfo(applicationId); + + return appActivitiesInfo; + } catch (Exception e) { + String errMessage = "Cannot find application with given appId"; + return new AppActivitiesInfo(errMessage, appId); + 
} + + } + return null; + } + + @GET @Path("/appstatistics") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ApplicationStatisticsInfo getAppStatistics( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java new file mode 100644 index 0000000..0de340a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Date; +import java.util.List; +import java.util.ArrayList; + +/* + * DAO object to display node allocation activity. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivitiesInfo { + protected String nodeId; + protected String timeStamp; + protected String diagnostic = null; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class); + + public ActivitiesInfo() { + } + + public ActivitiesInfo(String errorMessage, String nodeId) { + this.diagnostic = errorMessage; + this.nodeId = nodeId; + } + + public ActivitiesInfo(List nodeAllocations, String nodeId) { + this.nodeId = nodeId; + this.allocations = new ArrayList<>(); + + if (nodeAllocations == null) { + diagnostic = (nodeId != null ? 
+ "waiting for display" : + "waiting for next allocation"); + } else { + if (nodeAllocations.size() == 0) { + diagnostic = "do not have available resources"; + } else { + this.nodeId = nodeAllocations.get(0).getNodeId(); + + Date date = new Date(); + date.setTime(nodeAllocations.get(0).getTimeStamp()); + this.timeStamp = date.toString(); + + for (int i = 0; i < nodeAllocations.size(); i++) { + NodeAllocation nodeAllocation = nodeAllocations.get(i); + NodeAllocationInfo allocationInfo = new NodeAllocationInfo( + nodeAllocation); + this.allocations.add(allocationInfo); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java new file mode 100644 index 0000000..9553a720 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display node information in allocation tree. + * It corresponds to "ActivityNode" class. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivityNodeInfo { + protected String name; // The name for activity node + protected String appPriority; + protected String requestPriority; + protected String allocationState; + protected String diagnostic; + + protected List children; + + ActivityNodeInfo() { + } + + ActivityNodeInfo(ActivityNode node) { + this.name = node.getName(); + getPriority(node); + this.allocationState = node.getState().name(); + this.diagnostic = node.getDiagnostic(); + this.children = new ArrayList<>(); + + for (ActivityNode child : node.getChildren()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(child); + this.children.add(containerInfo); + } + } + + private void getPriority(ActivityNode node) { + if (node.getType()) { + this.appPriority = node.getAppPriority(); + } else { + this.requestPriority = node.getRequestPriority(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java new file mode 100644 index 0000000..38c45a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; +import org.apache.hadoop.yarn.util.SystemClock; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/* + * DAO object to display application activity. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppActivitiesInfo { + protected String applicationId; + protected String diagnostic; + protected String timeStamp; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class); + + public AppActivitiesInfo() { + } + + public AppActivitiesInfo(String errorMessage, String applicationId) { + this.diagnostic = errorMessage; + this.applicationId = applicationId; + + Date date = new Date(); + date.setTime(SystemClock.getInstance().getTime()); + this.timeStamp = date.toString(); + } + + public AppActivitiesInfo(List appAllocations, + ApplicationId applicationId) { + this.applicationId = applicationId.toString(); + this.allocations = new ArrayList<>(); + + if (appAllocations == null) { + diagnostic = "waiting for display"; + + Date date = new Date(); + date.setTime(SystemClock.getInstance().getTime()); + this.timeStamp = date.toString(); + } else { + for (int i = appAllocations.size() - 1; i > -1; i--) { + AppAllocation appAllocation = appAllocations.get(i); + AppAllocationInfo appAllocationInfo = new AppAllocationInfo( + appAllocation); + this.allocations.add(appAllocationInfo); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java new file mode 100644 index 0000000..21d3788 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/* + * DAO object to display application allocation detailed information. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppAllocationInfo { + protected String nodeId; + protected String queueName; + protected String appPriority; + protected String allocatedContainerId; + protected String allocationState; + protected String diagnostic; + protected String timeStamp; + protected List allocationAttempt; + + private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class); + + AppAllocationInfo() { + } + + AppAllocationInfo(AppAllocation allocation) { + this.allocationAttempt = new ArrayList<>(); + + this.nodeId = allocation.getNodeId(); + this.queueName = allocation.getQueueName(); + this.appPriority = allocation.getPriority(); + this.allocatedContainerId = allocation.getContainerId(); + this.allocationState = allocation.getAppState().name(); + this.diagnostic = allocation.getDiagnostic(); + + Date date = new Date(); + date.setTime(allocation.getTime()); + this.timeStamp = date.toString(); + + for (ActivityNode attempt : allocation.getAllocationAttempts()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt); + this.allocationAttempt.add(containerInfo); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java new file mode 100644 index 0000000..1350a76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/* + * DAO object to display each node allocation in node heartbeat. 
+ */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class NodeAllocationInfo { + protected String allocatedContainerId; + protected String finalAllocationState; + protected ActivityNodeInfo root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class); + + NodeAllocationInfo() { + } + + NodeAllocationInfo(NodeAllocation allocation) { + this.allocatedContainerId = allocation.getContainerId(); + this.finalAllocationState = allocation.getFinalAllocationState().name(); + + root = new ActivityNodeInfo(allocation.getRoot()); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 9379367..95f7c02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -18,17 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.StringReader; - -import javax.ws.rs.core.MediaType; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; - +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceServletContextListener; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -52,20 +49,21 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.servlet.GuiceServletContextListener; -import com.google.inject.servlet.ServletModule; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; -import com.sun.jersey.test.framework.WebAppDescriptor; +import javax.ws.rs.core.MediaType; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import java.io.StringReader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestRMWebServicesCapacitySched extends JerseyTestBase { - private static MockRM rm; - private CapacitySchedulerConfiguration csConf; - private YarnConfiguration conf; + protected static MockRM rm; + protected static CapacitySchedulerConfiguration csConf; + protected static YarnConfiguration conf; private class QueueInfo { float capacity; 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java new file mode 100644 index 0000000..d7b0581 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -0,0 +1,777 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class TestRMWebServicesSchedulerActivities + extends TestRMWebServicesCapacitySched { + + private static final Log LOG = LogFactory.getLog( + TestRMWebServicesSchedulerActivities.class); + + @Test + public void testAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + 
.newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 11); + + JSONArray allocations = json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + if (i != allocations.length() - 1) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "ALLOCATED"); + verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1"); + } else { + verifyStateOfAllocations(allocations.getJSONObject(i), + "finalAllocationState", "SKIPPED"); + verifyQueueOrder(allocations.getJSONObject(i), "root-a-b"); + } + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1:1234"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + 
"scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testWrongNodeId() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.0"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + 
"scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1"); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED"); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED"); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.2"); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + verifyQueueOrder(json.getJSONObject("allocations"), "b1"); + + allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", + "ALLOCATED_FROM_RESERVED"); + } + finally { + rm.stop(); + } + } + + @Test + public void testActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + 
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      nm.nodeHeartbeat(true);
+      Thread.sleep(1000);
+
+      //Get JSON
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").queryParams(params).accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "finalAllocationState",
+          "ALLOCATED");
+
+      verifyNumberOfNodes(allocations, 6);
+
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1");
+    }
+    finally {
+      rm.stop();
+    }
+  }
+
+  private void verifyNumberOfNodes(JSONObject allocation, int realValue)
+      throws Exception {
+    if (allocation.isNull("root")) {
+      assertEquals("Number of nodes is wrong", 0, realValue);
+    } else {
+      assertEquals("Number of nodes is wrong",
+          1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
+    }
+  }
+
+  private int getNumberOfNodes(JSONObject allocation) throws Exception {
+    if (!allocation.isNull("children")) {
+      Object object = allocation.get("children");
+      if (object.getClass() == JSONObject.class) {
+        return 1 + getNumberOfNodes((JSONObject) object);
+      } else {
+        int count = 0;
+        for (int i = 0; i < ((JSONArray) object).length(); i++) {
+          count += (1 + getNumberOfNodes(
+              ((JSONArray) object).getJSONObject(i)));
+        }
+        return count;
+      }
+    } else {
+      return 0;
+    }
+  }
+
+  private void verifyStateOfAllocations(JSONObject allocation,
+      String nameToCheck, String realState) throws Exception {
+    assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
+        realState);
+  }
+
+  private void verifyNumberOfAllocations(JSONObject json, int realValue)
+      throws Exception {
+    if (json.isNull("allocations")) {
+      assertEquals("Number of allocations is wrong", 0, realValue);
+    } else {
+      Object object = json.get("allocations");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations is wrong",
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  private void verifyQueueOrder(JSONObject json, String realOrder)
+      throws Exception {
+    String order = "";
+    if (!json.isNull("root")) {
+      JSONObject root = json.getJSONObject("root");
+      order = root.getString("name") + "-" + getQueueOrder(root);
+    }
+    assertEquals("Order of queue is wrong",
+        order.substring(0, order.length() - 1), realOrder);
+  }
+
+  private String getQueueOrder(JSONObject node) throws Exception {
+    if (!node.isNull("children")) {
+      Object children = node.get("children");
+      if (children.getClass() == JSONObject.class) {
+        if (!((JSONObject) children).isNull("appPriority")) {
+          return "";
+        }
+        return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
+            (JSONObject) children);
+      } else if (children.getClass() == JSONArray.class) {
+        String order = "";
+        for (int i = 0; i < ((JSONArray) children).length(); i++) {
+          JSONObject child = (JSONObject) ((JSONArray) children).get(i);
+          if (!child.isNull("appPriority")) {
+            return "";
+          }
+          order += (child.getString("name") + "-" + getQueueOrder(child));
+        }
+        return order;
+      }
+    }
+    return "";
+  }
+
+  private void verifyNumberOfAllocationAttempts(JSONObject allocation,
+      int realValue) throws
Exception { + if (allocation.isNull("allocationAttempt")) { + assertEquals("Number of allocation attempts is wrong", 0, realValue); + } else { + Object object = allocation.get("allocationAttempt"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations attempts is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations attempts is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + @Test + public void testAppActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED"); + + verifyNumberOfAllocationAttempts(allocations, 1); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignMultipleContainersPerNodeHeartbeat() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 10); + + JSONArray allocations = 
json.getJSONArray("allocations"); + for (int i = 0; i < allocations.length(); i++) { + verifyStateOfAllocations(allocations.getJSONObject(i), + "allocationState", "ACCEPTED"); + } + } + finally { + rm.stop(); + } + } + + @Test + public void testAppAssignWithoutAvailableResource() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 10), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 10)), null); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppNoNM() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 0); + } + finally { + rm.stop(); + } + } + + @Test + public void testAppReserveNewContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024, + rm.getResourceTrackerService()); + MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024, + rm.getResourceTrackerService()); + + nm1.registerNode(); + nm2.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + 
am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096), + 10)), null); + + // Reserve new container + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + // Do a node heartbeat again without releasing container from app2 + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 2); + + // Finish application 2 + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ContainerId containerId = ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1); + cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED); + + // Do a node heartbeat again + r = resource(); + params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + nm2.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 3); + } + finally { + rm.stop(); + } + } + +}
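For context beyond the patch itself: the tests above exercise two new REST endpoints, /ws/v1/cluster/scheduler/activities (per-node activities, selected by a nodeId query parameter) and /ws/v1/cluster/scheduler/app-activities (per-application activities, selected by appId). The standalone sketch below shows how such a query might look from a plain Jersey 1.x client, using the same client and Jettison JSON classes the tests rely on. It is not part of the patch; the ResourceManager address, nodeId, and appId values are illustrative placeholders, and, as in the tests, a first GET starts recording while a later GET (after a node heartbeat) returns the recorded allocations.

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.codehaus.jettison.json.JSONObject;

import javax.ws.rs.core.MediaType;

public class SchedulerActivitiesQuerySketch {
  public static void main(String[] args) throws Exception {
    Client client = Client.create();
    // Placeholder RM web address; adjust to the actual cluster.
    WebResource rm = client.resource("http://localhost:8088");

    // Per-node activities: this GET asks the scheduler to record the next
    // allocation attempt on the given node; repeating it after a heartbeat
    // returns the recorded "allocations" element.
    ClientResponse response = rm.path("ws").path("v1").path("cluster")
        .path("scheduler/activities")
        .queryParam("nodeId", "127.0.0.1:1234")
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    JSONObject nodeActivities =
        new JSONObject(response.getEntity(String.class));
    System.out.println(nodeActivities.toString(2));

    // Per-application activities: record scheduling attempts for one app
    // (placeholder application id).
    response = rm.path("ws").path("v1").path("cluster")
        .path("scheduler/app-activities")
        .queryParam("appId", "application_1462166798452_0001")
        .accept(MediaType.APPLICATION_JSON)
        .get(ClientResponse.class);
    JSONObject appActivities =
        new JSONObject(response.getEntity(String.class));
    System.out.println(appActivities.toString(2));

    client.destroy();
  }
}

As the verify helpers in the tests above account for, a consumer should expect the "allocations" element to be absent, a single JSON object, or a JSON array, depending on how many allocations were recorded.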