diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 9dc59451499..9d5956f6748 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -107,6 +108,7 @@ private RMAppLifetimeMonitor rmAppLifetimeMonitor; private QueueLimitCalculator queueLimitCalculator; + private PlacementTagsManager placementTagsManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -398,6 +400,19 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { @Private @Unstable + public PlacementTagsManager getPlacementTagsManager() { + return placementTagsManager; + } + + @Private + @Unstable + public void setPlacementTagsManager( + PlacementTagsManager placementTagsManager) { + this.placementTagsManager = placementTagsManager; + } + + @Private + @Unstable public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return rmDelegatedNodeLabelsUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ec940304660..1b15ec06f7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager; @@ -166,4 +167,8 @@ void setRMDelegatedNodeLabelsUpdater( void setResourceProfilesManager(ResourceProfilesManager mgr); String getAppProxyUrl(Configuration conf, ApplicationId applicationId); + + PlacementTagsManager getPlacementTagsManager(); + + void setPlacementTagsManager(PlacementTagsManager placementTagsManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 80a91096271..bcce818d50e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager; @@ -504,6 +505,17 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { } @Override + public PlacementTagsManager getPlacementTagsManager() { + return activeServiceContext.getPlacementTagsManager(); + } + + @Override + public void setPlacementTagsManager( + PlacementTagsManager placementTagsManager) { + activeServiceContext.setPlacementTagsManager(placementTagsManager); + } + + @Override public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return activeServiceContext.getRMDelegatedNodeLabelsUpdater(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6f8a0a48edb..1bcaf74060b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -493,6 +494,10 @@ protected RMNodeLabelsManager createNodeLabelManager() throws InstantiationException, IllegalAccessException { return new RMNodeLabelsManager(); } + + protected PlacementTagsManager createPlacementTagsManager() { + return new PlacementTagsManager(); + } protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); @@ -612,6 +617,9 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(nlm); rmContext.setNodeLabelManager(nlm); + PlacementTagsManager placementTagsManager = createPlacementTagsManager(); + rmContext.setPlacementTagsManager(placementTagsManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java new file mode 100644 index 00000000000..4195301b9dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/InvalidPlacementTagsQueryException.java @@ -0,0 +1,35 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception when invalid parameter specified to do placement tags related + * queries. + */ +public class InvalidPlacementTagsQueryException extends YarnException { + private static final long serialVersionUID = 12312831974894L; + + public InvalidPlacementTagsQueryException(String msg) { + super(msg); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java new file mode 100644 index 00000000000..48d98065563 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementTagsManager.java @@ -0,0 +1,297 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongBinaryOperator; + +/** + * Support storing maps between container-tags/applications and + * nodes. This will be required by affinity/anti-affinity implementation and + * cardinality. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PlacementTagsManager { + private static final Logger LOG = Logger.getLogger( + PlacementTagsManager.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + // Application's tags to node + private Map perAppMappings = + new HashMap<>(); + + // Global tags to node mapping (used to fast return aggregated tags + // cardinality across apps) + private NodeToCountedTags globalMapping = new NodeToCountedTags(); + + /** + * Store node to counted tags. + */ + @VisibleForTesting + static class NodeToCountedTags { + // Map> + private Map> nodeToTagsWithCount = + new HashMap<>(); + + // protected by external locks + private void addTagsToNode(NodeId nodeId, Set tags) { + Map innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId, + k -> new HashMap<>()); + + for (String tag : tags) { + Long count = innerMap.get(tag); + if (count == null) { + innerMap.put(tag, 1L); + } else{ + innerMap.put(tag, count + 1); + } + } + } + + private void removeTagsFromNode(NodeId nodeId, Set tags) { + Map innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return; + } + + for (String tag : tags) { + Long count = innerMap.get(tag); + if (count > 1) { + innerMap.put(tag, count - 1); + } else{ + innerMap.remove(tag); + } + } + + if (innerMap.isEmpty()) { + nodeToTagsWithCount.remove(nodeId); + } + } + + private long getCardinality(NodeId nodeId, Set tags, + LongBinaryOperator op) { + Map innerMap = nodeToTagsWithCount.get(nodeId); + if (innerMap == null) { + return 0; + } + + long returnValue = 0; + int numValues = 0; + + if (tags != null && !tags.isEmpty()) { + for (String tag : tags) { + Long value = innerMap.get(tag); + if (value == null) { + value = 0L; + } + numValues++; + + // For the first value, we will not apply op + if (numValues == 1) { + returnValue = value; + continue; + } + returnValue = op.applyAsLong(returnValue, value); + } + } else{ + // Similar to above if, but only iterate values for better performance + for (long value : innerMap.values()) { + numValues++; + + // For the first value, we will not apply op + if (numValues == 1) { + returnValue = value; + continue; + } + returnValue = op.applyAsLong(returnValue, value); + } + } + return returnValue; + } + + private boolean isEmpty() { + return nodeToTagsWithCount.isEmpty(); + } + + @VisibleForTesting + public Map> getNodeToTagsWithCount() { + return nodeToTagsWithCount; + } + } + + @VisibleForTesting + Map getPerAppMappings() { + return perAppMappings; + } + + @VisibleForTesting + NodeToCountedTags getGlobalMapping() { + return globalMapping; + } + + public PlacementTagsManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + /** + * Notify container allocated on a node + * + * @param nodeId allocated node. + * @param applicationId applicationId + * @param containerId container id. + * @param allocationTags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()} + * application_id will be added to allocationTags. + */ + public void addContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set allocationTags) { + if (allocationTags == null || allocationTags.isEmpty()) { + allocationTags = ImmutableSet.of(applicationId.toString()); + } else{ + // Copy before edit it. + allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationId.toString()); + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent( + applicationId, k -> new NodeToCountedTags()); + perAppTagsMapping.addTagsToNode(nodeId, allocationTags); + globalMapping.addTagsToNode(nodeId, allocationTags); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Added container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Notify container removed. + * + * @param nodeId nodeId + * @param applicationId applicationId + * @param containerId containerId. + * @param allocationTags allocation tags for given container + */ + public void removeContainer(NodeId nodeId, ApplicationId applicationId, + ContainerId containerId, Set allocationTags) { + if (allocationTags == null || allocationTags.isEmpty()) { + allocationTags = ImmutableSet.of(applicationId.toString()); + } else{ + // Copy before edit it. + allocationTags = new HashSet<>(allocationTags); + allocationTags.add(applicationId.toString()); + } + + writeLock.lock(); + try { + NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId); + if (perAppTagsMapping == null) { + return; + } + + perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags); + if (perAppTagsMapping.isEmpty()) { + perAppMappings.remove(applicationId); + } + globalMapping.removeTagsFromNode(nodeId, allocationTags); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Removed container=" + containerId + " with tags=[" + StringUtils + .join(allocationTags, ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Get cardinality for following conditions: + * + * @param nodeId nodeId, required. + * @param applicationId applicationId. When null is specified, return + * aggregated cardinality among all nodes. + * @param tags allocation tags, see + * {@link SchedulingRequest#getAllocationTags()}, + * When multiple tags specified. Returns cardinality depends on + * op. If a specified tag doesn't exist, 0 will be its cardinality. + * When null/empty tags specified, all tags (of the node/app) will be + * considered. + * @param op operator. Such as Long::max, Long::sum, etc. Required. This + * parameter only take effect when #values >= 2. + * @return cardinality of specified query on the node. + * @throws InvalidPlacementTagsQueryException when illegal query + * parameter specified + */ + public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, + Set tags, LongBinaryOperator op) + throws InvalidPlacementTagsQueryException { + readLock.lock(); + + try { + if (nodeId == null || op == null) { + throw new InvalidPlacementTagsQueryException( + "Must specify nodeId/tags/op to query cardinality"); + } + + NodeToCountedTags mapping; + if (applicationId != null) { + mapping = perAppMappings.get(applicationId); + } else{ + mapping = globalMapping; + } + + if (mapping == null) { + return 0; + } + + return mapping.getCardinality(nodeId, tags, op); + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 29680e5d87b..1967cf8b326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.List; +import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -114,4 +116,10 @@ boolean completed(); NodeId getNodeId(); + + /** + * Return {@link SchedulingRequest#getAllocationTags()} specified by AM. + * @return allocation tags, could be null/empty + */ + Set getAllocationTags(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index a43459cfbb5..65316008438 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -188,6 +189,9 @@ private boolean isExternallyAllocated; private SchedulerRequestKey allocatedSchedulerKey; + // TODO, set it when container allocated by scheduler. + private Set allocationTags = null; + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -501,6 +505,11 @@ public NodeId getNodeId() { return nodeId; } + @Override + public Set getAllocationTags() { + return allocationTags; + } + private static class BaseTransition implements SingleArcTransition { @@ -565,6 +574,12 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Notify placementManager + container.rmContext.getPlacementTagsManager().addContainer( + container.getNodeId(), + container.getApplicationAttemptId().getApplicationId(), + container.getContainerId(), container.getAllocationTags()); + container.eventHandler.handle(new RMAppAttemptEvent( container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); } @@ -676,6 +691,12 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Notify placementManager + container.rmContext.getPlacementTagsManager().removeContainer( + container.getNodeId(), + container.getApplicationAttemptId().getApplicationId(), + container.getContainerId(), container.getAllocationTags()); + RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; container.finishTime = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementTagsManager.java new file mode 100644 index 00000000000..113314411b7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementTagsManager.java @@ -0,0 +1,314 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test functionality of PlacementTagsManager + */ +public class TestPlacementTagsManager { + @Test + public void testPlacementTagsManagerSimpleCases() + throws InvalidPlacementTagsQueryException { + PlacementTagsManager ptm = new PlacementTagsManager(); + + /** + * Construct test case: + * Node1: + * container_1_1 (mapper/reducer/app_1) + * container_1_3 (service/app_1) + * + * Node2: + * container_1_2 (mapper/reducer/app_1) + * container_1_4 (reducer/app_1) + * container_2_1 (service/app_2) + */ + + // 3 Containers from app1 + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + ImmutableSet.of("reducer")); + + // 1 Container from app2 + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // Get Cardinality of app1 on node1, with tag "mapper" + Assert.assertEquals(1, + ptm.getNodeCardinality(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::max)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min + Assert.assertEquals(1, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::min)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max + Assert.assertEquals(2, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::max)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum + Assert.assertEquals(3, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::sum)); + + // Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("no_existed", "reducer"), Long::min)); + + // Get Cardinality of app1 on node2, with tag "", op=max + // (Expect this returns #containers from app1 on node2) + Assert.assertEquals(2, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()), + Long::max)); + + // Get Cardinality of app1 on node2, with empty tag set, op=max + Assert.assertEquals(2, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + + // Get Cardinality of all apps on node2, with empty tag set, op=sum + Assert.assertEquals(7, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), null, + ImmutableSet.of(), Long::sum)); + + // Get Cardinality of app_1 on node2, with empty tag set, op=sum + Assert.assertEquals(5, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); + + // Get Cardinality of app_1 on node2, with empty tag set, op=sum + Assert.assertEquals(2, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); + + // Finish all containers: + ptm.removeContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("mapper", "reducer")); + + ptm.removeContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + ImmutableSet.of("reducer")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // Expect all cardinality to be 0 + // Get Cardinality of app1 on node1, with tag "mapper" + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::max)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::min)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::max)); + + // Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer"), Long::sum)); + + // Get Cardinality of app1 on node2, with tag "", op=max + // (Expect this returns #containers from app1 on node2) + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()), + Long::max)); + + // Get Cardinality of app1 on node2, with empty tag set, op=max + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); + + // Get Cardinality of all apps on node2, with empty tag set, op=sum + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), null, + ImmutableSet.of(), Long::sum)); + + // Get Cardinality of app_1 on node2, with empty tag set, op=sum + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum)); + + // Get Cardinality of app_1 on node2, with empty tag set, op=sum + Assert.assertEquals(0, + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum)); + } + + @Test + public void testPlacementTagsManagerMemoryAfterCleanup() + throws InvalidPlacementTagsQueryException { + /** + * Make sure YARN cleans up all memory once container/app finishes. + */ + + PlacementTagsManager ptm = new PlacementTagsManager(); + + // Add a bunch of containers + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + ImmutableSet.of("reducer")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // Remove all these containers + ptm.removeContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("mapper", "reducer")); + + ptm.removeContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + ImmutableSet.of("reducer")); + + ptm.removeContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // Check internal data structure + Assert.assertEquals(0, + ptm.getGlobalMapping().getNodeToTagsWithCount().size()); + Assert.assertEquals(0, ptm.getPerAppMappings().size()); + } + + @Test + public void testQueryCardinalityWithIllegalParameters() + throws InvalidPlacementTagsQueryException { + /** + * Make sure YARN cleans up all memory once container/app finishes. + */ + + PlacementTagsManager ptm = new PlacementTagsManager(); + + // Add a bunch of containers + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2), + ImmutableSet.of("mapper", "reducer")); + + ptm.addContainer(NodeId.fromString("node1:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3), + ImmutableSet.of("service")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4), + ImmutableSet.of("reducer")); + + ptm.addContainer(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3), + ImmutableSet.of("service")); + + // No node-id + boolean caughtException = false; + try { + ptm.getNodeCardinality(null, TestUtils.getMockApplicationId(2), + ImmutableSet.of("mapper"), Long::min); + } catch (InvalidPlacementTagsQueryException e) { + caughtException = true; + } + Assert.assertTrue("should fail because of nodeId specified", + caughtException); + + // No op + caughtException = false; + try { + ptm.getNodeCardinality(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null); + } catch (InvalidPlacementTagsQueryException e) { + caughtException = true; + } + Assert.assertTrue("should fail because of nodeId specified", + caughtException); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index db3144898fd..9c0e8ed7d78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -109,6 +110,8 @@ public void testReleaseWhileRunning() { when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getRMApps()).thenReturn(rmApps); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + PlacementTagsManager ptm = mock(PlacementTagsManager.class); + when(rmContext.getPlacementTagsManager()).thenReturn(ptm); YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean( YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, @@ -209,6 +212,8 @@ public void testExpireWhileRunning() { when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + PlacementTagsManager ptm = mock(PlacementTagsManager.class); + when(rmContext.getPlacementTagsManager()).thenReturn(ptm); YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean( @@ -367,4 +372,123 @@ public void testStoreOnlyAMContainerMetrics() throws Exception { verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong()); verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong()); } + + @Test + public void testContainerTransitionNotifyPlacementTagsManager() + throws Exception { + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler appAttemptEventHandler = mock( + EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + + Resource resource = BuilderUtils.newResource(512, 1); + Priority priority = BuilderUtils.newPriority(5); + + Container container = BuilderUtils.newContainer(containerId, nodeId, + "host:3465", resource, priority, null); + ConcurrentMap rmApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt(Matchers.any())).thenReturn(null); + Mockito.doReturn(rmApp).when(rmApps).get(Matchers.any()); + + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + PlacementTagsManager tagsManager = new PlacementTagsManager(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getRMApps()).thenReturn(rmApps); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + when(rmContext.getPlacementTagsManager()).thenReturn(tagsManager); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, + true); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + + /* First container: ALLOCATED -> KILLED */ + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.KILL)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + /* Second container: ACQUIRED -> FINISHED */ + rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + /* Third container: RUNNING -> FINISHED */ + rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, + nodeId, "user", rmContext); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + + Assert.assertEquals(1, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); + + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); + + rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus + .newInstance(containerId, ContainerState.COMPLETE, "", 0), + RMContainerEventType.FINISHED)); + + Assert.assertEquals(0, + tagsManager.getNodeCardinality(nodeId, appId, null, Long::max)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 8036a409df8..7abe5d9b1c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -135,6 +136,9 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { new DefaultResourceCalculator()); rmContext.setScheduler(mockScheduler); + PlacementTagsManager ptm = mock(PlacementTagsManager.class); + rmContext.setPlacementTagsManager(ptm); + return rmContext; } @@ -234,6 +238,11 @@ public static ContainerId getMockContainerId(FiCaSchedulerApp application) { doReturn(id).when(containerId).getContainerId(); return containerId; } + + public static ContainerId getMockContainerId(int appId, int containerId) { + ApplicationAttemptId attemptId = getMockApplicationAttemptId(appId, 1); + return ContainerId.newContainerId(attemptId, containerId); + } public static Container getMockContainer( ContainerId containerId, NodeId nodeId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 3f97b59551c..09616a5fc6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -234,6 +235,8 @@ public void testNodeLocalAssignment() throws Exception { FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler); + PlacementTagsManager ptm = mock(PlacementTagsManager.class); + rmContext.setPlacementTagsManager(ptm); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter( mock(RMApplicationHistoryWriter.class)); @@ -312,12 +315,14 @@ public void testUpdateResourceOnNode() throws Exception { FifoScheduler scheduler = new FifoScheduler(); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler); + PlacementTagsManager ptm = mock(PlacementTagsManager.class); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); ((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration()); NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager(); nlm.init(new Configuration()); rmContext.setNodeLabelManager(nlm); + rmContext.setPlacementTagsManager(ptm); scheduler.setRMContext(rmContext); ((RMContextImpl) rmContext).setScheduler(scheduler);