diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 9d3080e1adf..46ca4bd090c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -63,14 +63,14 @@ public static void recordRejectedAppActivityFromLeafQueue( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic) { - String type = "app"; - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - recordActivity(activitiesManager, node, application.getQueueName(), + if (activitiesManager.shouldRecordThisNode(nodeId)) { + recordActivity(activitiesManager, nodeId, application.getQueueName(), application.getApplicationId().toString(), priority, - ActivityState.REJECTED, diagnostic, type); + ActivityState.REJECTED, diagnostic, "app"); } finishSkippedAppAllocationRecording(activitiesManager, application.getApplicationId(), ActivityState.REJECTED, diagnostic); @@ -85,18 +85,19 @@ public static void recordAppActivityWithoutAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic, ActivityState appState) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getApplicationId().toString(), null, priority.toString(), ActivityState.SKIPPED, diagnostic, type); type = "app"; // Add queue-application activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.SKIPPED, @@ -122,20 +123,21 @@ public static void recordAppActivityWithAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, RMContainer updatedContainer, ActivityState activityState) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getApplicationId().toString(), updatedContainer.getContainer().toString(), updatedContainer.getContainer().getPriority().toString(), activityState, ActivityDiagnosticConstant.EMPTY, type); type = "app"; // Add queue-application activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.ACCEPTED, @@ -161,11 +163,12 @@ public static void startAppAllocationRecording( ActivitiesManager activitiesManager, FiCaSchedulerNode node, long currentTime, SchedulerApplicationAttempt application) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } activitiesManager - .startAppAllocationRecording(node.getNodeID(), currentTime, + .startAppAllocationRecording(nodeId, currentTime, application); } @@ -211,11 +214,12 @@ public static void finishSkippedAppAllocationRecording( public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - recordActivity(activitiesManager, node, parentQueueName, queueName, + if (activitiesManager.shouldRecordThisNode(nodeId)) { + recordActivity(activitiesManager, nodeId, parentQueueName, queueName, null, state, diagnostic, null); } } @@ -243,11 +247,12 @@ public static void finishSkippedNodeAllocation( public static void finishAllocatedNodeAllocation( ActivitiesManager activitiesManager, SchedulerNode node, ContainerId containerId, AllocationState containerState) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - activitiesManager.updateAllocationFinalState(node.getNodeID(), + if (activitiesManager.shouldRecordThisNode(nodeId)) { + activitiesManager.updateAllocationFinalState(nodeId, containerId, containerState); } } @@ -277,12 +282,16 @@ public static void startNodeUpdateRecording( // Add queue, application or container activity into specific node allocation. private static void recordActivity(ActivitiesManager activitiesManager, - SchedulerNode node, String parentName, String childName, + NodeId nodeId, String parentName, String childName, Priority priority, ActivityState state, String diagnostic, String type) { - - activitiesManager.addSchedulingActivityForNode(node, parentName, + activitiesManager.addSchedulingActivityForNode(nodeId, parentName, childName, priority != null ? priority.toString() : null, state, diagnostic, type); + } + private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager, + SchedulerNode node) { + return activitiesManager == null ? null : + activitiesManager.getRecordingNodeId(node); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index c710ffc3991..740e9743406 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.service.AbstractService; @@ -46,8 +47,13 @@ public class ActivitiesManager extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(ActivitiesManager.class); - private ConcurrentMap> recordingNodesAllocation; - private ConcurrentMap> completedNodeAllocations; + // An empty node ID, we use this variable as a placeholder + // in the activity records when recording multiple nodes assignments. + public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0); + private ThreadLocal>> + recordingNodesAllocation; + @VisibleForTesting + ConcurrentMap> completedNodeAllocations; private Set activeRecordedNodes; private ConcurrentMap recordingAppActivitiesUntilSpecifiedTime; @@ -63,7 +69,7 @@ public ActivitiesManager(RMContext rmContext) { super(ActivitiesManager.class.getName()); - recordingNodesAllocation = new ConcurrentHashMap<>(); + recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap()); completedNodeAllocations = new ConcurrentHashMap<>(); appsAllocation = new ConcurrentHashMap<>(); completedAppAllocations = new ConcurrentHashMap<>(); @@ -173,9 +179,11 @@ void startNodeUpdateRecording(NodeId nodeID) { if (recordNextAvailableNode) { recordNextNodeUpdateActivities(nodeID.toString()); } - if (activeRecordedNodes.contains(nodeID)) { + // Removing from activeRecordedNodes immediately is to ensure that + // activities will be recorded just once in multiple threads. + if (activeRecordedNodes.remove(nodeID)) { List nodeAllocation = new ArrayList<>(); - recordingNodesAllocation.put(nodeID, nodeAllocation); + recordingNodesAllocation.get().put(nodeID, nodeAllocation); } } @@ -199,12 +207,11 @@ void startAppAllocationRecording(NodeId nodeID, long currTS, } // Add queue, application or container activity into specific node allocation. - void addSchedulingActivityForNode(SchedulerNode node, String parentName, + void addSchedulingActivityForNode(NodeId nodeId, String parentName, String childName, String priority, ActivityState state, String diagnostic, String type) { - if (shouldRecordThisNode(node.getNodeID())) { - NodeAllocation nodeAllocation = getCurrentNodeAllocation( - node.getNodeID()); + if (shouldRecordThisNode(nodeId)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId); nodeAllocation.addAllocationActivity(parentName, childName, priority, state, diagnostic, type); } @@ -262,7 +269,7 @@ void finishAppAllocationRecording(ApplicationId applicationId, } void finishNodeUpdateRecording(NodeId nodeID) { - List value = recordingNodesAllocation.get(nodeID); + List value = recordingNodesAllocation.get().get(nodeID); long timeStamp = SystemClock.getInstance().getTime(); if (value != null) { @@ -278,9 +285,8 @@ void finishNodeUpdateRecording(NodeId nodeID) { } if (shouldRecordThisNode(nodeID)) { - recordingNodesAllocation.remove(nodeID); + recordingNodesAllocation.get().remove(nodeID); completedNodeAllocations.put(nodeID, value); - stopRecordNodeUpdateActivities(nodeID); } } } @@ -291,12 +297,15 @@ boolean shouldRecordThisApp(ApplicationId applicationId) { } boolean shouldRecordThisNode(NodeId nodeID) { - return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation + return isRecordingMultiNodes() || recordingNodesAllocation.get() .containsKey(nodeID); } private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { - List nodeAllocations = recordingNodesAllocation.get(nodeID); + NodeId recordingKey = + isRecordingMultiNodes() ? EMPTY_NODE_ID : nodeID; + List nodeAllocations = + recordingNodesAllocation.get().get(recordingKey); NodeAllocation nodeAllocation; // When this node has already stored allocation activities, get the // last allocation for this node. @@ -323,11 +332,29 @@ private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { return nodeAllocation; } - private void stopRecordNodeUpdateActivities(NodeId nodeId) { - activeRecordedNodes.remove(nodeId); - } - private void turnOffActivityMonitoringForApp(ApplicationId applicationId) { recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); } + + public boolean isRecordingMultiNodes() { + return recordingNodesAllocation.get().containsKey(EMPTY_NODE_ID); + } + + /** + * Get recording node id: + * 1. node id of the input node if it is not null. + * 2. EMPTY_NODE_ID if input node is null and activities manager is + * recording multi-nodes. + * 3. null otherwise. + * @param node - input node + * @return recording nodeId + */ + public NodeId getRecordingNodeId(SchedulerNode node) { + if (node != null) { + return node.getNodeID(); + } else if (isRecordingMultiNodes()) { + return ActivitiesManager.EMPTY_NODE_ID; + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4baf4053854..b8fdd423b9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1297,17 +1297,12 @@ protected void nodeUpdate(RMNode rmNode) { if (!scheduleAsynchronously) { writeLock.lock(); try { - ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, - rmNode.getNodeID()); - // reset allocation and reservation stats before we start doing any // work updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(), CSAssignment.NULL_ASSIGNMENT); allocateContainersToNode(rmNode.getNodeID(), true); - ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, - rmNode.getNodeID()); } finally { writeLock.unlock(); } @@ -1706,10 +1701,18 @@ CSAssignment allocateContainersToNode( // nodes. CSAssignment assignment; if (!multiNodePlacementEnabled) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + node.getNodeID()); assignment = allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + node.getNodeID()); } else{ + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + ActivitiesManager.EMPTY_NODE_ID); assignment = allocateContainersOnMultiNodes(candidates); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + ActivitiesManager.EMPTY_NODE_ID); } if (assignment != null && assignment.getAssignmentInformation() != null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java new file mode 100644 index 00000000000..5216a216848 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.yarn.util.resource.Resources; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; + +/** + * Test class for {@link ActivitiesManager}. + */ +public class TestActivitiesManager { + + private final static int NUM_NODES = 5; + + private final static int NUM_APPS = 5; + + private final static int NUM_THREADS = 5; + + private RMContext rmContext; + + private TestingActivitiesManager activitiesManager; + + private List apps; + + private List nodes; + + private ThreadPoolExecutor threadPoolExecutor; + + @Before + public void setup() { + rmContext = Mockito.mock(RMContext.class); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getMinimumResourceCapability()) + .thenReturn(Resources.none()); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + LeafQueue mockQueue = Mockito.mock(LeafQueue.class); + Map rmApps = new ConcurrentHashMap<>(); + Mockito.doReturn(rmApps).when(rmContext).getRMApps(); + apps = new ArrayList<>(); + for (int i = 0; i < NUM_APPS; i++) { + ApplicationAttemptId appAttemptId = + TestUtils.getMockApplicationAttemptId(i, 0); + RMApp mockApp = Mockito.mock(RMApp.class); + Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp) + .getApplicationId(); + rmApps.put(appAttemptId.getApplicationId(), mockApp); + FiCaSchedulerApp app = + new FiCaSchedulerApp(appAttemptId, "user", mockQueue, + mock(ActiveUsersManager.class), rmContext); + apps.add(app); + } + nodes = new ArrayList<>(); + for (int i = 0; i < NUM_NODES; i++) { + nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240)); + } + activitiesManager = new TestingActivitiesManager(rmContext); + threadPoolExecutor = + new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + + /** + * Test recording activities belong to different nodes in multiple threads, + * these threads can run without interference and one activity + * should be recorded by every thread. + */ + @Test + public void testRecordingDifferentNodeActivitiesInMultiThreads() + throws Exception { + Random rand = new Random(); + List> futures = new ArrayList<>(); + for (SchedulerNode node : nodes) { + Callable task = () -> { + SchedulerApplicationAttempt randomApp = + apps.get(rand.nextInt(NUM_APPS)); + // start recording activities for random node + activitiesManager.recordNextNodeUpdateActivities( + node.getNodeID().toString()); + // generate node/app activities + ActivitiesLogger.NODE + .startNodeUpdateRecording(activitiesManager, node.getNodeID()); + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + randomApp, Priority.newInstance(0), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + ActivityState.REJECTED); + ActivitiesLogger.NODE + .finishNodeUpdateRecording(activitiesManager, node.getNodeID()); + return null; + }; + futures.add(threadPoolExecutor.submit(task)); + } + for (Future future : futures) { + future.get(); + } + // Check activities for all nodes should be recorded and every node should + // have only one allocation information. + Assert.assertEquals(NUM_NODES, + activitiesManager.historyNodeAllocations.size()); + for (List> nodeAllocationsForThisNode : + activitiesManager.historyNodeAllocations.values()) { + Assert.assertEquals(1, nodeAllocationsForThisNode.size()); + Assert.assertEquals(1, nodeAllocationsForThisNode.get(0).size()); + } + } + + /** + * Test recording activities for multi-nodes assignment in multiple threads, + * only one activity info should be recorded by one of these threads. + */ + @Test + public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads() + throws Exception { + Random rand = new Random(); + // start recording activities for multi-nodes + activitiesManager.recordNextNodeUpdateActivities( + ActivitiesManager.EMPTY_NODE_ID.toString()); + List> futures = new ArrayList<>(); + // generate node/app activities + for (SchedulerNode node : nodes) { + Callable task = () -> { + SchedulerApplicationAttempt randomApp = + apps.get(rand.nextInt(NUM_APPS)); + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + ActivitiesManager.EMPTY_NODE_ID); + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + randomApp, Priority.newInstance(0), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + ActivityState.REJECTED); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + ActivitiesManager.EMPTY_NODE_ID); + return null; + }; + futures.add(threadPoolExecutor.submit(task)); + } + for (Future future : futures) { + future.get(); + } + // Check activities for multi-nodes should be recorded only once + Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size()); + } + + /** + * Testing activities manager which can record all history information about + * node allocations. + */ + public class TestingActivitiesManager extends ActivitiesManager { + + private Map>> historyNodeAllocations = + new ConcurrentHashMap<>(); + + public TestingActivitiesManager(RMContext rmContext) { + super(rmContext); + super.completedNodeAllocations = Mockito.spy(new ConcurrentHashMap<>()); + Mockito.doAnswer((invocationOnMock) -> { + NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0]; + List nodeAllocations = + (List) invocationOnMock.getArguments()[1]; + List> historyAllocationsForThisNode = + historyNodeAllocations.get(nodeId); + if (historyAllocationsForThisNode == null) { + historyAllocationsForThisNode = new ArrayList<>(); + historyNodeAllocations.put(nodeId, historyAllocationsForThisNode); + } + historyAllocationsForThisNode.add(nodeAllocations); + return null; + }).when(completedNodeAllocations).put(any(NodeId.class), + any(List.class)); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index 91b92de9f43..932f58d3093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -95,16 +95,12 @@ public void testAssignMultipleContainersPerNodeHeartbeat() response.getType().toString()); json = response.getEntity(JSONObject.class); - verifyNumberOfAllocations(json, 11); - - JSONArray allocations = json.getJSONArray("allocations"); - for (int i = 0; i < allocations.length(); i++) { - if (i != allocations.length() - 1) { - verifyStateOfAllocations(allocations.getJSONObject(i), - "finalAllocationState", "ALLOCATED"); - verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1"); - } - } + // Collection logic of scheduler activities changed after YARN-9313, + // only one allocation should be recorded for all scenarios. + verifyNumberOfAllocations(json, 1); + verifyStateOfAllocations(json.getJSONObject("allocations"), + "finalAllocationState", "ALLOCATED"); + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1"); } finally { rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java new file mode 100644 index 00000000000..724d592d6fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.google.inject.Guice; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.http.JettyUtils; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for scheduler/app activities when multi-nodes enabled. + */ +public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled + extends JerseyTestBase { + + private static MockRM rm; + private static CapacitySchedulerConfiguration csConf; + private static YarnConfiguration conf; + + public TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.resourcemanager.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(RMWebServices.class); + bind(GenericExceptionHandler.class); + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + + conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // enable multi-nodes placement + conf.setBoolean( + CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); + rm = new MockRM(conf); + bind(ResourceManager.class).toInstance(rm); + serve("/*").with(GuiceContainer.class); + } + } + + private static void setupQueueConfiguration( + CapacitySchedulerConfiguration config) { + // Define top-level queues + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + config.setCapacity(queueA, 10.5f); + config.setMaximumCapacity(queueA, 50); + + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + config.setCapacity(queueB, 89.5f); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + @Test (timeout=30000) + public void testAssignContainer() throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 2 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 1), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 1), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 1)), null); + + //Trigger recording for multi-nodes without params + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + //Trigger scheduling for this app + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + + //Check scheduler activities, it should contain one allocation and + // final allocation state is ALLOCATED + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, + "finalAllocationState", "ALLOCATED"); + } finally { + rm.stop(); + } + } + + @Test (timeout=30000) + public void testSchedulingWithoutPendingRequests() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 8 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + //Trigger recording for multi-nodes without params + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + //Trigger scheduling for this app + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + + //Check scheduler activities, it should contain one allocation and + // final allocation state is SKIPPED + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, + "finalAllocationState", "SKIPPED"); + } finally { + rm.stop(); + } + } + + private void verifyNumberOfAllocations(JSONObject json, int realValue) + throws Exception { + if (json.isNull("allocations")) { + assertEquals("Number of allocations is wrong", 0, realValue); + } else { + Object object = json.get("allocations"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations is wrong in: " + object, + ((JSONArray) object).length(), realValue); + } + } + } + + private void verifyStateOfAllocations(JSONObject allocation, + String nameToCheck, String realState) throws Exception { + assertEquals("State of allocation is wrong", allocation.get(nameToCheck), + realState); + } +}