diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
deleted file mode 100644
index 12c32fc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProfileCapability;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Class that tests the allocation of OPPORTUNISTIC containers through the
- * centralized ResourceManager.
- */
-public class TestOpportunisticContainerAllocation {
- private static Configuration conf = null;
- private static MiniYARNCluster yarnCluster = null;
- private static YarnClient yarnClient = null;
- private static List nodeReports = null;
- private static int nodeCount = 3;
-
- private static final int ROLLING_INTERVAL_SEC = 13;
- private static final long AM_EXPIRE_MS = 4000;
-
- private static Resource capability;
- private static ProfileCapability profileCapability;
- private static Priority priority;
- private static Priority priority2;
- private static Priority priority3;
- private static Priority priority4;
- private static String node;
- private static String rack;
- private static String[] nodes;
- private static String[] racks;
- private final static int DEFAULT_ITERATION = 3;
-
- // Per test..
- private ApplicationAttemptId attemptId = null;
- private AMRMClientImpl amClient = null;
- private long availMB;
- private int availVCores;
- private long allocMB;
- private int allocVCores;
-
- @BeforeClass
- public static void setup() throws Exception {
- // start minicluster
- conf = new YarnConfiguration();
- conf.setLong(
- YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
- ROLLING_INTERVAL_SEC);
- conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
- conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
- // set the minimum allocation so that resource decrease can go under 1024
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
- conf.setBoolean(
- YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
- conf.setInt(
- YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
- conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
- yarnCluster =
- new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
- yarnCluster.init(conf);
- yarnCluster.start();
-
- // start rm client
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
-
- // get node info
- nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-
- priority = Priority.newInstance(1);
- priority2 = Priority.newInstance(2);
- priority3 = Priority.newInstance(3);
- priority4 = Priority.newInstance(4);
- capability = Resource.newInstance(512, 1);
- profileCapability = ProfileCapability.newInstance(capability);
-
- node = nodeReports.get(0).getNodeId().getHost();
- rack = nodeReports.get(0).getRackName();
- nodes = new String[]{node};
- racks = new String[]{rack};
- }
-
- @Before
- public void startApp() throws Exception {
- // submit new app
- ApplicationSubmissionContext appContext =
- yarnClient.createApplication().getApplicationSubmissionContext();
- ApplicationId appId = appContext.getApplicationId();
- // set the application name
- appContext.setApplicationName("Test");
- // Set the priority for the application master
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- appContext.setPriority(pri);
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
- Collections.emptyMap(),
- new HashMap(), Arrays.asList("sleep", "100"),
- new HashMap(), null,
- new HashMap());
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(Resource.newInstance(1024, 1));
- // Create the request to send to the applications manager
- SubmitApplicationRequest appRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- // Submit the application to the applications manager
- yarnClient.submitApplication(appContext);
-
- // wait for app to start
- RMAppAttempt appAttempt = null;
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport.getYarnApplicationState() ==
- YarnApplicationState.ACCEPTED) {
- attemptId = appReport.getCurrentApplicationAttemptId();
- appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
- while (true) {
- if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- break;
- }
- }
- break;
- }
- }
- // Just dig into the ResourceManager and get the AMRMToken just for the sake
- // of testing.
- UserGroupInformation.setLoginUser(UserGroupInformation
- .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-
- // emulate RM setup of AMRM token in credentials by adding the token
- // *before* setting the token service
- UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
- appAttempt.getAMRMToken()
- .setService(ClientRMProxy.getAMRMTokenService(conf));
-
- // start am rm client
- amClient = (AMRMClientImpl)AMRMClient
- .createAMRMClient();
-
- //setting an instance NMTokenCache
- amClient.setNMTokenCache(new NMTokenCache());
- //asserting we are not using the singleton instance cache
- Assert.assertNotSame(NMTokenCache.getSingleton(),
- amClient.getNMTokenCache());
-
- amClient.init(conf);
- amClient.start();
-
- amClient.registerApplicationMaster("Host", 10000, "");
- }
-
- @After
- public void cancelApp() throws YarnException, IOException {
- try {
- amClient
- .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
- null);
- } finally {
- if (amClient != null &&
- amClient.getServiceState() == Service.STATE.STARTED) {
- amClient.stop();
- }
- }
- yarnClient.killApplication(attemptId.getApplicationId());
- attemptId = null;
- }
-
- @AfterClass
- public static void tearDown() {
- if (yarnClient != null &&
- yarnClient.getServiceState() == Service.STATE.STARTED) {
- yarnClient.stop();
- }
- if (yarnCluster != null &&
- yarnCluster.getServiceState() == Service.STATE.STARTED) {
- yarnCluster.stop();
- }
- }
-
- @Test(timeout = 60000)
- public void testPromotionFromAcquired() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(1, oppContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- Map allocatedOpportContainers = new HashMap<>();
- int iterationsLeft = 50;
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- updateMetrics("Before Opp Allocation");
-
- while (allocatedContainerCount < oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- allocatedOpportContainers.put(container.getId(), container);
- removeCR(container);
- }
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < oppContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(oppContainersRequestedAny, allocatedContainerCount);
- assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
-
- updateMetrics("After Opp Allocation / Before Promotion");
-
- try {
- Container c = allocatedOpportContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
- null, ExecutionType.OPPORTUNISTIC));
- Assert.fail("Should throw Exception..");
- } catch (IllegalArgumentException e) {
- System.out.println("## " + e.getMessage());
- Assert.assertTrue(e.getMessage().contains(
- "target should be GUARANTEED and original should be OPPORTUNISTIC"));
- }
-
- Container c = allocatedOpportContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
- null, ExecutionType.GUARANTEED));
- iterationsLeft = 120;
- Map updatedContainers = new HashMap<>();
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- if (allocResponse.getUpdatedContainers() != null) {
- for (UpdatedContainer updatedContainer : allocResponse
- .getUpdatedContainers()) {
- System.out.println("Got update..");
- updatedContainers.put(updatedContainer.getContainer().getId(),
- updatedContainer);
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
-
- updateMetrics("After Promotion");
-
- assertEquals(1, updatedContainers.size());
- for (ContainerId cId : allocatedOpportContainers.keySet()) {
- Container orig = allocatedOpportContainers.get(cId);
- UpdatedContainer updatedContainer = updatedContainers.get(cId);
- assertNotNull(updatedContainer);
- assertEquals(ExecutionType.GUARANTEED,
- updatedContainer.getContainer().getExecutionType());
- assertEquals(orig.getResource(),
- updatedContainer.getContainer().getResource());
- assertEquals(orig.getNodeId(),
- updatedContainer.getContainer().getNodeId());
- assertEquals(orig.getVersion() + 1,
- updatedContainer.getContainer().getVersion());
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- amClient.ask.clear();
- }
-
- @Test(timeout = 60000)
- public void testDemotionFromAcquired() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3));
-
- int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
-
- assertEquals(1, guarContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- Map allocatedGuarContainers = new HashMap<>();
- int iterationsLeft = 50;
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- updateMetrics("Before Guar Allocation");
-
- while (allocatedContainerCount < guarContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.GUARANTEED) {
- allocatedGuarContainers.put(container.getId(), container);
- removeCR(container);
- }
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < guarContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(guarContainersRequestedAny, allocatedContainerCount);
- assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
-
- updateMetrics("After Guar Allocation / Before Demotion");
-
- try {
- Container c = allocatedGuarContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
- null, ExecutionType.GUARANTEED));
- Assert.fail("Should throw Exception..");
- } catch (IllegalArgumentException e) {
- System.out.println("## " + e.getMessage());
- Assert.assertTrue(e.getMessage().contains(
- "target should be OPPORTUNISTIC and original should be GUARANTEED"));
- }
-
- Container c = allocatedGuarContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
- null, ExecutionType.OPPORTUNISTIC));
- iterationsLeft = 120;
- Map updatedContainers = new HashMap<>();
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- if (allocResponse.getUpdatedContainers() != null) {
- for (UpdatedContainer updatedContainer : allocResponse
- .getUpdatedContainers()) {
- System.out.println("Got update..");
- updatedContainers.put(updatedContainer.getContainer().getId(),
- updatedContainer);
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
-
- updateMetrics("After Demotion");
-
- assertEquals(1, updatedContainers.size());
- for (ContainerId cId : allocatedGuarContainers.keySet()) {
- Container orig = allocatedGuarContainers.get(cId);
- UpdatedContainer updatedContainer = updatedContainers.get(cId);
- assertNotNull(updatedContainer);
- assertEquals(ExecutionType.OPPORTUNISTIC,
- updatedContainer.getContainer().getExecutionType());
- assertEquals(orig.getResource(),
- updatedContainer.getContainer().getResource());
- assertEquals(orig.getNodeId(),
- updatedContainer.getContainer().getNodeId());
- assertEquals(orig.getVersion() + 1,
- updatedContainer.getContainer().getVersion());
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- amClient.ask.clear();
- }
-
- @Test(timeout = 60000)
- public void testMixedAllocationAndRelease() throws YarnException,
- IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
- int oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(4, containersRequestedNode);
- assertEquals(4, containersRequestedRack);
- assertEquals(4, containersRequestedAny);
- assertEquals(2, oppContainersRequestedAny);
-
- assertEquals(4, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
- oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(2, containersRequestedNode);
- assertEquals(2, containersRequestedRack);
- assertEquals(2, containersRequestedAny);
- assertEquals(1, oppContainersRequestedAny);
-
- assertEquals(4, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- int allocatedOpportContainerCount = 0;
- int iterationsLeft = 50;
- Set releases = new TreeSet<>();
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- while (allocatedContainerCount <
- containersRequestedAny + oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- allocatedOpportContainerCount++;
- }
- ContainerId rejectContainerId = container.getId();
- releases.add(rejectContainerId);
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < containersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(containersRequestedAny + oppContainersRequestedAny,
- allocatedContainerCount);
- assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
- for (ContainerId rejectContainerId : releases) {
- amClient.releaseAssignedContainer(rejectContainerId);
- }
- assertEquals(3, amClient.release.size());
- assertEquals(0, amClient.ask.size());
-
- // need to tell the AMRMClient that we don't need these resources anymore
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- assertEquals(4, amClient.ask.size());
-
- iterationsLeft = 3;
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- assertEquals(0, allocResponse.getAllocatedContainers().size());
- if (allocResponse.getCompletedContainersStatuses().size() > 0) {
- for (ContainerStatus cStatus : allocResponse
- .getCompletedContainersStatuses()) {
- if (releases.contains(cStatus.getContainerId())) {
- assertEquals(cStatus.getState(), ContainerState.COMPLETE);
- assertEquals(-100, cStatus.getExitStatus());
- releases.remove(cStatus.getContainerId());
- }
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- }
-
- /**
- * Tests allocation with requests comprising only opportunistic containers.
- */
- @Test(timeout = 60000)
- public void testOpportunisticAllocation() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int oppContainersRequestedAny = amClient.getTable(0)
- .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
- profileCapability).remoteRequest.getNumContainers();
-
- assertEquals(2, oppContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- int iterationsLeft = 10;
- Set releases = new TreeSet<>();
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- while (allocatedContainerCount < oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- for (Container container : allocResponse.getAllocatedContainers()) {
- allocatedContainerCount++;
- ContainerId rejectContainerId = container.getId();
- releases.add(rejectContainerId);
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < oppContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(oppContainersRequestedAny, allocatedContainerCount);
- assertEquals(1, receivedNMTokens.values().size());
- }
-
- private void removeCR(Container container) {
- List extends Collection>
- matchingRequests = amClient.getMatchingRequests(container
- .getPriority(),
- ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
- container.getResource());
- Set toRemove = new HashSet<>();
- for (Collection rc : matchingRequests) {
- for (AMRMClient.ContainerRequest cr : rc) {
- toRemove.add(cr);
- }
- }
- for (AMRMClient.ContainerRequest cr : toRemove) {
- amClient.removeContainerRequest(cr);
- }
- }
-
- private void updateMetrics(String msg) {
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler)yarnCluster.getResourceManager()
- .getResourceScheduler();
- availMB = scheduler.getRootQueueMetrics().getAvailableMB();
- availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
- allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
- allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
- System.out.println("## METRICS (" + msg + ")==>");
- System.out.println(" : availMB=" + availMB + ", " +
- "availVCores=" +availVCores + ", " +
- "allocMB=" + allocMB + ", " +
- "allocVCores=" + allocVCores + ", ");
- System.out.println("<== ##");
- }
-
- private void sleep(int sleepTime) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
new file mode 100644
index 0000000..94cb28e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Class that tests the allocation of OPPORTUNISTIC containers through the
+ * centralized ResourceManager.
+ */
+public class TestOpportunisticContainerAllocationE2E {
+ private static Configuration conf = null;
+ private static MiniYARNCluster yarnCluster = null;
+ private static YarnClient yarnClient = null;
+ private static List<NodeReport> nodeReports = null;
+ private static int nodeCount = 3;
+
+ private static final int ROLLING_INTERVAL_SEC = 13;
+ private static final long AM_EXPIRE_MS = 4000;
+
+ private static Resource capability;
+ private static ProfileCapability profileCapability;
+ private static Priority priority;
+ private static Priority priority2;
+ private static Priority priority3;
+ private static Priority priority4;
+ private static String node;
+ private static String rack;
+ private static String[] nodes;
+ private static String[] racks;
+ private final static int DEFAULT_ITERATION = 3;
+
+ // Per test..
+ private ApplicationAttemptId attemptId = null;
+ private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+ private long availMB;
+ private int availVCores;
+ private long allocMB;
+ private int allocVCores;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // start minicluster
+ conf = new YarnConfiguration();
+ conf.setLong(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+ ROLLING_INTERVAL_SEC);
+ conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
+ conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
+ // set the minimum allocation so that resource decrease can go under 1024
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+ conf.setBoolean(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+ conf.setInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+ yarnCluster =
+ new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ // start rm client
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // get node info
+ nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+
+ priority = Priority.newInstance(1);
+ priority2 = Priority.newInstance(2);
+ priority3 = Priority.newInstance(3);
+ priority4 = Priority.newInstance(4);
+ capability = Resource.newInstance(512, 1);
+ profileCapability = ProfileCapability.newInstance(capability);
+
+ node = nodeReports.get(0).getNodeId().getHost();
+ rack = nodeReports.get(0).getRackName();
+ nodes = new String[]{node};
+ racks = new String[]{rack};
+ }
+
+ @Before
+ public void startApp() throws Exception {
+ // submit new app
+ ApplicationSubmissionContext appContext =
+ yarnClient.createApplication().getApplicationSubmissionContext();
+ ApplicationId appId = appContext.getApplicationId();
+ // set the application name
+ appContext.setApplicationName("Test");
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+ Collections.<String, LocalResource>emptyMap(),
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+ new HashMap<String, ByteBuffer>(), null,
+ new HashMap<ApplicationAccessType, String>());
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(Resource.newInstance(1024, 1));
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest =
+ Records.newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ // Submit the application to the applications manager
+ yarnClient.submitApplication(appContext);
+
+ // wait for app to start
+ RMAppAttempt appAttempt = null;
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() ==
+ YarnApplicationState.ACCEPTED) {
+ attemptId = appReport.getCurrentApplicationAttemptId();
+ appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+ .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+ while (true) {
+ if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+ break;
+ }
+ }
+ break;
+ }
+ }
+ // Just dig into the ResourceManager and get the AMRMToken just for the sake
+ // of testing.
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+ // emulate RM setup of AMRM token in credentials by adding the token
+ // *before* setting the token service
+ UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+ appAttempt.getAMRMToken()
+ .setService(ClientRMProxy.getAMRMTokenService(conf));
+
+ // start am rm client
+ amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient
+ .<AMRMClient.ContainerRequest>createAMRMClient();
+
+ //setting an instance NMTokenCache
+ amClient.setNMTokenCache(new NMTokenCache());
+ //asserting we are not using the singleton instance cache
+ Assert.assertNotSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("Host", 10000, "");
+ }
+
+ @After
+ public void cancelApp() throws YarnException, IOException {
+ try {
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+ } finally {
+ if (amClient != null &&
+ amClient.getServiceState() == Service.STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ yarnClient.killApplication(attemptId.getApplicationId());
+ attemptId = null;
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (yarnClient != null &&
+ yarnClient.getServiceState() == Service.STATE.STARTED) {
+ yarnClient.stop();
+ }
+ if (yarnCluster != null &&
+ yarnCluster.getServiceState() == Service.STATE.STARTED) {
+ yarnCluster.stop();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testPromotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Opp Allocation");
+
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+ updateMetrics("After Opp Allocation / Before Promotion");
+
+ try {
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+ }
+
+ Container c = allocatedOpportContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
+
+ updateMetrics("After Promotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedOpportContainers.keySet()) {
+ Container orig = allocatedOpportContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.GUARANTEED,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
+ }
+
+ @Test(timeout = 60000)
+ public void testDemotionFromAcquired() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority3));
+
+ int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+ .remoteRequest.getNumContainers();
+
+ assertEquals(1, guarContainersRequestedAny);
+
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+ int iterationsLeft = 50;
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ updateMetrics("Before Guar Allocation");
+
+ while (allocatedContainerCount < guarContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+ allocatedGuarContainers.put(container.getId(), container);
+ removeCR(container);
+ }
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < guarContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+ assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+ updateMetrics("After Guar Allocation / Before Demotion");
+
+ try {
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.GUARANTEED));
+ Assert.fail("Should throw Exception..");
+ } catch (IllegalArgumentException e) {
+ System.out.println("## " + e.getMessage());
+ Assert.assertTrue(e.getMessage().contains(
+ "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+ }
+
+ Container c = allocatedGuarContainers.values().iterator().next();
+ amClient.requestContainerUpdate(
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+ null, ExecutionType.OPPORTUNISTIC));
+ iterationsLeft = 120;
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ if (allocResponse.getUpdatedContainers() != null) {
+ for (UpdatedContainer updatedContainer : allocResponse
+ .getUpdatedContainers()) {
+ System.out.println("Got update..");
+ updatedContainers.put(updatedContainer.getContainer().getId(),
+ updatedContainer);
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
+
+ updateMetrics("After Demotion");
+
+ assertEquals(1, updatedContainers.size());
+ for (ContainerId cId : allocatedGuarContainers.keySet()) {
+ Container orig = allocatedGuarContainers.get(cId);
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
+ assertNotNull(updatedContainer);
+ assertEquals(ExecutionType.OPPORTUNISTIC,
+ updatedContainer.getContainer().getExecutionType());
+ assertEquals(orig.getResource(),
+ updatedContainer.getContainer().getResource());
+ assertEquals(orig.getNodeId(),
+ updatedContainer.getContainer().getNodeId());
+ assertEquals(orig.getVersion() + 1,
+ updatedContainer.getContainer().getVersion());
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ amClient.ask.clear();
+ }
+
+ @Test(timeout = 60000)
+ public void testMixedAllocationAndRelease() throws YarnException,
+ IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int containersRequestedNode = amClient.getTable(0).get(priority,
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.getTable(0).get(priority,
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.getTable(0).get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+ .remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(4, containersRequestedNode);
+ assertEquals(4, containersRequestedRack);
+ assertEquals(4, containersRequestedAny);
+ assertEquals(2, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ containersRequestedNode = amClient.getTable(0).get(priority,
+ node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+ .getNumContainers();
+ containersRequestedRack = amClient.getTable(0).get(priority,
+ rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+ .getNumContainers();
+ containersRequestedAny = amClient.getTable(0).get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+ .remoteRequest.getNumContainers();
+ oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(2, containersRequestedNode);
+ assertEquals(2, containersRequestedRack);
+ assertEquals(2, containersRequestedAny);
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int allocatedOpportContainerCount = 0;
+ int iterationsLeft = 50;
+ Set<ContainerId> releases = new TreeSet<>();
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ while (allocatedContainerCount <
+ containersRequestedAny + oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount +=
+ allocResponse.getAllocatedContainers().size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainerCount++;
+ }
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(containersRequestedAny + oppContainersRequestedAny,
+ allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
+ for (ContainerId rejectContainerId : releases) {
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ assertEquals(3, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+
+ // need to tell the AMRMClient that we don't need these resources anymore
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ assertEquals(4, amClient.ask.size());
+
+ iterationsLeft = 3;
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
+ if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for (ContainerStatus cStatus : allocResponse
+ .getCompletedContainersStatuses()) {
+ if (releases.contains(cStatus.getContainerId())) {
+ assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+ assertEquals(-100, cStatus.getExitStatus());
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ }
+
+ /**
+ * Tests allocation with requests comprising only opportunistic containers.
+ */
+ @Test(timeout = 60000)
+ public void testOpportunisticAllocation() throws YarnException, IOException {
+ // setup container request
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int oppContainersRequestedAny = amClient.getTable(0)
+ .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+ profileCapability).remoteRequest.getNumContainers();
+
+ assertEquals(2, oppContainersRequestedAny);
+
+ assertEquals(1, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 10;
+ Set<ContainerId> releases = new TreeSet<>();
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ while (allocatedContainerCount < oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ allocatedContainerCount++;
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < oppContainersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+ assertEquals(1, receivedNMTokens.values().size());
+ }
+
+ private void removeCR(Container container) {
+ List<? extends Collection<AMRMClient.ContainerRequest>>
+ matchingRequests = amClient.getMatchingRequests(container
+ .getPriority(),
+ ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+ container.getResource());
+ Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+ for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+ for (AMRMClient.ContainerRequest cr : rc) {
+ toRemove.add(cr);
+ }
+ }
+ for (AMRMClient.ContainerRequest cr : toRemove) {
+ amClient.removeContainerRequest(cr);
+ }
+ }
+
+ private void updateMetrics(String msg) {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler)yarnCluster.getResourceManager()
+ .getResourceScheduler();
+ availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+ availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+ allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+ allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+ System.out.println("## METRICS (" + msg + ")==>");
+ System.out.println(" : availMB=" + availMB + ", " +
+ "availVCores=" +availVCores + ", " +
+ "allocMB=" + allocMB + ", " +
+ "allocVCores=" + allocVCores + ", ");
+ System.out.println("<== ##");
+ }
+
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index e403a12..f621aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -47,6 +47,24 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
}
/**
+ * Create new Instance.
+ * @param nodeId NodeId.
+ * @param httpAddress Http address.
+ * @param rackName Rack Name.
+ * @return RemoteNode instance.
+ */
+ @Private
+ @Unstable
+ public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+ String rackName) {
+ RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+ remoteNode.setNodeId(nodeId);
+ remoteNode.setHttpAddress(httpAddress);
+ remoteNode.setRackName(rackName);
+ return remoteNode;
+ }
+
+ /**
* Get {@link NodeId}.
* @return NodeId.
*/
@@ -79,6 +97,22 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
public abstract void setHttpAddress(String httpAddress);
/**
+ * Get Rack Name.
+ * @return Rack Name.
+ */
+ @Private
+ @Unstable
+ public abstract String getRackName();
+
+ /**
+ * Set Rack Name.
+ * @param rackName Rack Name.
+ */
+ @Private
+ @Unstable
+ public abstract void setRackName(String rackName);
+
+ /**
* Use the underlying {@link NodeId} comparator.
* @param other RemoteNode.
* @return Comparison.
@@ -92,6 +126,7 @@ public int compareTo(RemoteNode other) {
public String toString() {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
+ "rackName=" + getRackName() + ", " +
"httpAddress=" + getHttpAddress() + "}";
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index 3e4fd4a..c2492cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -118,6 +118,25 @@ public void setHttpAddress(String httpAddress) {
}
@Override
+ public String getRackName() {
+ RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasRackName()) {
+ return null;
+ }
+ return (p.getRackName());
+ }
+
+ @Override
+ public void setRackName(String rackName) {
+ maybeInitBuilder();
+ if (rackName == null) {
+ builder.clearRackName();
+ return;
+ }
+ builder.setRackName(rackName);
+ }
+
+ @Override
public int hashCode() {
return getProto().hashCode();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 6fd5228..e419d12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -45,10 +45,11 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,6 +62,10 @@
*/
public class OpportunisticContainerAllocator {
+ private static final int NODE_LOCAL_LOOP = 0;
+ private static final int RACK_LOCAL_LOOP = 1;
+ private static final int OFF_SWITCH_LOOP = 2;
+
/**
* This class encapsulates application specific parameters used to build a
* Container.
@@ -188,6 +193,53 @@ public long generateContainerId() {
private final BaseContainerTokenSecretManager tokenSecretManager;
+ static class Allocation {
+ private final Container container;
+ private final String resourceName;
+
+ Allocation(Container container, String resourceName) {
+ this.container = container;
+ this.resourceName = resourceName;
+ }
+
+ Container getContainer() {
+ return container;
+ }
+
+ String getResourceName() {
+ return resourceName;
+ }
+ }
+
+ static class EnrichedResourceRequest {
+ private final Set<String> nodeLocations = new HashSet<>();
+ private final Set<String> rackLocations = new HashSet<>();
+ private final ResourceRequest request;
+
+ EnrichedResourceRequest(ResourceRequest request) {
+ this.request = request;
+ }
+
+ ResourceRequest getRequest() {
+ return request;
+ }
+
+ void addLocation(String location) {
+ if (location.startsWith("/")) {
+ rackLocations.add(location);
+ } else {
+ nodeLocations.add(location);
+ }
+ }
+
+ Set<String> getNodeLocations() {
+ return nodeLocations;
+ }
+
+ Set<String> getRackLocations() {
+ return rackLocations;
+ }
+ }
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
@@ -233,27 +285,30 @@ public OpportunisticContainerAllocator(
// might be different than what is requested, which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
- Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
+ Map<Resource, List<Allocation>> allocated = allocate(rmIdentifier,
opportContext, schedulerKey, applicationAttemptId, appSubmitter);
- for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
+ for (Map.Entry<Resource, List<Allocation>> e : allocated.entrySet()) {
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
- allocatedContainers.addAll(e.getValue());
+ for (Allocation alloc : e.getValue()) {
+ allocatedContainers.add(alloc.getContainer());
+ }
}
}
return allocatedContainers;
}
- private Map<Resource, List<Container>> allocate(long rmIdentifier,
+ private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName) throws YarnException {
- Map<Resource, List<Container>> containers = new HashMap<>();
- for (ResourceRequest anyAsk :
+ Map<Resource, List<Allocation>> containers = new HashMap<>();
+ for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(),
- appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+ appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
+ ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
@@ -270,44 +325,148 @@ private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, RemoteNode> allNodes, String userName,
- Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+ Map<Resource, List<Allocation>> allocations,
+ EnrichedResourceRequest enrichedAsk)
throws YarnException {
+ if (allNodes.size() == 0) {
+ LOG.info("No nodes currently available to " +
+ "allocate OPPORTUNISTIC containers.");
+ return;
+ }
+ ResourceRequest anyAsk = enrichedAsk.getRequest();
int toAllocate = anyAsk.getNumContainers()
- - (containers.isEmpty() ? 0 :
- containers.get(anyAsk.getCapability()).size());
-
- List<RemoteNode> nodesForScheduling = new ArrayList<>();
- for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
- // Do not use blacklisted nodes for scheduling.
- if (blacklist.contains(nodeEntry.getKey())) {
- continue;
+ - (allocations.isEmpty() ? 0 :
+ allocations.get(anyAsk.getCapability()).size());
+ int numAllocated = 0;
+ // Node Candidates are selected as follows:
+ // * Node local candidates selected in loop == 0
+ // * Rack local candidates selected in loop == 1
+ // * From loop == 2 onwards, we revert to off switch allocations.
+ int loopIndex = 2;
+ boolean shouldConsiderNodeLocals =
+ enrichedAsk.getNodeLocations().size() > 0;
+ if (shouldConsiderNodeLocals) {
+ loopIndex = 0;
+ }
+ boolean shouldConsiderRackLocals =
+ enrichedAsk.getRackLocations().size() > 0;
+ while (numAllocated < toAllocate) {
+ Collection<RemoteNode> nodeCandidates =
+ findNodeCandidates(loopIndex, allNodes, enrichedAsk);
+ for (RemoteNode rNode : nodeCandidates) {
+ String rNodeHost = rNode.getNodeId().getHost();
+ // Ignore black list
+ if (blacklist.contains(rNodeHost)) {
+ LOG.info("Nodes for scheduling has a blacklisted node" +
+ " [" + rNodeHost + "]..");
+ continue;
+ }
+ if (shouldConsiderNodeLocals && loopIndex == NODE_LOCAL_LOOP &&
+ !enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+ continue;
+ }
+ if (shouldConsiderRackLocals && loopIndex == RACK_LOCAL_LOOP &&
+ !enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+ continue;
+ }
+ Container container = createContainer(rmIdentifier, appParams,
+ idCounter, id, userName, allocations, ResourceRequest.ANY,
+ anyAsk, rNode);
+ numAllocated++;
+ LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
+ if (numAllocated >= toAllocate) {
+ break;
+ }
+ }
+ if (loopIndex == NODE_LOCAL_LOOP && shouldConsiderRackLocals) {
+ loopIndex = RACK_LOCAL_LOOP;
+ } else {
+ loopIndex++;
+ }
+ // Handle case where there are no nodes remaining after blacklist is
+ // considered.
+ if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
+ LOG.warn("Unable to allocate any opportunistic containers.");
+ break;
}
- nodesForScheduling.add(nodeEntry.getValue());
}
- if (nodesForScheduling.isEmpty()) {
- LOG.warn("No nodes available for allocating opportunistic containers. [" +
- "allNodes=" + allNodes + ", " +
- "blacklist=" + blacklist + "]");
- return;
+ }
+
+ private Collection<RemoteNode> findNodeCandidates(int loopIndex,
+ Map<String, RemoteNode> allNodes, EnrichedResourceRequest enrichedRR) {
+ if (loopIndex > 1) {
+ return allNodes.values();
+ } else {
+ List<RemoteNode> retList = new ArrayList<>();
+ int numContainers = enrichedRR.getRequest().getNumContainers();
+ while (numContainers > 0) {
+ if (loopIndex == 0) {
+ // Node local candidates
+ numContainers = collectNodeLocalCandidates(
+ allNodes, enrichedRR, retList, numContainers);
+ } else {
+ // Rack local candidates
+ numContainers = collectRackLocalCandidates(
+ allNodes, enrichedRR, retList, numContainers);
+ }
+ if (numContainers == enrichedRR.getRequest().getNumContainers()) {
+ // If there is no change in numContainers, then there is no point
+ // in looping again.
+ break;
+ }
+ }
+ return retList;
}
- int numAllocated = 0;
- int nextNodeToSchedule = 0;
- for (int numCont = 0; numCont < toAllocate; numCont++) {
- nextNodeToSchedule++;
- nextNodeToSchedule %= nodesForScheduling.size();
- RemoteNode node = nodesForScheduling.get(nextNodeToSchedule);
- Container container = buildContainer(rmIdentifier, appParams, idCounter,
- anyAsk, id, userName, node);
- List<Container> cList = containers.get(anyAsk.getCapability());
- if (cList == null) {
- cList = new ArrayList<>();
- containers.put(anyAsk.getCapability(), cList);
+ }
+
+ private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
+ EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+ int numContainers) {
+ // This is an optimization to ensure, we loop over allNodes only once
+ Collection<RemoteNode> toIterate =
+ (retList.size() == 0) ? allNodes.values() : retList;
+ for (RemoteNode rNode : toIterate) {
+ if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+ retList.add(rNode);
+ numContainers--;
+ }
+ if (numContainers == 0) {
+ break;
+ }
+ }
+ return numContainers;
+ }
+
+ private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
+ EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+ int numContainers) {
+ for (String nodeName : enrichedRR.getNodeLocations()) {
+ RemoteNode remoteNode = allNodes.get(nodeName);
+ if (remoteNode != null) {
+ retList.add(remoteNode);
+ numContainers--;
+ }
+ if (numContainers == 0) {
+ break;
}
- cList.add(container);
- numAllocated++;
- LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
}
- LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+ return numContainers;
+ }
+
+ private Container createContainer(long rmIdentifier,
+ AllocationParams appParams, ContainerIdGenerator idCounter,
+ ApplicationAttemptId id, String userName,
+ Map<Resource, List<Allocation>> allocations, String location,
+ ResourceRequest anyAsk, RemoteNode rNode) throws YarnException {
+ Container container = buildContainer(rmIdentifier, appParams,
+ idCounter, anyAsk, id, userName, rNode);
+ List<Allocation> allocList = allocations.get(anyAsk.getCapability());
+ if (allocList == null) {
+ allocList = new ArrayList<>();
+ allocations.put(anyAsk.getCapability(), allocList);
+ }
+ allocList.add(new Allocation(container, location));
+ return container;
}
private Container buildContainer(long rmIdentifier,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 1b1c5b9..3aed2be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.scheduler;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -35,8 +34,10 @@
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest;
/**
* This encapsulates application specific information used by the
@@ -61,7 +62,8 @@
// Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
- private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+ private final TreeMap<SchedulerRequestKey,
+ Map<Resource, EnrichedResourceRequest>>
outstandingOpReqs = new TreeMap<>();
public AllocationParams getAppParams() {
@@ -107,7 +109,7 @@ public void updateAllocationParams(Resource minResource, Resource maxResource,
return blacklist;
}
- public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+ public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
getOutstandingOpReqs() {
return outstandingOpReqs;
}
@@ -125,36 +127,32 @@ public void addToOutstandingReqs(List resourceAsks) {
for (ResourceRequest request : resourceAsks) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
- // TODO: Extend for Node/Rack locality. We only handle ANY requests now
- if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
- continue;
- }
-
- if (request.getNumContainers() == 0) {
- continue;
- }
-
- Map<Resource, ResourceRequest> reqMap =
+ Map<Resource, EnrichedResourceRequest> reqMap =
outstandingOpReqs.get(schedulerKey);
if (reqMap == null) {
reqMap = new HashMap<>();
outstandingOpReqs.put(schedulerKey, reqMap);
}
- ResourceRequest resourceRequest = reqMap.get(request.getCapability());
- if (resourceRequest == null) {
- resourceRequest = request;
- reqMap.put(request.getCapability(), request);
+ EnrichedResourceRequest eReq = reqMap.get(request.getCapability());
+ if (eReq == null) {
+ eReq = new EnrichedResourceRequest(request);
+ reqMap.put(request.getCapability(), eReq);
+ }
+ // Set numContainers only for ANY request
+ if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+ eReq.getRequest().setResourceName(ResourceRequest.ANY);
+ eReq.getRequest().setNumContainers(request.getNumContainers());
} else {
- resourceRequest.setNumContainers(
- resourceRequest.getNumContainers() + request.getNumContainers());
+ eReq.addLocation(request.getResourceName());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at "
+ "priority = " + schedulerKey.getPriority()
+ ", allocationReqId = " + schedulerKey.getAllocationRequestId()
+ ", with capability = " + request.getCapability() + " ) : "
- + resourceRequest.getNumContainers());
+ + ", with location = " + request.getResourceName() + " ) : "
+ + ", numContainers = " + eReq.getRequest().getNumContainers());
}
}
}
@@ -163,25 +161,30 @@ public void addToOutstandingReqs(List resourceAsks) {
* This method matches a returned list of Container Allocations to any
* outstanding OPPORTUNISTIC ResourceRequest.
* @param capability Capability
- * @param allocatedContainers Allocated Containers
+ * @param allocations Allocations.
*/
public void matchAllocationToOutstandingRequest(Resource capability,
- List<Container> allocatedContainers) {
- for (Container c : allocatedContainers) {
+ List<Allocation> allocations) {
+ for (OpportunisticContainerAllocator.Allocation allocation : allocations) {
SchedulerRequestKey schedulerKey =
- SchedulerRequestKey.extractFrom(c);
- Map<Resource, ResourceRequest> asks =
+ SchedulerRequestKey.extractFrom(allocation.getContainer());
+ Map<Resource, EnrichedResourceRequest> asks =
outstandingOpReqs.get(schedulerKey);
if (asks == null) {
continue;
}
- ResourceRequest rr = asks.get(capability);
- if (rr != null) {
- rr.setNumContainers(rr.getNumContainers() - 1);
- if (rr.getNumContainers() == 0) {
+ EnrichedResourceRequest err = asks.get(capability);
+ if (err != null) {
+ int numContainers = err.getRequest().getNumContainers();
+ numContainers--;
+ err.getRequest().setNumContainers(numContainers);
+ if (numContainers == 0) {
asks.remove(capability);
+ if (asks.size() == 0) {
+ outstandingOpReqs.remove(schedulerKey);
+ }
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index e889cde..8e59f14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -30,6 +30,7 @@ import "yarn_service_protos.proto";
message RemoteNodeProto {
optional NodeIdProto node_id = 1;
optional string http_address = 2;
+ optional string rack_name = 3;
}
message RegisterDistributedSchedulingAMResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
new file mode 100644
index 0000000..16a5299
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class TestOpportunisticContainerAllocator {
+
+ private static final int GB = 1024;
+ private OpportunisticContainerAllocator allocator = null;
+ private OpportunisticContainerContext oppCntxt = null;
+
+ @Before
+ public void setup() {
+ SecurityUtil.setTokenServiceUseIp(false);
+ final MasterKey mKey = new MasterKey() {
+ @Override
+ public int getKeyId() {
+ return 1;
+ }
+ @Override
+ public void setKeyId(int keyId) {}
+ @Override
+ public ByteBuffer getBytes() {
+ return ByteBuffer.allocate(8);
+ }
+ @Override
+ public void setBytes(ByteBuffer bytes) {}
+ };
+ BaseContainerTokenSecretManager secMan =
+ new BaseContainerTokenSecretManager(new Configuration()) {
+ @Override
+ public MasterKey getCurrentKey() {
+ return mKey;
+ }
+
+ @Override
+ public byte[] createPassword(ContainerTokenIdentifier identifier) {
+ return new byte[]{1, 2};
+ }
+ };
+ allocator = new OpportunisticContainerAllocator(secMan);
+ oppCntxt = new OpportunisticContainerContext();
+ oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
+ oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
+ oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
+ }
+
+ @Test
+ public void testSimpleAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+ "*", Resources.createResource(1 * GB), 1, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ Assert.assertEquals(1, containers.size());
+ Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+ }
+
+ @Test
+ public void testBlacklistRejection() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ Arrays.asList("h1", "h2"), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+ "*", Resources.createResource(1 * GB), 1, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r2")));
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ Assert.assertEquals(0, containers.size());
+ Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+ }
+
+ @Test
+ public void testRoundRobinSimpleAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+ "*", Resources.createResource(1 * GB), 3, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ System.out.println(containers);
+ Set<String> allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeHttpAddress());
+ }
+ Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertTrue(allocatedHosts.contains("h3:1234"));
+ Assert.assertEquals(3, containers.size());
+ }
+
+ @Test
+ public void testNodeLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ System.out.println(containers);
+ Set<String> allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeHttpAddress());
+ }
+ Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h2:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertEquals(2, containers.size());
+ }
+
+ @Test
+ public void testSimpleRackLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ Set<String> allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeHttpAddress());
+ }
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+ Assert.assertEquals(2, containers.size());
+ }
+
+ @Test
+ public void testRoundRobinRackLocalAllocation() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ Set<String> allocatedHosts = new HashSet<>();
+ for (Container c : containers) {
+ allocatedHosts.add(c.getNodeHttpAddress());
+ }
+ Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+ Assert.assertTrue(allocatedHosts.contains("h5:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+ Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+ Assert.assertEquals(2, containers.size());
+ }
+
+ @Test
+ public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "h6",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)),
+ ResourceRequest.newInstance(Priority.newInstance(1), "/r3",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ System.out.println(containers);
+ Assert.assertEquals(2, containers.size());
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index c3ed7d5..ce425df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -433,8 +433,12 @@ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
private RemoteNode convertToRemoteNode(NodeId nodeId) {
SchedulerNode node =
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
- return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
- : null;
+ if (node != null) {
+ RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
+ rNode.setRackName(node.getRackName());
+ return rNode;
+ }
+ return null;
}
private static ApplicationAttemptId getAppAttemptId() throws YarnException {