diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 8a3ffce1d8a..598f7e117c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -62,14 +62,14 @@ public static void recordRejectedAppActivityFromLeafQueue( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic) { - String type = "app"; - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - recordActivity(activitiesManager, node, application.getQueueName(), + if (activitiesManager.shouldRecordThisNode(nodeId)) { + recordActivity(activitiesManager, nodeId, application.getQueueName(), application.getApplicationId().toString(), priority, - ActivityState.REJECTED, diagnostic, type); + ActivityState.REJECTED, diagnostic, "app"); } finishSkippedAppAllocationRecording(activitiesManager, application.getApplicationId(), ActivityState.REJECTED, diagnostic); @@ -84,18 +84,19 @@ public static void recordAppActivityWithoutAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic, ActivityState appState) { - if (node == null 
|| activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getApplicationId().toString(), null, priority.toString(), ActivityState.SKIPPED, diagnostic, type); type = "app"; // Add queue-application activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.SKIPPED, @@ -121,20 +122,21 @@ public static void recordAppActivityWithAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, RMContainer updatedContainer, ActivityState activityState) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getApplicationId().toString(), updatedContainer.getContainer().toString(), updatedContainer.getContainer().getPriority().toString(), activityState, ActivityDiagnosticConstant.EMPTY, type); type = "app"; // Add queue-application activity into specific node allocation. 
- activitiesManager.addSchedulingActivityForNode(node, + activitiesManager.addSchedulingActivityForNode(nodeId, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.ACCEPTED, @@ -160,11 +162,12 @@ public static void startAppAllocationRecording( ActivitiesManager activitiesManager, FiCaSchedulerNode node, long currentTime, SchedulerApplicationAttempt application) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } activitiesManager - .startAppAllocationRecording(node.getNodeID(), currentTime, + .startAppAllocationRecording(nodeId, currentTime, application); } @@ -210,11 +213,12 @@ public static void finishSkippedAppAllocationRecording( public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - recordActivity(activitiesManager, node, parentQueueName, queueName, + if (activitiesManager.shouldRecordThisNode(nodeId)) { + recordActivity(activitiesManager, nodeId, parentQueueName, queueName, null, state, diagnostic, null); } } @@ -242,11 +246,12 @@ public static void finishSkippedNodeAllocation( public static void finishAllocatedNodeAllocation( ActivitiesManager activitiesManager, SchedulerNode node, ContainerId containerId, AllocationState containerState) { - if (node == null || activitiesManager == null) { + NodeId nodeId = getRecordingNodeId(activitiesManager, node); + if (nodeId == null) { return; } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - activitiesManager.updateAllocationFinalState(node.getNodeID(), + if 
(activitiesManager.shouldRecordThisNode(nodeId)) { + activitiesManager.updateAllocationFinalState(nodeId, containerId, containerState); } } @@ -276,12 +281,19 @@ public static void startNodeUpdateRecording( // Add queue, application or container activity into specific node allocation. private static void recordActivity(ActivitiesManager activitiesManager, - SchedulerNode node, String parentName, String childName, + NodeId nodeId, String parentName, String childName, Priority priority, ActivityState state, String diagnostic, String type) { - - activitiesManager.addSchedulingActivityForNode(node, parentName, + activitiesManager.addSchedulingActivityForNode(nodeId, parentName, childName, priority != null ? priority.toString() : null, state, diagnostic, type); + } + private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager, + SchedulerNode node) { + if (activitiesManager == null) { + return null; + } else { + return activitiesManager.getRecordingNodeId(node); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index 5d96b17371e..cb9967350ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; +import com.google.common.annotations.VisibleForTesting; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; @@ -45,7 +46,10 @@ */ public class ActivitiesManager extends AbstractService { private static final Log LOG = LogFactory.getLog(ActivitiesManager.class); - private ConcurrentMap> recordingNodesAllocation; + public static final NodeId MULTI_NODES_AGENT_ID = + NodeId.newInstance("MULTI_NODES_AGENT", 0); + private ThreadLocal>> + recordingNodesAllocation; private ConcurrentMap> completedNodeAllocations; private Set activeRecordedNodes; private ConcurrentMap @@ -62,7 +66,7 @@ public ActivitiesManager(RMContext rmContext) { super(ActivitiesManager.class.getName()); - recordingNodesAllocation = new ConcurrentHashMap<>(); + recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap()); completedNodeAllocations = new ConcurrentHashMap<>(); appsAllocation = new ConcurrentHashMap<>(); completedAppAllocations = new ConcurrentHashMap<>(); @@ -172,9 +176,11 @@ void startNodeUpdateRecording(NodeId nodeID) { if (recordNextAvailableNode) { recordNextNodeUpdateActivities(nodeID.toString()); } - if (activeRecordedNodes.contains(nodeID)) { + // Removing from activeRecordedNodes immediately is to ensure that + // activities will be recorded just once in multiple threads. + if (activeRecordedNodes.remove(nodeID)) { List nodeAllocation = new ArrayList<>(); - recordingNodesAllocation.put(nodeID, nodeAllocation); + recordingNodesAllocation.get().put(nodeID, nodeAllocation); } } @@ -198,12 +204,11 @@ void startAppAllocationRecording(NodeId nodeID, long currTS, } // Add queue, application or container activity into specific node allocation. 
- void addSchedulingActivityForNode(SchedulerNode node, String parentName, + void addSchedulingActivityForNode(NodeId nodeId, String parentName, String childName, String priority, ActivityState state, String diagnostic, String type) { - if (shouldRecordThisNode(node.getNodeID())) { - NodeAllocation nodeAllocation = getCurrentNodeAllocation( - node.getNodeID()); + if (shouldRecordThisNode(nodeId)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId); nodeAllocation.addAllocationActivity(parentName, childName, priority, state, diagnostic, type); } @@ -261,7 +266,7 @@ void finishAppAllocationRecording(ApplicationId applicationId, } void finishNodeUpdateRecording(NodeId nodeID) { - List value = recordingNodesAllocation.get(nodeID); + List value = recordingNodesAllocation.get().get(nodeID); long timeStamp = SystemClock.getInstance().getTime(); if (value != null) { @@ -277,9 +282,8 @@ void finishNodeUpdateRecording(NodeId nodeID) { } if (shouldRecordThisNode(nodeID)) { - recordingNodesAllocation.remove(nodeID); + recordingNodesAllocation.get().remove(nodeID); completedNodeAllocations.put(nodeID, value); - stopRecordNodeUpdateActivities(nodeID); } } } @@ -290,12 +294,15 @@ boolean shouldRecordThisApp(ApplicationId applicationId) { } boolean shouldRecordThisNode(NodeId nodeID) { - return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation + return isRecordingMultiNodes() || recordingNodesAllocation.get() .containsKey(nodeID); } private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { - List nodeAllocations = recordingNodesAllocation.get(nodeID); + NodeId recordingKey = + isRecordingMultiNodes() ? MULTI_NODES_AGENT_ID : nodeID; + List nodeAllocations = + recordingNodesAllocation.get().get(recordingKey); NodeAllocation nodeAllocation; // When this node has already stored allocation activities, get the // last allocation for this node. 
@@ -322,11 +329,32 @@ private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { return nodeAllocation; } - private void stopRecordNodeUpdateActivities(NodeId nodeId) { - activeRecordedNodes.remove(nodeId); - } - private void turnOffActivityMonitoringForApp(ApplicationId applicationId) { recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); } + + public boolean isRecordingMultiNodes() { + return recordingNodesAllocation.get().containsKey(MULTI_NODES_AGENT_ID); + } + + /** + * Get recording node id: + * 1. node id of the input node if it is not null. + * 2. MULTI_NODES_AGENT_ID if input node is null and activities manager is + * recording multi-nodes. + * 3. null otherwise. + */ + public NodeId getRecordingNodeId(SchedulerNode node) { + if (node != null) { + return node.getNodeID(); + } else if (isRecordingMultiNodes()) { + return ActivitiesManager.MULTI_NODES_AGENT_ID; + } + return null; + } + + @VisibleForTesting + public List getLastAvailableNodeActivities() { + return lastAvailableNodeActivities; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 618ee20cfe0..5ca0bc64bcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1301,8 +1301,6 @@ protected void nodeUpdate(RMNode rmNode) { if (!scheduleAsynchronously) { try { writeLock.lock(); - 
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, - rmNode.getNodeID()); // reset allocation and reservation stats before we start doing any // work @@ -1310,8 +1308,6 @@ protected void nodeUpdate(RMNode rmNode) { CSAssignment.NULL_ASSIGNMENT); allocateContainersToNode(rmNode.getNodeID(), true); - ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, - rmNode.getNodeID()); } finally { writeLock.unlock(); } @@ -1724,10 +1720,18 @@ CSAssignment allocateContainersToNode( // nodes. CSAssignment assignment; if (!multiNodePlacementEnabled) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + node.getNodeID()); assignment = allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + node.getNodeID()); } else{ + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + ActivitiesManager.MULTI_NODES_AGENT_ID); assignment = allocateContainersOnMultiNodes(candidates); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + ActivitiesManager.MULTI_NODES_AGENT_ID); } if (assignment != null && assignment.getAssignmentInformation() != null diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java index 0de340a1ec6..d6295fbbb99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ActivitiesInfo.java @@ -18,6 +18,7 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation; @@ -77,4 +78,9 @@ public ActivitiesInfo(List nodeAllocations, String nodeId) { } } } + + @VisibleForTesting + public List getAllocations() { + return allocations; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java new file mode 100644 index 00000000000..1c0a9843c87 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.yarn.util.resource.Resources; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; + +public class TestActivitiesManager { + + private final static int NUM_NODES = 5; + + private final static int NUM_APPS = 5; + + private final static int NUM_THREADS = 5; + + private RMContext rmContext; + + private ActivitiesManager activitiesManager; + + private List apps; + + private List 
nodes; + + private ThreadPoolExecutor threadPoolExecutor; + + @Before + public void setup() { + rmContext = Mockito.mock(RMContext.class); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getMinimumResourceCapability()) + .thenReturn(Resources.none()); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + LeafQueue mockQueue = Mockito.mock(LeafQueue.class); + Map rmApps = new ConcurrentHashMap<>(); + Mockito.doReturn(rmApps).when(rmContext).getRMApps(); + apps = new ArrayList<>(); + for (int i = 0; i < NUM_APPS; i++) { + ApplicationAttemptId appAttemptId = + TestUtils.getMockApplicationAttemptId(i, 0); + RMApp mockApp = Mockito.mock(RMApp.class); + Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp) + .getApplicationId(); + rmApps.put(appAttemptId.getApplicationId(), mockApp); + FiCaSchedulerApp app = + new FiCaSchedulerApp(appAttemptId, "user", mockQueue, + mock(ActiveUsersManager.class), rmContext); + apps.add(app); + } + nodes = new ArrayList<>(); + for (int i = 0; i < NUM_NODES; i++) { + nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240)); + } + activitiesManager = new ActivitiesManager(rmContext); + threadPoolExecutor = + new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + + /** + * Test recording activities belong to different nodes in multiple threads, + * these threads can run without interference and one activity + * should be recorded by every thread. 
+ */ + @Test + public void testRecordingDifferentNodeActivitiesInMultiThreads() + throws Exception { + Random rand = new Random(); + List>> futures = new ArrayList<>(); + for (SchedulerNode node : nodes) { + Callable> task = () -> { + SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS)); + // start recording activities for this node + activitiesManager.recordNextNodeUpdateActivities( + node.getNodeID().toString()); + // generate node/app activities + ActivitiesLogger.NODE + .startNodeUpdateRecording(activitiesManager, node.getNodeID()); + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + randomApp, Priority.newInstance(0), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + ActivityState.REJECTED); + ActivitiesLogger.NODE + .finishNodeUpdateRecording(activitiesManager, node.getNodeID()); + return activitiesManager + .getActivitiesInfo(node.getNodeID().toString()) + .getAllocations(); + }; + futures.add(threadPoolExecutor.submit(task)); + } + for (Future> future : futures) { + Assert.assertEquals(1, future.get().size()); + } + } + + /** + * Test recording activities for multi-nodes assignment in multiple threads, + * only one activity info should be recorded by one of these threads. 
+ */ + @Test + public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads() + throws Exception { + Random rand = new Random(); + TestingActivitiesManager activitiesManager = + new TestingActivitiesManager(rmContext); + // start recording activities for multi-nodes + activitiesManager.recordNextNodeUpdateActivities( + ActivitiesManager.MULTI_NODES_AGENT_ID.toString()); + List> futures = new ArrayList<>(); + // generate node/app activities + for (SchedulerNode node : nodes) { + Callable task = () -> { + SchedulerApplicationAttempt randomApp = + apps.get(rand.nextInt(NUM_APPS)); + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + ActivitiesManager.MULTI_NODES_AGENT_ID); + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + randomApp, Priority.newInstance(0), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + ActivityState.REJECTED); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + ActivitiesManager.MULTI_NODES_AGENT_ID); + return activitiesManager.getActivitiesInfo( + ActivitiesManager.MULTI_NODES_AGENT_ID.toString()); + }; + futures.add(threadPoolExecutor.submit(task)); + } + // Check activities for multi-nodes should be recorded only once + for (Future future : futures) { + future.get(); + } + Assert.assertEquals(1, activitiesManager.activitiesChangedCount); + } + + /** + * Testing activities manager with activities changed count + */ + public class TestingActivitiesManager extends ActivitiesManager { + + public int activitiesChangedCount = 0; + + public TestingActivitiesManager(RMContext rmContext) { + super(rmContext); + } + + void finishNodeUpdateRecording(NodeId nodeId) { + List beforeValue = getLastAvailableNodeActivities(); + super.finishNodeUpdateRecording(nodeId); + List afterValue = getLastAvailableNodeActivities(); + if (beforeValue != afterValue) { + activitiesChangedCount++; + } + } + } +} \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index 40cf483cd3a..74ce757f505 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -95,16 +95,12 @@ public void testAssignMultipleContainersPerNodeHeartbeat() response.getType().toString()); json = response.getEntity(JSONObject.class); - verifyNumberOfAllocations(json, 11); - - JSONArray allocations = json.getJSONArray("allocations"); - for (int i = 0; i < allocations.length(); i++) { - if (i != allocations.length() - 1) { - verifyStateOfAllocations(allocations.getJSONObject(i), - "finalAllocationState", "ALLOCATED"); - verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1"); - } - } + // Collection logic of scheduler activities changed after YARN-9313, + // only one allocation should be recorded for all scenarios. 
+ verifyNumberOfAllocations(json, 1); + verifyStateOfAllocations(json.getJSONObject("allocations"), + "finalAllocationState", "ALLOCATED"); + verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1"); } finally { rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java new file mode 100644 index 00000000000..da77d3a8528 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.google.inject.Guice; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.JettyUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; +import java.util.Arrays; + +import static 
org.junit.Assert.assertEquals; + +public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled + extends JerseyTestBase { + + protected static MockRM rm; + protected static CapacitySchedulerConfiguration csConf; + protected static YarnConfiguration conf; + + public TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.resourcemanager.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(RMWebServices.class); + bind(GenericExceptionHandler.class); + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + + conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // enable multi-nodes placement + conf.setBoolean( + CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); + rm = new MockRM(conf); + bind(ResourceManager.class).toInstance(rm); + serve("/*").with(GuiceContainer.class); + } + } + + private static void setupQueueConfiguration( + CapacitySchedulerConfiguration config) { + // Define top-level queues + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + config.setCapacity(A, 10.5f); + config.setMaximumCapacity(A, 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + config.setCapacity(B, 89.5f); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + @Test + public void testAppAssign() throws Exception { + //Start RM so that it 
accepts app submissions + rm.start(); + + MockNM nm = new MockNM("127.0.0.1:1234", 2 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "127.0.0.1", + Resources.createResource(1024), 1), ResourceRequest + .newInstance(Priority.UNDEFINED, "/default-rack", + Resources.createResource(1024), 1), ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024), + 1)), null); + + //Trigger recording for multi-nodes without params + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + //Trigger scheduling for this app + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + + //Check scheduler activities, it should contain one allocation and + // final allocation state is ALLOCATED + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, + "finalAllocationState", "ALLOCATED"); + } + finally { + rm.stop(); + } + } + + @Test + public void testSchedulingWithoutPendingRequests() + throws Exception { + //Start RM so that it accepts app submissions + rm.start(); 
+ + MockNM nm = new MockNM("127.0.0.1:1234", 8 * 1024, + rm.getResourceTrackerService()); + nm.registerNode(); + + try { + //Trigger recording for multi-nodes without params + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + //Trigger scheduling with a node update (no app is submitted) + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + + //Check scheduler activities, it should contain one allocation and + // final allocation state is SKIPPED + response = r.path("ws").path("v1").path("cluster").path( + "scheduler/activities").accept( + MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, + "finalAllocationState", "SKIPPED"); + } + finally { + rm.stop(); + } + } + + private void verifyNumberOfAllocations(JSONObject json, int realValue) + throws Exception { + if (json.isNull("allocations")) { + assertEquals("Number of allocations is wrong", 0, realValue); + } else { + Object object = json.get("allocations"); + if (object.getClass() == JSONObject.class) { + assertEquals("Number of allocations is wrong", 1, realValue); + } else if (object.getClass() == JSONArray.class) { + assertEquals("Number of allocations is wrong in: " + object, + ((JSONArray) object).length(), realValue); + } + } + } + + private void verifyStateOfAllocations(JSONObject allocation, + String nameToCheck, String realState) throws 
Exception { + assertEquals("State of allocation is wrong", allocation.get(nameToCheck), + realState); + } +}