diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 64eb777..675de13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -97,6 +97,8 @@ private volatile Priority maxClusterLevelAppPriority; + protected ActivityManager activityManager; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -789,4 +791,9 @@ private SchedContainerChangeRequest createSchedContainerChangeRequest( } return schedulerChangeRequests; } + + public ActivityManager getActivityManager() { + return this.activityManager; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityDiagnosticConstant.java new file mode 100644 index 0000000..300a35f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityDiagnosticConstant.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +/* + * Collection of diagnostics. + */ +public class ActivityDiagnosticConstant { + // EMPTY means it does not have any diagnostic to display. In order not to show "diagnostic" line in frontend, we set the value to null. + public final static String EMPTY = null; + public final static String NOT_ABLE_TO_ACCESS_PARTITION = + "not able to access partition"; + public final static String DO_NOT_NEED_MORE_RESOURCE = + "do not need more resource"; + public final static String QUEUE_MAX_CAPACITY_LIMIT = + "queue max-capacity limit"; + public final static String USER_CAPACITY_MAXIMUM_LIMIT = + "user capacity maximum limit"; + public final static String SKIP_BLACK_LISTED_NODE = + "skip am allocation in black listed node"; + public final static String PRIORITY_SKIPPED = "priority skipped"; + public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE = + "skipping assigning to Node in Ignore Exclusivity mode"; + public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS = + "do not need allocationAttemptInfos based on reservation algo"; + public final static String QUEUE_SKIPPED_HEADROOM = + "queue skipped because of headroom"; + public final static String NON_PARTITIONED_PARTITION_FIRST = + "non-partitioned resource request should be scheduled to non-partitioned partition first"; + public final static String SKIP_NODE_LOCAL_REQUEST = + "skip node-local request"; + public final static String SKIP_RACK_LOCAL_REQUEST = + "skip rack-local request"; + public final static String SKIP_AM_ALLOCATION = + "skip am allocation due to locality"; + public final static String REQUEST_CAN_NOT_ACCESS = + "resource request can not access the label"; + public final static String NOT_SUFFICIENT_RESOURCE = + "node does not have sufficient resource for request"; + public final static String LOCALITY_SKIPPED = "locality skipped"; + public final static String FAIL_TO_ALLOCATE = "fail to allocate"; + public final static String COULD_NOT_GET_CONTAINER = + "couldn't get container for allocation"; + public final static String DO_NOT_NEED_RESOURCE = "do not need more resource"; + public final static String SKIPPED_ALL_PRIORITIES = + "skipped all priorities of the app"; + public final static String RESPECT_FIFO = "respect FIFO of applications"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityManager.java new file mode 100644 index 0000000..81085fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityManager.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; +import java.util.Set; +import java.util.*; +import java.util.ArrayList; +import java.util.Date; + +/* + * A class to store node or application allocations. + * It mainly contains operations for allocation start, add, update and finish. + */ +public class ActivityManager { + private static final Log LOG = LogFactory.getLog(ActivityManager.class); + private ConcurrentMap> recordingNodesAllocation; + private ConcurrentMap> completedNodeAllocations; + private Set activeRecordedNodes; + private ConcurrentMap + recordingAppActivitiesUntilSpecifiedTime; + private ConcurrentMap appsAllocation; + private ConcurrentMap> + completedAppAllocations; + private boolean recordNextAvailableNode = false; + private List lastAvailableNodeActivities = null; + + public ActivityManager() { + recordingNodesAllocation = new ConcurrentHashMap<>(); + completedNodeAllocations = new ConcurrentHashMap<>(); + appsAllocation = new ConcurrentHashMap<>(); + completedAppAllocations = new ConcurrentHashMap<>(); + activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>(); + } + + public void startNodeUpdateRecording(NodeId nodeID) { + if (recordNextAvailableNode) { + startRecordNodeUpdateActivities(nodeID.toString()); + } + if (activeRecordedNodes.contains(nodeID)) { + List nodeAllocation = new ArrayList<>(); + recordingNodesAllocation.put(nodeID, nodeAllocation); + } + } + + public void startAppUpdateRecording(NodeId nodeID, + ApplicationId applicationId, Long currTS, String queueName, + Priority priority) { + if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + > currTS) { + appsAllocation.put(applicationId, + new AppAllocation(priority, nodeID, queueName)); + } + + if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + <= currTS) + turnOffAppUpdate(applicationId); + } + + // Add queue, application or container activity into specific node allocation. + public void addActivity(NodeId nodeID, String parentName, String childName, + String priority, ActivityState state, String diagnostic, + AllocationActivityType type) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.addAllocationActivity(parentName, childName, priority, + state, diagnostic, type); + } + } + + public void addAppActivity(ApplicationId applicationId, String containerId, + String priority, ActivityState state, String diagnostic) { + if (shouldRecordThisApp(applicationId)) { + AppAllocation appAllocation = appsAllocation.get(applicationId); + appAllocation.addAppAllocationActivity(containerId, priority, state, + diagnostic); + } + } + + // Update state and diagnostic of existed activity in node allocation. Unlike addActivity, it does not add new activity. + public void updateActivityState(NodeId nodeID, String name, + ActivityState state, String diagnostic) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.updateState(name, state, diagnostic); + } + } + + // Update container allocation meta status for this node allocation. + // It updates general container status but not the detailed activity state in updateActivityState. + public void updateAllocationFinalState(NodeId nodeID, ContainerId containerId, + AllocationState containerState) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.updateContainerState(containerId, containerState); + } + } + + public void finishAppAllocationRecording(ApplicationId applicationId, + ContainerId containerId, ActivityState appState) { + if (shouldRecordThisApp(applicationId)) { + Date date = new Date(); + long currTS = date.getTime(); + AppAllocation appAllocation = appsAllocation.get(applicationId); + appAllocation.updateAppContainerStateAndTime(containerId, appState, + currTS); + + List appAllocations; + if (completedAppAllocations.containsKey(applicationId)) + appAllocations = completedAppAllocations.get(applicationId); + else { + appAllocations = new ArrayList<>(); + completedAppAllocations.put(applicationId, appAllocations); + } + appAllocations.add(appsAllocation.remove(applicationId)); + + if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId) <= currTS) + turnOffAppUpdate(applicationId); + } + } + + public void finishNodeUpdateRecording(NodeId nodeID) { + List value = recordingNodesAllocation.get(nodeID); + Date date = new Date(); + long timeStamp = date.getTime(); + + if (value != null) { + if (value.size() > 0) { + lastAvailableNodeActivities = value; + for (NodeAllocation allocation : lastAvailableNodeActivities) { + allocation.transformToTree(); + allocation.setTimeStamp(timeStamp); + } + if (recordNextAvailableNode) { + recordNextAvailableNode = false; + } + } + + if (shouldRecordThisNode(nodeID)) { + recordingNodesAllocation.remove(nodeID); + completedNodeAllocations.put(nodeID, value); + stopRecordNodeUpdateActivities(nodeID); + } + } + } + + public boolean shouldRecordThisApp(ApplicationId applicationId) { + return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && appsAllocation.containsKey(applicationId); + } + + public boolean shouldRecordThisNode(NodeId nodeID) { + return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation + .containsKey(nodeID); + } + + private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { + List nodeAllocations = recordingNodesAllocation.get(nodeID); + NodeAllocation nodeAllocation; + // When this node has already stored allocation activities, get the last allocation for this node. + if (nodeAllocations.size() != 0) { + nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1); + // When final state in last allocation is not DEFAULT, it means last allocation has finished. Create a new allocation for this node, and add it to the allocation list. Return this new allocation. + // When final state in last allocation is DEFAULT, it means last allocation has not finished. Just get last allocation. + if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + } + // When this node has not stored allocation activities, create a new allocation for this node, and add it to the allocation list. Return this new allocation. + else { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + return nodeAllocation; + } + + public List getAllocations(String nodeId) { + if (nodeId == null) { + return lastAvailableNodeActivities; + } else { + return completedNodeAllocations.get(ConverterUtils.toNodeId(nodeId)); + } + } + + public void startRecordNodeUpdateActivities(String nodeId) { + if (nodeId == null) { + recordNextAvailableNode = true; + } else { + activeRecordedNodes.add(ConverterUtils.toNodeId(nodeId)); + } + } + + public void turnOnAppUpdate(ApplicationId applicationId, double maxTime) { + Date date = new Date(); + long startTS = date.getTime(); + long endTS = startTS + (long) (maxTime * 1000); + recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS); + } + + private void stopRecordNodeUpdateActivities(NodeId nodeId) { + activeRecordedNodes.remove(nodeId); + } + + private void turnOffAppUpdate(ApplicationId applicationId) { + recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); + } + + public List getAppAllocations(ApplicationId applicationId) { + List allocations = completedAppAllocations.get( + applicationId); + if (allocations != null) { + completedAppAllocations.remove(applicationId); + } + return allocations; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityNode.java new file mode 100644 index 0000000..434cf6b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityNode.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.ArrayList; +import java.util.List; + +/* + * It represents tree node in "NodeAllocation" tree structure. + * Each node may represent queue, application or container in allocation activity. + * Node may have children node if successfully allocated to next level. + */ +public class ActivityNode { + private String activityNodeName; + private String priority; + private ActivityState state; + private String diagnostic; + + private List childNode; + + public ActivityNode(String activityNodeName, String priority, + ActivityState state, String diagnostic) { + this.activityNodeName = activityNodeName; + this.priority = priority; + this.state = state; + this.diagnostic = diagnostic; + this.childNode = new ArrayList<>(); + } + + public String getName() { + return this.activityNodeName; + } + + public void addChild(ActivityNode node) { + childNode.add(node); + } + + public List getChildren() { + return this.childNode; + } + + public ActivityState getState() { + return this.state; + } + + public String getDiagnostic() { + return this.diagnostic; + } + + public String getPriority() { + return priority; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.activityNodeName + " "); + sb.append(this.priority + " "); + sb.append(this.state + " "); + if (!this.diagnostic.equals("")) + sb.append(this.diagnostic + "\n"); + sb.append("\n"); + for (ActivityNode child : childNode) { + sb.append(child.toString() + "\n"); + } + return sb.toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityState.java new file mode 100644 index 0000000..4e2f800 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActivityState.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +/* + * Collection of activity operation states. + */ +public enum ActivityState { + DEFAULT, //default state when adding a new activity in node allocation + ACCEPTED, + //container is allocated to sub-queues/applications or this queue/application + SKIPPED, + //queue or application voluntarily give up to use the resource OR nothing allocated + REJECTED, //container could not be allocated to sub-queues or this application + ALLOCATED, //successfully allocate a new non-reserved container +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivity.java new file mode 100644 index 0000000..8959044 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivity.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * It records an activity operation in allocation, + * which can be classified as queue, application or container activity. + * Other information include state, diagnostic, priority. + */ +public class AllocationActivity { + private String childName = null; + private String parentName = null; + private String priority = null; + private ActivityState state; + private String diagnostic; + private AllocationActivityType type = null; + + private static final Log LOG = LogFactory.getLog(AllocationActivity.class); + + public AllocationActivity(String parentName, String queueName, + String priority, ActivityState state, String diagnostic, + AllocationActivityType type) { + this.childName = queueName; + this.parentName = parentName; + this.priority = priority; + this.state = state; + this.diagnostic = diagnostic; + this.type = type; + } + + public ActivityNode createTreeNode() { + return new ActivityNode(this.childName, this.priority, this.state, + this.diagnostic); + } + + public String getParentName() { + return this.parentName; + } + + public String getName() { + return this.childName; + } + + public void changeState(ActivityState state, String diagnostic) { + this.state = state; + this.diagnostic = diagnostic; + } + + public AllocationActivityType getType() { + return this.type; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivityType.java new file mode 100644 index 0000000..d0a93b5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationActivityType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +/* + * Collection of types for activity operation. + */ +public enum AllocationActivityType { + queue, + app, + container +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationState.java new file mode 100644 index 0000000..2a54773 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AllocationState.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +/* + * Collection of allocation final states. + */ +public enum AllocationState { + DEFAULT, + SKIPPED, //queue or application voluntarily give up to use the resource OR nothing allocated + ALLOCATED, //successfully allocate a new non-reserved container + ALLOCATED_FROM_RESERVED //successfully allocate a new container from an existing reserved container +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppAllocation.java new file mode 100644 index 0000000..3d6404b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppAllocation.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; + +import java.util.ArrayList; +import java.util.List; + +/* + * It contains allocation information for one application within a period of time. + * Each application allocation may have several allocation attempts. + */ +public class AppAllocation { + private Priority priority = null; + private NodeId nodeId; + private ContainerId containerId = null; + private ActivityState appState = null; + private String queueName = null; + private List allocationAttempts; + private long timestamp; + + public AppAllocation(Priority priority, NodeId nodeId, String queueName) { + this.priority = priority; + this.nodeId = nodeId; + this.allocationAttempts = new ArrayList<>(); + this.queueName = queueName; + } + + public void updateAppContainerStateAndTime(ContainerId containerId, + ActivityState appState, long ts) { + this.timestamp = ts; + this.containerId = containerId; + this.appState = appState; + } + + public void addAppAllocationActivity(String containerId, String priority, + ActivityState state, String diagnostic) { + ActivityNode container = new ActivityNode(containerId, priority, state, + diagnostic); + this.allocationAttempts.add(container); + if (state == ActivityState.REJECTED) + this.appState = ActivityState.SKIPPED; + else + this.appState = state; + } + + public String getNodeId() { + return nodeId.toString(); + } + + public String getQueueName() { + return queueName; + } + + public ActivityState getAppState() { + return appState; + } + + public String getPriority() { + if (priority == null) + return null; + return priority.toString(); + } + + public String getContainerId() { + if (containerId == null) + return null; + return containerId.toString(); + } + + public long getTime() { + return this.timestamp; + } + + public List getAllocationAttempts() { + return allocationAttempts; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeAllocation.java new file mode 100644 index 0000000..5acd60e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeAllocation.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/* + * It contains allocation information for one allocation in a node heartbeat. + * Detailed allocation activities are first stored in "AllocationActivity" as operations, + * then transformed to a tree structure. + * Tree structure starts from root queue and ends in leaf queue, application or container allocation. + */ +public class NodeAllocation { + private NodeId nodeId; + private long timeStamp; + private ContainerId containerId = null; + private AllocationState containerState = AllocationState.DEFAULT; + private List allocationOperations; + + // For each activity, use its own nameId as index for future lookup when creating allocation activity tree. + private ConcurrentMap nameToActivity; + + private ActivityNode root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocation.class); + + public NodeAllocation(NodeId nodeId) { + this.nodeId = nodeId; + this.allocationOperations = new ArrayList<>(); + this.nameToActivity = new ConcurrentHashMap<>(); + } + + public void addAllocationActivity(String parentName, String childName, + String priority, ActivityState state, String diagnostic, + AllocationActivityType type) { + AllocationActivity allocate = new AllocationActivity(parentName, childName, + priority, state, diagnostic, type); + this.allocationOperations.add(allocate); + if (type == AllocationActivityType.queue + || type == AllocationActivityType.app) { + this.nameToActivity.put(childName, allocate); + } + } + + public void updateState(String name, + ActivityState state, String diagnostic) { + LOG.info(nameToActivity.size()); + nameToActivity.get(name).changeState(state, + diagnostic); + } + + public void updateContainerState(ContainerId containerId, + AllocationState containerState) { + this.containerId = containerId; + this.containerState = containerState; + } + + // In node allocation, transform each activity to a tree-like structure for frontend activity display. + // eg: root + // / \ + // a b + // / \ + // app1 app2 + // / \ + // CA1 CA2 + // CA means Container Attempt + public void transformToTree() { + ConcurrentMap relations = new ConcurrentHashMap<>(); + if (root == null) { + for (AllocationActivity allocationOperation : allocationOperations) { + if (root == null) { + root = allocationOperation.createTreeNode(); + relations.put(root.getName(), root); + } else { + AllocationActivityType type = allocationOperation.getType(); + String parentName = allocationOperation.getParentName(); + + ActivityNode queue = relations.get(parentName); + ActivityNode node = allocationOperation.createTreeNode(); + queue.addChild(node); + + if (type == AllocationActivityType.queue + || type == AllocationActivityType.app) { + relations.put(node.getName(), node); + } + } + } + } + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public long getTimeStamp() { + return this.timeStamp; + } + + public AllocationState getFinalAllocationState() { + return containerState; + } + + public String getContainerId() { + if (containerId == null) + return null; + return containerId.toString(); + } + + public ActivityNode getRoot() { + return root; + } + + public String getNodeId() { + return nodeId.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dc90c5b..49456f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -47,14 +47,18 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec; public abstract class AbstractCSQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); @@ -90,12 +94,16 @@ protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; - public AbstractCSQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) throws IOException { + protected ActivityManager activityManager; + + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old, ActivityManager activityManager) + throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.resourceCalculator = cs.getResourceCalculator(); + this.activityManager = activityManager; // must be called after parent and queueName is set this.metrics = @@ -687,4 +695,12 @@ public Resource getTotalKillableResource(String partition) { return csContext.getPreemptionManager().getKillableContainers(queueName, partition); } + + protected void updateActivityState(FiCaSchedulerNode node, + ActivityState updatedState, String updatedDiagnostic) { + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + activityManager.updateActivityState(node.getNodeID(), getQueueName(), + updatedState, updatedDiagnostic); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a70..99e91b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -92,22 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -307,6 +292,7 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.activityManager = new ActivityManager(); initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); @@ -523,7 +509,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) Map newQueues = new HashMap(); CSQueue newRoot = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); + newQueues, queues, noop); // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); @@ -640,7 +626,8 @@ static CSQueue parseQueue( } else { queue = new LeafQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + oldQueues.get(queueName), + ((CapacityScheduler) csContext).activityManager); // Used only for unit tests queue = hook.hook(queue); @@ -650,8 +637,9 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); + ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName), + ((CapacityScheduler) csContext).activityManager); // Used only for unit tests queue = hook.hook(parentQueue); @@ -802,7 +790,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority(), isAttemptRecovering); + application.getPriority(), isAttemptRecovering, activityManager); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); @@ -1255,6 +1243,17 @@ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { tmp.getAssignmentInformation().incrAllocations(); updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + activityManager.addActivity(node.getNodeID(), queue.getQueueName(), + reservedApplication.getApplicationId().toString(), + reservedApplication.getPriority().toString(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY, + AllocationActivityType.app); + activityManager.updateAllocationFinalState(node.getNodeID(), + reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } } } @@ -1364,7 +1363,9 @@ public void handle(SchedulerEvent event) { setLastNodeUpdateTime(Time.now()); nodeUpdate(node); if (!scheduleAsynchronously) { + activityManager.startNodeUpdateRecording(node.getNodeID()); allocateContainersToNode(getNode(node.getNodeID())); + activityManager.finishNodeUpdateRecording(node.getNodeID()); } } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6dcafec..72e9684 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,15 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -42,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -59,14 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -129,12 +116,13 @@ new HashMap<>(); @SuppressWarnings({ "unchecked", "rawtypes" }) - public LeafQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) throws IOException { - super(cs, queueName, parent, old); + public LeafQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old, ActivityManager activityManager) + throws IOException { + super(cs, queueName, parent, old, activityManager); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.activeUsersManager = new ActiveUsersManager(metrics); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -143,10 +131,15 @@ public LeafQueue(CapacitySchedulerContext cs, LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); } - + setupQueueConfigs(cs.getClusterResource()); } + public LeafQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) throws IOException { + this(cs, queueName, parent, old, null); + } + protected synchronized void setupQueueConfigs(Resource clusterResource) throws IOException { super.setupQueueConfigs(clusterResource); @@ -861,7 +854,35 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - + + private void recordActivity(FiCaSchedulerNode node, String parentName, + String childName, String priority, ActivityState state, String diagnostic, + AllocationActivityType type) { + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + activityManager.addActivity(node.getNodeID(), parentName, childName, + priority, state, diagnostic, type); + } + } + + private void recordActivity(FiCaSchedulerNode node, ActivityState state, + String diagnostic, AllocationActivityType type) { + recordActivity(node, getParent().getQueueName(), getQueueName(), + Priority.UNDEFINED.toString(), state, diagnostic, type); + } + + private void finishAppAllocationRecording(ApplicationId applicationId, + ContainerId containerId, ActivityState containerState) { + if (activityManager.shouldRecordThisApp(applicationId)) { + activityManager.finishAppAllocationRecording(applicationId, containerId, + containerState); + } + } + + private void finishAppAllocationRecording(ApplicationId applicationId, + ActivityState containerState) { + finishAppAllocationRecording(applicationId, null, containerState); + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, @@ -894,6 +915,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { + recordActivity(node, ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition(), AllocationActivityType.queue); return CSAssignment.NULL_ASSIGNMENT; } @@ -906,17 +930,37 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + recordActivity(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_MORE_RESOURCE, + AllocationActivityType.queue); return CSAssignment.NULL_ASSIGNMENT; } + recordActivity(node, ActivityState.DEFAULT, + ActivityDiagnosticConstant.EMPTY, AllocationActivityType.queue); + for (Iterator assignmentIterator = orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { FiCaSchedulerApp application = assignmentIterator.next(); + Date date = new Date(); + activityManager.startAppUpdateRecording(node.getNodeID(), + application.getApplicationId(), date.getTime(), getQueueName(), + application.getPriority()); // Check queue max-capacity limit if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { + recordActivity(node, getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.REJECTED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT, + AllocationActivityType.app); + updateActivityState(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + finishAppAllocationRecording(application.getApplicationId(), + ActivityState.REJECTED); + return CSAssignment.NULL_ASSIGNMENT; } @@ -929,9 +973,24 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, application, node.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); + recordActivity(node, getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.REJECTED, + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT, + AllocationActivityType.app); + updateActivityState(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + finishAppAllocationRecording(application.getApplicationId(), + ActivityState.REJECTED); + continue; } + recordActivity(node, getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.DEFAULT, + ActivityDiagnosticConstant.EMPTY, AllocationActivityType.app); + // Try to schedule CSAssignment assignment = application.assignContainers(clusterResource, node, @@ -970,13 +1029,28 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, incReservedResource(node.getPartition(), reservedRes); } + updateActivityState(node, ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + finishAppAllocationRecording(application.getApplicationId(), + assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + ActivityState.ACCEPTED); + // Done return assignment; } else if (assignment.getSkipped()) { + updateActivityState(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + finishAppAllocationRecording(application.getApplicationId(), + ActivityState.SKIPPED); application.updateNodeInfoForAMDiagnostics(node); } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications + updateActivityState(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + finishAppAllocationRecording(application.getApplicationId(), + ActivityState.SKIPPED); return CSAssignment.NULL_ASSIGNMENT; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6fcd6c1..ec1383d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -28,7 +28,9 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -37,18 +39,12 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -82,9 +78,10 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public ParentQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) throws IOException { - super(cs, queueName, parent, old); + public ParentQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old, ActivityManager activityManager) + throws IOException { + super(cs, queueName, parent, old, activityManager); this.scheduler = cs; this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); this.partitionQueueComparator = cs.getPartitionedQueueComparator(); @@ -99,14 +96,19 @@ public ParentQueue(CapacitySchedulerContext cs, "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - + this.childQueues = new TreeSet(nonPartitionedQueueComparator); - + setupQueueConfigs(cs.getClusterResource()); - LOG.info("Initialized parent-queue " + queueName + - " name=" + queueName + - ", fullname=" + getQueuePath()); + LOG.info("Initialized parent-queue " + queueName + + " name=" + queueName + + ", fullname=" + getQueuePath()); + } + + public ParentQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) throws IOException { + this(cs, queueName, parent, old, null); } synchronized void setupQueueConfigs(Resource clusterResource) @@ -381,6 +383,30 @@ private synchronized void removeApplication(ApplicationId applicationId, " #applications: " + getNumApplications()); } + private void recordActivity(FiCaSchedulerNode node, ActivityState state, + String diagnostic, AllocationActivityType type) { + String parentName = ""; + if (getParent() != null) + parentName = getParent().getQueueName(); + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + activityManager.addActivity(node.getNodeID(), parentName, getQueueName(), + Priority.UNDEFINED.toString(), state, diagnostic, type); + } + } + + private void updateAllocationFinalState(FiCaSchedulerNode node, + AllocationState containerState) { + updateAllocationFinalState(node, null, containerState); + } + + private void updateAllocationFinalState(FiCaSchedulerNode node, + ContainerId containerId, AllocationState containerState) { + if (activityManager.shouldRecordThisNode(node.getNodeID()) && rootQueue) { + activityManager.updateAllocationFinalState(node.getNodeID(), containerId, + containerState); + } + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, @@ -393,6 +419,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it is not able to access partition=" + node .getPartition()); } + + recordActivity(node, ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition(), AllocationActivityType.queue); + updateAllocationFinalState(node, AllocationState.SKIPPED); + return CSAssignment.NULL_ASSIGNMENT; } @@ -405,6 +437,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + + recordActivity(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_MORE_RESOURCE, + AllocationActivityType.queue); + updateAllocationFinalState(node, AllocationState.SKIPPED); + return CSAssignment.NULL_ASSIGNMENT; } @@ -424,10 +462,20 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { + + recordActivity(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT, + AllocationActivityType.queue); + updateAllocationFinalState(node, AllocationState.SKIPPED); + break; } - + // Schedule + recordActivity(node, ActivityState.DEFAULT, + ActivityDiagnosticConstant.EMPTY, + AllocationActivityType.queue); + CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node, resourceLimits, schedulingMode); @@ -437,6 +485,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (Resources.greaterThan( resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { + + updateAllocationFinalState(node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); @@ -474,6 +528,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, " cluster=" + clusterResource); } else { + updateAllocationFinalState(node, AllocationState.SKIPPED); break; } @@ -585,8 +640,8 @@ private synchronized CSAssignment assignContainersToChildQueues( // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition()); - - assignment = childQueue.assignContainers(cluster, node, + + assignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -598,6 +653,9 @@ private synchronized CSAssignment assignContainersToChildQueues( if (Resources.greaterThan( resourceCalculator, cluster, assignment.getResource(), Resources.none())) { + updateActivityState(node, ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + // Only update childQueues when we doing non-partitioned node // allocation. if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { @@ -612,6 +670,8 @@ private synchronized CSAssignment assignContainersToChildQueues( } break; } + updateActivityState(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); } return assignment; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index afac235..a081bdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AllocationActivityType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -43,23 +47,31 @@ FiCaSchedulerApp application; final ResourceCalculator rc; final RMContext rmContext; - + ActivityManager activityManager; + public AbstractContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public AbstractContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext, + ActivityManager activityManager) { this.application = application; this.rc = rc; this.rmContext = rmContext; + this.activityManager = activityManager; } protected CSAssignment getCSAssignmentFromAllocateResult( Resource clusterResource, ContainerAllocation result, - RMContainer rmContainer) { + RMContainer rmContainer, FiCaSchedulerNode node) { // Handle skipped boolean skipped = (result.getAllocationState() == AllocationState.APP_SKIPPED); CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); - + // Handle excess reservation assignment.setExcessReservation(result.getContainerToBeUnreserved()); @@ -103,21 +115,39 @@ protected CSAssignment getCSAssignmentFromAllocateResult( assignment.getAssignmentInformation().incrAllocations(); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), allocatedResource); - + if (rmContainer != null) { assignment.setFulfilledReservation(true); } + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + activityManager.addActivity(node.getNodeID(), + application.getApplicationId().toString(), + updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), + ActivityState.ALLOCATED, ActivityDiagnosticConstant.EMPTY, + AllocationActivityType.container); + activityManager.updateActivityState(node.getNodeID(), + application.getApplicationId().toString(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + } + if (activityManager.shouldRecordThisApp( + application.getApplicationId())) { + activityManager.addAppActivity(application.getApplicationId(), + updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), + ActivityState.ALLOCATED, ActivityDiagnosticConstant.EMPTY); + } } assignment.setContainersToKill(result.getToKillContainers()); } - + return assignment; } - + /** * allocate needs to handle following stuffs: - * + * *
    *
  • Select request: Select a request to allocate. E.g. select a resource * request based on requirement/priority/locality.
  • diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 3be8e0e..2001300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -36,12 +37,17 @@ public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { + this(application, rc, rmContext, null); + } + + public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, + RMContext rmContext, ActivityManager activityManager) { super(application, rc, rmContext); - increaseContainerAllocator = - new IncreaseContainerAllocator(application, rc, rmContext); - regularContainerAllocator = - new RegularContainerAllocator(application, rc, rmContext); + increaseContainerAllocator = new IncreaseContainerAllocator(application, rc, + rmContext); + regularContainerAllocator = new RegularContainerAllocator(application, rc, + rmContext, activityManager); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index aae5292..7d3817c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -29,10 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -53,10 +50,11 @@ private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; - + public RegularContainerAllocator(FiCaSchedulerApp application, - ResourceCalculator rc, RMContext rmContext) { - super(application, rc, rmContext); + ResourceCalculator rc, RMContext rmContext, + ActivityManager activityManager) { + super(application, rc, rmContext, activityManager); } private boolean checkHeadroom(Resource clusterResource, @@ -77,6 +75,37 @@ private boolean checkHeadroom(Resource clusterResource, required); } + private void recordAndUpdateSkippedActivity(FiCaSchedulerNode node, + ActivityState state, String diagnostic, AllocationActivityType type) { + recordAndUpdateActivity(node, Priority.UNDEFINED.toString(), state, + diagnostic, type, state); + } + + private void recordAndUpdateSkippedActivity(FiCaSchedulerNode node, + String priority, ActivityState state, String diagnostic, + AllocationActivityType type) { + recordAndUpdateActivity(node, priority, state, diagnostic, type, state); + } + + private void recordAndUpdateActivity(FiCaSchedulerNode node, String priority, + ActivityState state, String diagnostic, AllocationActivityType type, + ActivityState appState) { + if (activityManager.shouldRecordThisNode(node.getNodeID())) { + // Add application-container activity into specific node allocation. Under this condition, it fails to allocate a container, so containerId is null. + activityManager.addActivity(node.getNodeID(), + application.getApplicationId().toString(), null, priority, state, + diagnostic, type); + // Update state and diagnostic of existed application activity in node allocation. + activityManager.updateActivityState(node.getNodeID(), + application.getApplicationId().toString(), state, + ActivityDiagnosticConstant.EMPTY); + } + // Add application-container activity into specific application allocation. Under this condition, it fails to allocate a container to this application, so containerId is null. + if (activityManager.shouldRecordThisApp(application.getApplicationId())) { + activityManager.addAppActivity(application.getApplicationId(), null, + priority, appState, diagnostic); + } + } private ContainerAllocation preCheckForNewContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, @@ -84,12 +113,19 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE, + AllocationActivityType.container); return ContainerAllocation.APP_SKIPPED; } ResourceRequest anyRequest = application.getResourceRequest(priority, ResourceRequest.ANY); if (null == anyRequest) { + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -98,6 +134,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, // Do we need containers at this 'priority'? if (application.getTotalRequiredResources(priority) <= 0) { + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -112,6 +151,10 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, } application.updateAppSkipNodeDiagnostics( "Skipping assigning to Node in Ignore Exclusivity mode. "); + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE, + AllocationActivityType.container); return ContainerAllocation.APP_SKIPPED; } } @@ -122,6 +165,9 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( anyRequest.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -130,6 +176,10 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("doesn't need containers based on reservation algo!"); } + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } } @@ -139,6 +189,10 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, LOG.debug("cannot allocate required resource=" + required + " because of headroom"); } + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM, + AllocationActivityType.container); return ContainerAllocation.QUEUE_SKIPPED; } @@ -170,7 +224,10 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource, + missedNonPartitionedRequestSchedulingOpportunity + " required=" + rmContext.getScheduler().getNumClusterNodes()); } - + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST, + AllocationActivityType.container); return ContainerAllocation.APP_SKIPPED; } } @@ -294,6 +351,10 @@ private ContainerAllocation assignNodeLocalContainers( } // Skip node-local request, go to rack-local request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -308,6 +369,10 @@ private ContainerAllocation assignRackLocalContainers( } // Skip rack-local request, go to off-switch request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -323,6 +388,9 @@ private ContainerAllocation assignOffSwitchContainers( application.updateAppSkipNodeDiagnostics( CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY); + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.SKIP_AM_ALLOCATION, + AllocationActivityType.container); return ContainerAllocation.APP_SKIPPED; } @@ -354,6 +422,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, application.getResourceRequest(priority, node.getRackName()); if (rackLocalResourceRequest != null) { if (!rackLocalResourceRequest.getRelaxLocality()) { + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -377,6 +448,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, application.getResourceRequest(priority, ResourceRequest.ANY); if (offSwitchResourceRequest != null) { if (!offSwitchResourceRequest.getRelaxLocality()) { + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } if (requestType != NodeType.NODE_LOCAL @@ -398,7 +472,9 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, return allocation; } - + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.PRIORITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.PRIORITY_SKIPPED; } @@ -421,6 +497,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. + recordAndUpdateActivity(node, priority.toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS, + AllocationActivityType.container, ActivityState.REJECTED); return new ContainerAllocation(rmContainer, null, AllocationState.LOCALITY_SKIPPED); } @@ -435,6 +514,10 @@ private ContainerAllocation assignContainer(Resource clusterResource, + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); // Skip this locality request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } @@ -513,6 +596,10 @@ private ContainerAllocation assignContainer(Resource clusterResource, // continue. if (null == unreservedContainer) { // Skip the locality request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.LOCALITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -537,6 +624,10 @@ private ContainerAllocation assignContainer(Resource clusterResource, LOG.debug("we needed to unreserve to be able to allocate"); } // Skip the locality request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, + ActivityDiagnosticConstant.LOCALITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -549,6 +640,9 @@ private ContainerAllocation assignContainer(Resource clusterResource, return result; } // Skip the locality request + recordAndUpdateSkippedActivity(node, priority.toString(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.LOCALITY_SKIPPED, + AllocationActivityType.container); return ContainerAllocation.LOCALITY_SKIPPED; } } @@ -623,6 +717,9 @@ private ContainerAllocation handleNewContainerAllocation( ContainerAllocation ret = new ContainerAllocation(allocationResult.containerToBeUnreserved, null, AllocationState.APP_SKIPPED); + recordAndUpdateActivity(node, priority.toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + AllocationActivityType.container, ActivityState.REJECTED); return ret; } @@ -649,6 +746,9 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, application .updateAppSkipNodeDiagnostics("Scheduling of container failed. "); LOG.warn("Couldn't get container for allocation!"); + recordAndUpdateActivity(node, priority.toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER, + AllocationActivityType.container, ActivityState.REJECTED); return ContainerAllocation.APP_SKIPPED; } @@ -721,6 +821,9 @@ public CSAssignment assignContainers(Resource clusterResource, + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-label=" + node.getPartition()); } + recordAndUpdateSkippedActivity(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.DO_NOT_NEED_RESOURCE, + AllocationActivityType.container); return CSAssignment.SKIP_ASSIGNMENT; } @@ -735,18 +838,21 @@ public CSAssignment assignContainers(Resource clusterResource, continue; } return getCSAssignmentFromAllocateResult(clusterResource, result, - null); + null, node); } // We will reach here if we skipped all priorities of the app, so we will // skip the app. + recordAndUpdateSkippedActivity(node, ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES, + AllocationActivityType.container); return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = allocate(clusterResource, node, schedulingMode, resourceLimits, reservedContainer.getReservedPriority(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, - reservedContainer); + reservedContainer, node); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8009580..81e9f1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -48,13 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; @@ -107,8 +101,16 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { + this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + appPriority, isAttemptRecovering, null); + } + + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, + ActivityManager activityManager) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); - + RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); Resource amResource; @@ -138,8 +140,9 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, if (scheduler.getResourceCalculator() != null) { rc = scheduler.getResourceCalculator(); } - - containerAllocator = new ContainerAllocator(this, rc, rmContext); + + containerAllocator = new ContainerAllocator(this, rc, rmContext, + activityManager); if (scheduler instanceof CapacityScheduler) { capacitySchedulerContext = (CapacitySchedulerContext) scheduler; @@ -188,7 +191,7 @@ public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, return null; } - // Required sanity check - AM can call 'allocate' to update resource + // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check if (getTotalRequiredResources(priority) <= 0) { return null; @@ -489,7 +492,7 @@ public RMContainer findNodeToUnreserve(Resource clusterResource, public LeafQueue getCSLeafQueue() { return (LeafQueue)queue; } - + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, RMContainer reservedContainer) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 878bf65..0e2351a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -129,9 +129,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; @@ -175,6 +181,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.WebServices; @@ -570,6 +577,85 @@ public AppsInfo getApps(@Context HttpServletRequest hsr, } @GET + @Path("/scheduler/activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, + @QueryParam("nodeId") String nodeId) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivityManager activityManager = + abstractYarnScheduler.getActivityManager(); + if (null == activityManager) { + return null; + } + + List allocations; + ActivitiesInfo info; + + if (nodeId != null && !nodeId.contains(":")) { + List nodeList = + abstractYarnScheduler.getNodeTracker().getAllNodes(); + for (FiCaSchedulerNode node : nodeList) { + if (node.getRMNode().getHostName().equals(nodeId)) { + nodeId = node.getNodeID().toString(); + break; + } + } + } + + activityManager.startRecordNodeUpdateActivities(nodeId); + allocations = activityManager.getAllocations(nodeId); + info = new ActivitiesInfo(allocations, nodeId); + return info; + } + + return null; + } + + @GET + @Path("/scheduler/app-activities") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, + @QueryParam("appId") String appId, @QueryParam("maxTime") String time) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + + if (scheduler instanceof AbstractYarnScheduler) { + AbstractYarnScheduler abstractYarnScheduler = + (AbstractYarnScheduler) scheduler; + + ActivityManager activityManager = + abstractYarnScheduler.getActivityManager(); + if (null == activityManager) { + return null; + } + + List allocations; + AppActivitiesInfo info; + double maxTime = 3.0; + + if (time != null) { + if (time.contains(".")) + maxTime = Double.parseDouble(time); + else + maxTime = Double.parseDouble(time + ".0"); + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + activityManager.turnOnAppUpdate(applicationId, + maxTime); + allocations = activityManager.getAppAllocations(applicationId); + info = new AppActivitiesInfo(allocations, applicationId); + + return info; + } + return null; + } + + @GET @Path("/appstatistics") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) public ApplicationStatisticsInfo getAppStatistics( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java new file mode 100644 index 0000000..67bb960 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; +import java.util.ArrayList; + +/* + * DAO object to display node allocation activity. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivitiesInfo { + protected String nodeId; + protected String timeStamp; + protected String diagnostic = null; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class); + + public ActivitiesInfo() { + } + + public ActivitiesInfo(List nodeAllocations, String nodeId) { + this.nodeId = nodeId; + this.allocations = new ArrayList<>(); + + if (nodeAllocations == null) { + diagnostic = (nodeId != null ? + "waiting for display" : + "waiting for next allocation"); + } else { + if (nodeAllocations.size() == 0) { + diagnostic = "do not have available resources"; + } else { + this.nodeId = nodeAllocations.get(0).getNodeId(); + this.timeStamp = String.valueOf(nodeAllocations.get(0).getTimeStamp()); + for (int i = 0; i < nodeAllocations.size(); i++) { + NodeAllocation nodeAllocation = nodeAllocations.get(i); + NodeAllocationInfo allocationInfo = new NodeAllocationInfo( + nodeAllocation); + this.allocations.add(allocationInfo); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java new file mode 100644 index 0000000..ac1fcf8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivityNodeInfo.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityNode; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display node information in allocation tree. + * It corresponds to "ActivityNode" class. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class ActivityNodeInfo { + protected String name; // The name for activity node + protected String priority; + protected String allocationState; + protected String diagnostic; + + protected List children; + + ActivityNodeInfo() { + } + + ActivityNodeInfo(ActivityNode node) { + this.name = node.getName(); + this.priority = node.getPriority(); + this.allocationState = node.getState().name(); + this.diagnostic = node.getDiagnostic(); + this.children = new ArrayList<>(); + + for (ActivityNode child : node.getChildren()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(child); + this.children.add(containerInfo); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java new file mode 100644 index 0000000..8e644f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/* + * DAO object to display application activity. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppActivitiesInfo { + protected String applicationId; + protected String diagnostic; + protected String timeStamp; + protected List allocations; + + private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class); + + public AppActivitiesInfo() { + } + + public AppActivitiesInfo(List appAllocations, + ApplicationId applicationId) { + this.applicationId = applicationId.toString(); + this.allocations = new ArrayList<>(); + + if (appAllocations == null) { + diagnostic = "waiting for display"; + Date date = new Date(); + timeStamp = String.valueOf(date.getTime()); + } else { + for (int i = 0; i < appAllocations.size(); i++) { + AppAllocation appAllocation = appAllocations.get(i); + AppAllocationInfo appAllocationInfo = new AppAllocationInfo( + appAllocation); + this.allocations.add(appAllocationInfo); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java new file mode 100644 index 0000000..5e3b9d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActivityNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/* + * DAO object to display application allocation detailed information. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class AppAllocationInfo { + protected String nodeId; + protected String queueName; + protected String priority; + protected String allocatedContainerId; + protected String allocationState; + protected String timeStamp; + protected List allocationAttempt; + + private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class); + + AppAllocationInfo() { + } + + AppAllocationInfo(AppAllocation allocation) { + this.allocationAttempt = new ArrayList<>(); + + this.nodeId = allocation.getNodeId(); + this.queueName = allocation.getQueueName(); + this.priority = allocation.getPriority(); + this.allocatedContainerId = allocation.getContainerId(); + this.allocationState = allocation.getAppState().name(); + this.timeStamp = String.valueOf(allocation.getTime()); + + for (ActivityNode attempt : allocation.getAllocationAttempts()) { + ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt); + this.allocationAttempt.add(containerInfo); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java new file mode 100644 index 0000000..7ca1618 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeAllocationInfo.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeAllocation; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/* + * DAO object to display each node allocation in node heartbeat. + */ +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class NodeAllocationInfo { + protected String allocatedContainerId; + protected String finalAllocationState; + protected ActivityNodeInfo root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class); + + NodeAllocationInfo() { + } + + NodeAllocationInfo(NodeAllocation allocation) { + this.allocatedContainerId = allocation.getContainerId(); + this.finalAllocationState = allocation.getFinalAllocationState().name(); + + root = new ActivityNodeInfo(allocation.getRoot()); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index 649d719..6177bdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -24,15 +24,24 @@ import static org.junit.Assert.fail; import java.io.StringReader; +import java.util.ArrayList; import javax.ws.rs.core.MediaType; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -66,6 +75,9 @@ private static CapacitySchedulerConfiguration csConf; private static YarnConfiguration conf; + private static final Log LOG = LogFactory.getLog( + TestRMWebServicesCapacitySched.class); + private class QueueInfo { float capacity; float usedCapacity; @@ -550,6 +562,176 @@ private JSONObject getSubQueue(JSONObject queue, String subQueue) return null; } + @Test public void testActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("nodeId", "127.0.0.1"); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + LOG.info("====First: " + json.toString()); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + LOG.info("====second: " + json.toString()); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "finalAllocationState", "ALLOCATED"); + + verifyNumberOfNodes(allocations, 6); + + nm.nodeHeartbeat(true); + Thread.sleep(1000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + LOG.info("====third: " + json.toString()); + } + finally { + rm.stop(); + } + } + + private void verifyNumberOfNodes(JSONObject allocation, int realValue) + throws Exception { + if (allocation.isNull("root")) { + assertEquals("State of allocation is wrong", 0, realValue); + } else { + assertEquals("State of allocation is wrong", + 1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue); + } + } + + private int getNumberOfNodes(JSONObject allocation) throws Exception { + if (!allocation.isNull("children")) { + Object object = allocation.get("children"); + if (object.getClass() == JSONObject.class) { + return 1 + getNumberOfNodes((JSONObject) object); + } else { + int count = 0; + for (int i = 0; i < ((JSONArray) object).length(); i++) { + count += (1 + getNumberOfNodes( + ((JSONArray) object).getJSONObject(i))); + } + return count; + } + } else { + return 0; + } + } + + private void verifyStateOfAllocations(JSONObject allocation, + String nameToCheck, String realState) throws Exception { + assertEquals("State of allocation is wrong", allocation.get(nameToCheck), + realState); + } + + private void verifyNumberOfAllocations(JSONObject json, int realValue) + throws Exception { + Object object = json.get("allocations"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations is wrong", + ((JSONArray) object).length(), realValue); + } + } + + private void verifyNumberOfAllocationAttempts(JSONObject allocation, + int realValue) throws Exception { + if (allocation.isNull("allocationAttempt")) { + assertEquals("Number of allocation attempts is wrong", 0, realValue); + } else { + Object object = allocation.get("allocationAttempt"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations attempts is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations attempts is wrong", + ((JSONArray) object).length(), realValue); + } + } + } + + @Test public void testAppActivityJSON() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); + + //Get JSON + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("appId", app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + JSONObject json = response.getEntity(JSONObject.class); + LOG.info("====First: " + json.toString()); + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + //Get JSON + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + LOG.info("====second: " + json.toString()); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED"); + + verifyNumberOfAllocationAttempts(allocations, 1); + + nm.nodeHeartbeat(true); + Thread.sleep(5000); + + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/app-activities").queryParams(params).accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); + json = response.getEntity(JSONObject.class); + LOG.info("====third: " + json.toString()); + } + finally { + rm.stop(); + } + } + @Test public void testPerUserResourcesJSON() throws Exception { //Start RM so that it accepts app submissions