diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 21fa15f..beb3380 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -240,6 +240,22 @@ public ResourceRequestBuilder executionTypeRequest(
}
/**
+ * Set the executionTypeRequest of the request, wrapping the given
+ * execution type with the 'ensure execution type' flag set to true.
+ * @see ResourceRequest#setExecutionTypeRequest(
+ * ExecutionTypeRequest)
+ * @param executionType execution type of the request.
+ * @return {@link ResourceRequestBuilder}
+ */
+ @Public
+ @Evolving
+ public ResourceRequestBuilder executionType(ExecutionType executionType) {
+ resourceRequest.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(executionType, true));
+ return this;
+ }
+
+ /**
* Set the allocationRequestId of the request.
* @see ResourceRequest#setAllocationRequestId(long)
* @param allocationRequestId
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
deleted file mode 100644
index 00f5e03..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ /dev/null
@@ -1,649 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProfileCapability;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Validates End2End Distributed Scheduling flow which includes the AM
- * specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
- * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the OpportunisticContainerAllocatorAMService running on the RM.
- */
-public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
-
- private static final Log LOG =
- LogFactory.getLog(TestDistributedScheduling.class);
-
- protected MiniYARNCluster cluster;
- protected YarnClient rmClient;
- protected ApplicationMasterProtocol client;
- protected Configuration conf;
- protected Configuration yarnConf;
- protected ApplicationAttemptId attemptId;
- protected ApplicationId appId;
-
- @Before
- public void doBefore() throws Exception {
- cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-
- conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.
- OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
- conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
- conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
- 10);
- cluster.init(conf);
- cluster.start();
- yarnConf = cluster.getConfig();
-
- // the client has to connect to AMRMProxy
- yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
- rmClient.init(yarnConf);
- rmClient.start();
-
- // Submit application
- attemptId = createApp(rmClient, cluster, conf);
- appId = attemptId.getApplicationId();
- client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
- }
-
- @After
- public void doAfter() throws Exception {
- if (client != null) {
- try {
- client.finishApplicationMaster(FinishApplicationMasterRequest
- .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
- rmClient.killApplication(attemptId.getApplicationId());
- attemptId = null;
- } catch (Exception e) {
- }
- }
- if (rmClient != null) {
- try {
- rmClient.stop();
- } catch (Exception e) {
- }
- }
- if (cluster != null) {
- try {
- cluster.stop();
- } catch (Exception e) {
- }
- }
- }
-
-
- /**
- * Validates if Allocate Requests containing only OPPORTUNISTIC container
- * requests are satisfied instantly.
- *
- * @throws Exception
- */
- @Test(timeout = 60000)
- public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- // Wait until the RM has been updated and verify
- Map rmApps =
- cluster.getResourceManager().getRMContext().getRMApps();
- boolean rmUpdated = false;
- for (int i=0; i<10 && !rmUpdated; i++) {
- sleep(100);
- RMApp rmApp = rmApps.get(appId);
- if (rmApp.getState() == RMAppState.RUNNING) {
- rmUpdated = true;
- }
- }
- RMApp rmApp = rmApps.get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
- // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
- // everything else
- List newAskList = new ArrayList<>();
- for (ResourceRequest rr : request.getAskList()) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(),
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true));
- newAskList.add(newRR);
- }
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
- }
-
- // Check that the RM sees OPPORTUNISTIC containers
- ResourceScheduler scheduler = cluster.getResourceManager()
- .getResourceScheduler();
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerId containerId = allocatedContainer.getId();
- RMContainer rmContainer = scheduler.getRMContainer(containerId);
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- rmContainer.getExecutionType());
- }
-
- LOG.info("testDistributedSchedulingE2E - Finish");
- }
-
- /**
- * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
- * container requests works as expected.
- *
- * @throws Exception
- */
- @Test(timeout = 60000)
- public void testMixedExecutionTypeRequestE2E() throws Exception {
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
- List askList = request.getAskList();
- List newAskList = new ArrayList<>(askList);
-
- // Duplicate all ANY requests marking them as opportunistic
- for (ResourceRequest rr : askList) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(),
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true));
- newAskList.add(newRR);
- }
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
- }
-
- request.setAskList(new ArrayList());
- request.setResponseId(request.getResponseId() + 1);
-
- Thread.sleep(1000);
-
- // RM should allocate GUARANTEED containers within 2 calls to allocate()
- allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are GUARANTEED
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.GUARANTEED,
- containerTokenIdentifier.getExecutionType());
- }
-
- LOG.info("testDistributedSchedulingE2E - Finish");
- }
-
- /**
- * Validates if AMRMClient can be used with Distributed Scheduling turned on.
- *
- * @throws Exception
- */
- @Test(timeout = 120000)
- @SuppressWarnings("unchecked")
- public void testAMRMClient() throws Exception {
- AMRMClientImpl amClient = null;
- try {
- Priority priority = Priority.newInstance(1);
- Priority priority2 = Priority.newInstance(2);
- Resource capability = Resource.newInstance(1024, 1);
-
- List nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
- String node = nodeReports.get(0).getNodeId().getHost();
- String rack = nodeReports.get(0).getRackName();
- String[] nodes = new String[]{node};
- String[] racks = new String[]{rack};
-
- // start am rm client
- amClient = new AMRMClientImpl(client);
- amClient.init(yarnConf);
- amClient.start();
- amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2,
- 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2,
- 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2,
- 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- RemoteRequestsTable remoteRequestsTable =
- amClient.getTable(0);
- ProfileCapability profileCapability =
- ProfileCapability.newInstance(capability);
-
- int containersRequestedNode = remoteRequestsTable.get(priority,
- node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedRack = remoteRequestsTable.get(priority,
- rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedAny = remoteRequestsTable.get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
- int oppContainersRequestedAny =
- remoteRequestsTable.get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(2, containersRequestedNode);
- assertEquals(2, containersRequestedRack);
- assertEquals(2, containersRequestedAny);
- assertEquals(1, oppContainersRequestedAny);
-
- assertEquals(4, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- int iterationsLeft = 10;
- Set releases = new TreeSet<>();
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- while (allocatedContainerCount <
- (containersRequestedAny + oppContainersRequestedAny)
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount += allocResponse.getAllocatedContainers()
- .size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- ContainerId rejectContainerId = container.getId();
- releases.add(rejectContainerId);
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < containersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(allocatedContainerCount,
- containersRequestedAny + oppContainersRequestedAny);
- for (ContainerId rejectContainerId : releases) {
- amClient.releaseAssignedContainer(rejectContainerId);
- }
- assertEquals(3, amClient.release.size());
- assertEquals(0, amClient.ask.size());
-
- // need to tell the AMRMClient that we dont need these resources anymore
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
- 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- assertEquals(4, amClient.ask.size());
-
- // test RPC exception handling
- amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
- nodes, racks, priority));
- amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
- nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
- 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- final AMRMClient amc = amClient;
- ApplicationMasterProtocol realRM = amClient.rmClient;
- try {
- ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
- .class);
- when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
- new Answer() {
- public AllocateResponse answer(InvocationOnMock invocation)
- throws Exception {
- amc.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes,
- racks, priority));
- amc.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks,
- priority));
- amc.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null,
- priority2, 0, true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- throw new Exception();
- }
- });
- amClient.rmClient = mockRM;
- amClient.allocate(0.1f);
- } catch (Exception ioe) {
- } finally {
- amClient.rmClient = realRM;
- }
-
- assertEquals(3, amClient.release.size());
- assertEquals(6, amClient.ask.size());
-
- iterationsLeft = 3;
- // do a few iterations to ensure RM is not going send new containers
- while (iterationsLeft-- > 0) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- assertEquals(0, allocResponse.getAllocatedContainers().size());
- if (allocResponse.getCompletedContainersStatuses().size() > 0) {
- for (ContainerStatus cStatus : allocResponse
- .getCompletedContainersStatuses()) {
- if (releases.contains(cStatus.getContainerId())) {
- assertEquals(cStatus.getState(), ContainerState.COMPLETE);
- assertEquals(-100, cStatus.getExitStatus());
- releases.remove(cStatus.getContainerId());
- }
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- null, null);
-
- } finally {
- if (amClient != null && amClient.getServiceState() == Service.STATE
- .STARTED) {
- amClient.stop();
- }
- }
- }
-
- /**
- * Check if an AM can ask for opportunistic containers and get them.
- * @throws Exception
- */
- @Test
- public void testAMOpportunistic() throws Exception {
- // Basic container to request
- Resource capability = Resource.newInstance(1024, 1);
- Priority priority = Priority.newInstance(1);
-
- // Get the cluster topology
- List nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
- String node = nodeReports.get(0).getNodeId().getHost();
- String rack = nodeReports.get(0).getRackName();
- String[] nodes = new String[]{node};
- String[] racks = new String[]{rack};
-
- // Create an AM to request resources
- AMRMClient amClient = null;
- try {
- amClient = new AMRMClientImpl(client);
- amClient.init(yarnConf);
- amClient.start();
- amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
- // AM requests an opportunistic container
- ExecutionTypeRequest execTypeRequest =
- ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
- ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
- capability, nodes, racks, priority, 0, true, null, execTypeRequest);
- amClient.addContainerRequest(containerRequest);
-
- // Wait until the container is allocated
- ContainerId opportunisticContainerId = null;
- for (int i=0; i<10 && opportunisticContainerId == null; i++) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- List allocatedContainers =
- allocResponse.getAllocatedContainers();
- for (Container allocatedContainer : allocatedContainers) {
- // Check that this is the container we required
- assertEquals(ExecutionType.OPPORTUNISTIC,
- allocatedContainer.getExecutionType());
- opportunisticContainerId = allocatedContainer.getId();
- }
- sleep(100);
- }
- assertNotNull(opportunisticContainerId);
-
- // The RM sees the container as OPPORTUNISTIC
- ResourceScheduler scheduler = cluster.getResourceManager()
- .getResourceScheduler();
- RMContainer rmContainer = scheduler.getRMContainer(
- opportunisticContainerId);
- assertEquals(ExecutionType.OPPORTUNISTIC,
- rmContainer.getExecutionType());
-
- // Release the opportunistic container
- amClient.releaseAssignedContainer(opportunisticContainerId);
- // Wait for the release container to appear
- boolean released = false;
- for (int i=0; i<10 && !released; i++) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- List completedContainers =
- allocResponse.getCompletedContainersStatuses();
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId completedContainerId =
- completedContainer.getContainerId();
- assertEquals(completedContainerId, opportunisticContainerId);
- released = true;
- }
- if (!released) {
- sleep(100);
- }
- }
- assertTrue(released);
-
- // The RM shouldn't see the container anymore
- rmContainer = scheduler.getRMContainer(opportunisticContainerId);
- assertNull(rmContainer);
-
- // Clean the AM
- amClient.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, null, null);
- } finally {
- if (amClient != null &&
- amClient.getServiceState() == Service.STATE.STARTED) {
- amClient.close();
- }
- }
- }
-
- private void sleep(int sleepTime) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
deleted file mode 100644
index 12c32fc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProfileCapability;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Class that tests the allocation of OPPORTUNISTIC containers through the
- * centralized ResourceManager.
- */
-public class TestOpportunisticContainerAllocation {
- private static Configuration conf = null;
- private static MiniYARNCluster yarnCluster = null;
- private static YarnClient yarnClient = null;
- private static List nodeReports = null;
- private static int nodeCount = 3;
-
- private static final int ROLLING_INTERVAL_SEC = 13;
- private static final long AM_EXPIRE_MS = 4000;
-
- private static Resource capability;
- private static ProfileCapability profileCapability;
- private static Priority priority;
- private static Priority priority2;
- private static Priority priority3;
- private static Priority priority4;
- private static String node;
- private static String rack;
- private static String[] nodes;
- private static String[] racks;
- private final static int DEFAULT_ITERATION = 3;
-
- // Per test..
- private ApplicationAttemptId attemptId = null;
- private AMRMClientImpl amClient = null;
- private long availMB;
- private int availVCores;
- private long allocMB;
- private int allocVCores;
-
- @BeforeClass
- public static void setup() throws Exception {
- // start minicluster
- conf = new YarnConfiguration();
- conf.setLong(
- YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
- ROLLING_INTERVAL_SEC);
- conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
- conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
- // set the minimum allocation so that resource decrease can go under 1024
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
- conf.setBoolean(
- YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
- conf.setInt(
- YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
- conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
- yarnCluster =
- new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
- yarnCluster.init(conf);
- yarnCluster.start();
-
- // start rm client
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
-
- // get node info
- nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-
- priority = Priority.newInstance(1);
- priority2 = Priority.newInstance(2);
- priority3 = Priority.newInstance(3);
- priority4 = Priority.newInstance(4);
- capability = Resource.newInstance(512, 1);
- profileCapability = ProfileCapability.newInstance(capability);
-
- node = nodeReports.get(0).getNodeId().getHost();
- rack = nodeReports.get(0).getRackName();
- nodes = new String[]{node};
- racks = new String[]{rack};
- }
-
- @Before
- public void startApp() throws Exception {
- // submit new app
- ApplicationSubmissionContext appContext =
- yarnClient.createApplication().getApplicationSubmissionContext();
- ApplicationId appId = appContext.getApplicationId();
- // set the application name
- appContext.setApplicationName("Test");
- // Set the priority for the application master
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- appContext.setPriority(pri);
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue("default");
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
- Collections.emptyMap(),
- new HashMap(), Arrays.asList("sleep", "100"),
- new HashMap(), null,
- new HashMap());
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(Resource.newInstance(1024, 1));
- // Create the request to send to the applications manager
- SubmitApplicationRequest appRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
- // Submit the application to the applications manager
- yarnClient.submitApplication(appContext);
-
- // wait for app to start
- RMAppAttempt appAttempt = null;
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport.getYarnApplicationState() ==
- YarnApplicationState.ACCEPTED) {
- attemptId = appReport.getCurrentApplicationAttemptId();
- appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
- while (true) {
- if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- break;
- }
- }
- break;
- }
- }
- // Just dig into the ResourceManager and get the AMRMToken just for the sake
- // of testing.
- UserGroupInformation.setLoginUser(UserGroupInformation
- .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-
- // emulate RM setup of AMRM token in credentials by adding the token
- // *before* setting the token service
- UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
- appAttempt.getAMRMToken()
- .setService(ClientRMProxy.getAMRMTokenService(conf));
-
- // start am rm client
- amClient = (AMRMClientImpl)AMRMClient
- .createAMRMClient();
-
- //setting an instance NMTokenCache
- amClient.setNMTokenCache(new NMTokenCache());
- //asserting we are not using the singleton instance cache
- Assert.assertNotSame(NMTokenCache.getSingleton(),
- amClient.getNMTokenCache());
-
- amClient.init(conf);
- amClient.start();
-
- amClient.registerApplicationMaster("Host", 10000, "");
- }
-
- @After
- public void cancelApp() throws YarnException, IOException {
- try {
- amClient
- .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
- null);
- } finally {
- if (amClient != null &&
- amClient.getServiceState() == Service.STATE.STARTED) {
- amClient.stop();
- }
- }
- yarnClient.killApplication(attemptId.getApplicationId());
- attemptId = null;
- }
-
- @AfterClass
- public static void tearDown() {
- if (yarnClient != null &&
- yarnClient.getServiceState() == Service.STATE.STARTED) {
- yarnClient.stop();
- }
- if (yarnCluster != null &&
- yarnCluster.getServiceState() == Service.STATE.STARTED) {
- yarnCluster.stop();
- }
- }
-
- @Test(timeout = 60000)
- public void testPromotionFromAcquired() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(1, oppContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- Map allocatedOpportContainers = new HashMap<>();
- int iterationsLeft = 50;
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- updateMetrics("Before Opp Allocation");
-
- while (allocatedContainerCount < oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- allocatedOpportContainers.put(container.getId(), container);
- removeCR(container);
- }
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < oppContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(oppContainersRequestedAny, allocatedContainerCount);
- assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
-
- updateMetrics("After Opp Allocation / Before Promotion");
-
- try {
- Container c = allocatedOpportContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
- null, ExecutionType.OPPORTUNISTIC));
- Assert.fail("Should throw Exception..");
- } catch (IllegalArgumentException e) {
- System.out.println("## " + e.getMessage());
- Assert.assertTrue(e.getMessage().contains(
- "target should be GUARANTEED and original should be OPPORTUNISTIC"));
- }
-
- Container c = allocatedOpportContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
- null, ExecutionType.GUARANTEED));
- iterationsLeft = 120;
- Map updatedContainers = new HashMap<>();
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- if (allocResponse.getUpdatedContainers() != null) {
- for (UpdatedContainer updatedContainer : allocResponse
- .getUpdatedContainers()) {
- System.out.println("Got update..");
- updatedContainers.put(updatedContainer.getContainer().getId(),
- updatedContainer);
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
-
- updateMetrics("After Promotion");
-
- assertEquals(1, updatedContainers.size());
- for (ContainerId cId : allocatedOpportContainers.keySet()) {
- Container orig = allocatedOpportContainers.get(cId);
- UpdatedContainer updatedContainer = updatedContainers.get(cId);
- assertNotNull(updatedContainer);
- assertEquals(ExecutionType.GUARANTEED,
- updatedContainer.getContainer().getExecutionType());
- assertEquals(orig.getResource(),
- updatedContainer.getContainer().getResource());
- assertEquals(orig.getNodeId(),
- updatedContainer.getContainer().getNodeId());
- assertEquals(orig.getVersion() + 1,
- updatedContainer.getContainer().getVersion());
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- amClient.ask.clear();
- }
-
- @Test(timeout = 60000)
- public void testDemotionFromAcquired() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3));
-
- int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
-
- assertEquals(1, guarContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- Map allocatedGuarContainers = new HashMap<>();
- int iterationsLeft = 50;
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- updateMetrics("Before Guar Allocation");
-
- while (allocatedContainerCount < guarContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.GUARANTEED) {
- allocatedGuarContainers.put(container.getId(), container);
- removeCR(container);
- }
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < guarContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(guarContainersRequestedAny, allocatedContainerCount);
- assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
-
- updateMetrics("After Guar Allocation / Before Demotion");
-
- try {
- Container c = allocatedGuarContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
- null, ExecutionType.GUARANTEED));
- Assert.fail("Should throw Exception..");
- } catch (IllegalArgumentException e) {
- System.out.println("## " + e.getMessage());
- Assert.assertTrue(e.getMessage().contains(
- "target should be OPPORTUNISTIC and original should be GUARANTEED"));
- }
-
- Container c = allocatedGuarContainers.values().iterator().next();
- amClient.requestContainerUpdate(
- c, UpdateContainerRequest.newInstance(c.getVersion(),
- c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
- null, ExecutionType.OPPORTUNISTIC));
- iterationsLeft = 120;
- Map updatedContainers = new HashMap<>();
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- if (allocResponse.getUpdatedContainers() != null) {
- for (UpdatedContainer updatedContainer : allocResponse
- .getUpdatedContainers()) {
- System.out.println("Got update..");
- updatedContainers.put(updatedContainer.getContainer().getId(),
- updatedContainer);
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
-
- updateMetrics("After Demotion");
-
- assertEquals(1, updatedContainers.size());
- for (ContainerId cId : allocatedGuarContainers.keySet()) {
- Container orig = allocatedGuarContainers.get(cId);
- UpdatedContainer updatedContainer = updatedContainers.get(cId);
- assertNotNull(updatedContainer);
- assertEquals(ExecutionType.OPPORTUNISTIC,
- updatedContainer.getContainer().getExecutionType());
- assertEquals(orig.getResource(),
- updatedContainer.getContainer().getResource());
- assertEquals(orig.getNodeId(),
- updatedContainer.getContainer().getNodeId());
- assertEquals(orig.getVersion() + 1,
- updatedContainer.getContainer().getVersion());
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- amClient.ask.clear();
- }
-
- @Test(timeout = 60000)
- public void testMixedAllocationAndRelease() throws YarnException,
- IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- int containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
- int oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(4, containersRequestedNode);
- assertEquals(4, containersRequestedRack);
- assertEquals(4, containersRequestedAny);
- assertEquals(2, oppContainersRequestedAny);
-
- assertEquals(4, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- containersRequestedNode = amClient.getTable(0).get(priority,
- node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- containersRequestedRack = amClient.getTable(0).get(priority,
- rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
- .getNumContainers();
- containersRequestedAny = amClient.getTable(0).get(priority,
- ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
- .remoteRequest.getNumContainers();
- oppContainersRequestedAny =
- amClient.getTable(0).get(priority2, ResourceRequest.ANY,
- ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
- .getNumContainers();
-
- assertEquals(2, containersRequestedNode);
- assertEquals(2, containersRequestedRack);
- assertEquals(2, containersRequestedAny);
- assertEquals(1, oppContainersRequestedAny);
-
- assertEquals(4, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- int allocatedOpportContainerCount = 0;
- int iterationsLeft = 50;
- Set releases = new TreeSet<>();
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- while (allocatedContainerCount <
- containersRequestedAny + oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- allocatedContainerCount +=
- allocResponse.getAllocatedContainers().size();
- for (Container container : allocResponse.getAllocatedContainers()) {
- if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- allocatedOpportContainerCount++;
- }
- ContainerId rejectContainerId = container.getId();
- releases.add(rejectContainerId);
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < containersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(containersRequestedAny + oppContainersRequestedAny,
- allocatedContainerCount);
- assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
- for (ContainerId rejectContainerId : releases) {
- amClient.releaseAssignedContainer(rejectContainerId);
- }
- assertEquals(3, amClient.release.size());
- assertEquals(0, amClient.ask.size());
-
- // need to tell the AMRMClient that we don't need these resources anymore
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
- amClient.removeContainerRequest(
- new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- assertEquals(4, amClient.ask.size());
-
- iterationsLeft = 3;
- // do a few iterations to ensure RM is not going to send new containers
- while (iterationsLeft-- > 0) {
- // inform RM of rejection
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- // RM did not send new containers because AM does not need any
- assertEquals(0, allocResponse.getAllocatedContainers().size());
- if (allocResponse.getCompletedContainersStatuses().size() > 0) {
- for (ContainerStatus cStatus : allocResponse
- .getCompletedContainersStatuses()) {
- if (releases.contains(cStatus.getContainerId())) {
- assertEquals(cStatus.getState(), ContainerState.COMPLETE);
- assertEquals(-100, cStatus.getExitStatus());
- releases.remove(cStatus.getContainerId());
- }
- }
- }
- if (iterationsLeft > 0) {
- // sleep to make sure NM's heartbeat
- sleep(100);
- }
- }
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
- }
-
- /**
- * Tests allocation with requests comprising only opportunistic containers.
- */
- @Test(timeout = 60000)
- public void testOpportunisticAllocation() throws YarnException, IOException {
- // setup container request
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
- amClient.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
- true, null,
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true)));
-
- int oppContainersRequestedAny = amClient.getTable(0)
- .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
- profileCapability).remoteRequest.getNumContainers();
-
- assertEquals(2, oppContainersRequestedAny);
-
- assertEquals(1, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- // RM should allocate container within 2 calls to allocate()
- int allocatedContainerCount = 0;
- int iterationsLeft = 10;
- Set releases = new TreeSet<>();
-
- amClient.getNMTokenCache().clearCache();
- Assert.assertEquals(0,
- amClient.getNMTokenCache().numberOfTokensInCache());
- HashMap receivedNMTokens = new HashMap<>();
-
- while (allocatedContainerCount < oppContainersRequestedAny
- && iterationsLeft-- > 0) {
- AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertEquals(0, amClient.ask.size());
- assertEquals(0, amClient.release.size());
-
- for (Container container : allocResponse.getAllocatedContainers()) {
- allocatedContainerCount++;
- ContainerId rejectContainerId = container.getId();
- releases.add(rejectContainerId);
- }
-
- for (NMToken token : allocResponse.getNMTokens()) {
- String nodeID = token.getNodeId().toString();
- receivedNMTokens.put(nodeID, token.getToken());
- }
-
- if (allocatedContainerCount < oppContainersRequestedAny) {
- // sleep to let NM's heartbeat to RM and trigger allocations
- sleep(100);
- }
- }
-
- assertEquals(oppContainersRequestedAny, allocatedContainerCount);
- assertEquals(1, receivedNMTokens.values().size());
- }
-
- private void removeCR(Container container) {
- List extends Collection>
- matchingRequests = amClient.getMatchingRequests(container
- .getPriority(),
- ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
- container.getResource());
- Set toRemove = new HashSet<>();
- for (Collection rc : matchingRequests) {
- for (AMRMClient.ContainerRequest cr : rc) {
- toRemove.add(cr);
- }
- }
- for (AMRMClient.ContainerRequest cr : toRemove) {
- amClient.removeContainerRequest(cr);
- }
- }
-
- private void updateMetrics(String msg) {
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler)yarnCluster.getResourceManager()
- .getResourceScheduler();
- availMB = scheduler.getRootQueueMetrics().getAvailableMB();
- availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
- allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
- allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
- System.out.println("## METRICS (" + msg + ")==>");
- System.out.println(" : availMB=" + availMB + ", " +
- "availVCores=" +availVCores + ", " +
- "allocMB=" + allocMB + ", " +
- "allocVCores=" + allocVCores + ", ");
- System.out.println("<== ##");
- }
-
- private void sleep(int sleepTime) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
new file mode 100644
index 0000000..94cb28e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Class that tests the allocation of OPPORTUNISTIC containers through the
+ * centralized ResourceManager.
+ */
+public class TestOpportunisticContainerAllocationE2E {
+  // Shared by all tests in the class; created once in setup().
+  private static Configuration conf = null;
+  private static MiniYARNCluster yarnCluster = null;
+  private static YarnClient yarnClient = null;
+  // Reports of the RUNNING nodes, fetched once in setup().
+  private static List<NodeReport> nodeReports = null;
+  private static int nodeCount = 3;
+
+  private static final int ROLLING_INTERVAL_SEC = 13;
+  private static final long AM_EXPIRE_MS = 4000;
+
+  private static Resource capability;
+  private static ProfileCapability profileCapability;
+  private static Priority priority;
+  private static Priority priority2;
+  private static Priority priority3;
+  private static Priority priority4;
+  private static String node;
+  private static String rack;
+  private static String[] nodes;
+  private static String[] racks;
+  private final static int DEFAULT_ITERATION = 3;
+
+  // Per test..
+  private ApplicationAttemptId attemptId = null;
+  private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+  // Last scheduler metrics snapshot taken by updateMetrics().
+  private long availMB;
+  private int availVCores;
+  private long allocMB;
+  private int allocVCores;
+
+  /**
+   * Starts the mini cluster with opportunistic container allocation enabled,
+   * starts the YARN client and initializes the priorities, capability and
+   * node/rack names shared by the tests.
+   *
+   * @throws Exception if the cluster or the client cannot be started.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    // start minicluster
+    conf = new YarnConfiguration();
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        ROLLING_INTERVAL_SEC);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    // Name the cluster after this test class (it was named after
+    // TestAMRMClient) so that the two tests do not share a cluster name.
+    yarnCluster = new MiniYARNCluster(
+        TestOpportunisticContainerAllocationE2E.class.getName(),
+        nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+
+    priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
+    priority3 = Priority.newInstance(3);
+    priority4 = Priority.newInstance(4);
+    capability = Resource.newInstance(512, 1);
+    profileCapability = ProfileCapability.newInstance(capability);
+
+    // the first reported node provides the node/rack used in requests
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{node};
+    racks = new String[]{rack};
+  }
+
+  /**
+   * Submits a new application, waits until its current attempt is LAUNCHED,
+   * installs the attempt's AMRM token for the current user, and starts and
+   * registers a fresh AMRMClient backed by its own NMTokenCache.
+   *
+   * @throws Exception if submission, polling or registration fails, or if
+   *           the thread is interrupted while waiting.
+   */
+  @Before
+  public void startApp() throws Exception {
+    // submit new app
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource>emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (appAttempt.getAppAttemptState()
+            != RMAppAttemptState.LAUNCHED) {
+          // back off instead of spinning on the attempt state
+          Thread.sleep(100);
+        }
+        break;
+      }
+      // back off between application report polls
+      Thread.sleep(100);
+    }
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken()
+        .setService(ClientRMProxy.getAMRMTokenService(conf));
+
+    // start am rm client
+    amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+        .<AMRMClient.ContainerRequest>createAMRMClient();
+
+    //setting an instance NMTokenCache
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("Host", 10000, "");
+  }
+
+  /**
+   * Unregisters and stops the AMRMClient, then kills the application.
+   * Null-safe, so a failure in startApp() is not masked by a
+   * NullPointerException here; the application is killed even when
+   * unregistering throws.
+   *
+   * @throws YarnException if unregistering or killing the application fails.
+   * @throws IOException if unregistering or killing the application fails.
+   */
+  @After
+  public void cancelApp() throws YarnException, IOException {
+    try {
+      if (amClient != null) {
+        try {
+          amClient.unregisterApplicationMaster(
+              FinalApplicationStatus.SUCCEEDED, null, null);
+        } finally {
+          if (amClient.getServiceState() == Service.STATE.STARTED) {
+            amClient.stop();
+          }
+        }
+      }
+    } finally {
+      // attemptId stays null when startApp() failed before the app started
+      if (attemptId != null) {
+        yarnClient.killApplication(attemptId.getApplicationId());
+        attemptId = null;
+      }
+    }
+  }
+
+  /**
+   * Stops the YARN client and then the mini cluster, each only if it was
+   * created and is currently in the STARTED state.
+   */
+  @AfterClass
+  public static void tearDown() {
+    boolean clientStarted = yarnClient != null
+        && yarnClient.getServiceState() == Service.STATE.STARTED;
+    if (clientStarted) {
+      yarnClient.stop();
+    }
+
+    boolean clusterStarted = yarnCluster != null
+        && yarnCluster.getServiceState() == Service.STATE.STARTED;
+    if (clusterStarted) {
+      yarnCluster.stop();
+    }
+  }
+
+  /**
+   * Allocates one OPPORTUNISTIC container, checks that a promotion request
+   * whose target type is OPPORTUNISTIC fails with an
+   * IllegalArgumentException, then requests promotion to GUARANTEED and
+   * verifies the updated container returned by allocate().
+   */
+  @Test(timeout = 60000)
+  public void testPromotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() until the container arrives or iterations run out
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Opp Allocation");
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+    updateMetrics("After Opp Allocation / Before Promotion");
+
+    // a promotion whose target is still OPPORTUNISTIC must be rejected
+    try {
+      Container c = allocatedOpportContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+              null, ExecutionType.OPPORTUNISTIC));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+    }
+
+    Container c = allocatedOpportContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // poll allocate() until the RM reports the updated container
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // sends the pending update request to the RM
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+
+    updateMetrics("After Promotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedOpportContainers.keySet()) {
+      Container orig = allocatedOpportContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.GUARANTEED,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
+
+  /**
+   * Allocates one GUARANTEED container, checks that a demotion request whose
+   * target type is GUARANTEED fails with an IllegalArgumentException, then
+   * requests demotion to OPPORTUNISTIC and verifies the updated container
+   * returned by allocate().
+   */
+  @Test(timeout = 60000)
+  public void testDemotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3));
+
+    int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+
+    assertEquals(1, guarContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() until the container arrives or iterations run out
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Guar Allocation");
+
+    while (allocatedContainerCount < guarContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+          allocatedGuarContainers.put(container.getId(), container);
+          removeCR(container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < guarContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+    assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+    updateMetrics("After Guar Allocation / Before Demotion");
+
+    // a demotion whose target is still GUARANTEED must be rejected
+    try {
+      Container c = allocatedGuarContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+              null, ExecutionType.GUARANTEED));
+      Assert.fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      Assert.assertTrue(e.getMessage().contains(
+          "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+    }
+
+    Container c = allocatedGuarContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // poll allocate() until the RM reports the updated container
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // sends the pending update request to the RM
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+
+    updateMetrics("After Demotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedGuarContainers.keySet()) {
+      Container orig = allocatedGuarContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
+
+  /**
+   * Requests a mix of GUARANTEED and OPPORTUNISTIC containers, removes some
+   * of the requests before allocation, waits for the remaining containers,
+   * releases all of them and checks that the RM does not hand out further
+   * containers afterwards.
+   */
+  @Test(timeout = 60000)
+  public void testMixedAllocationAndRelease() throws YarnException,
+      IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(4, containersRequestedNode);
+    assertEquals(4, containersRequestedRack);
+    assertEquals(4, containersRequestedAny);
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
+        .getNumContainers();
+    containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
+        .remoteRequest.getNumContainers();
+    oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(2, containersRequestedNode);
+    assertEquals(2, containersRequestedRack);
+    assertEquals(2, containersRequestedAny);
+    assertEquals(1, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() until all containers arrive or iterations run out
+    int allocatedContainerCount = 0;
+    int allocatedOpportContainerCount = 0;
+    int iterationsLeft = 50;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount <
+        containersRequestedAny + oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainerCount++;
+        }
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      // Same bound as the loop condition: keep backing off while any
+      // container (guaranteed or opportunistic) is still outstanding.
+      if (allocatedContainerCount <
+          containersRequestedAny + oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(containersRequestedAny + oppContainersRequestedAny,
+        allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
+    for (ContainerId rejectContainerId : releases) {
+      amClient.releaseAssignedContainer(rejectContainerId);
+    }
+    assertEquals(3, amClient.release.size());
+    assertEquals(0, amClient.ask.size());
+
+    // need to tell the AMRMClient that we don't need these resources anymore
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+    // NOTE(review): the opportunistic requests were added with null
+    // nodes/racks; confirm that passing nodes/racks here is intended.
+    amClient.removeContainerRequest(
+        new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    assertEquals(4, amClient.ask.size());
+
+    iterationsLeft = 3;
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      assertEquals(0, allocResponse.getAllocatedContainers().size());
+      if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+        for (ContainerStatus cStatus : allocResponse
+            .getCompletedContainersStatuses()) {
+          if (releases.contains(cStatus.getContainerId())) {
+            // expected value first, so a failure message reads correctly
+            assertEquals(ContainerState.COMPLETE, cStatus.getState());
+            assertEquals(-100, cStatus.getExitStatus());
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+  }
+
+  /**
+   * Tests allocation with requests comprising only opportunistic containers.
+   * Two OPPORTUNISTIC containers are requested at the same priority; the test
+   * polls allocate() until both are returned and checks how many distinct
+   * nodes NM tokens were received for.
+   */
+  @Test(timeout = 60000)
+  public void testOpportunisticAllocation() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny = amClient.getTable(0)
+        .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+            profileCapability).remoteRequest.getNumContainers();
+
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // poll allocate() until both containers arrive or iterations run out
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 10;
+    Set<ContainerId> releases = new TreeSet<>();
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        allocatedContainerCount++;
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+      }
+
+      // tokens are keyed by node id, so the map holds one entry per node
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(1, receivedNMTokens.values().size());
+  }
+
+  /**
+   * Removes from the AMRMClient every outstanding ANY-location container
+   * request that matches the priority, execution type and resource of the
+   * given allocated container.
+   *
+   * @param container the allocated container whose requests are removed.
+   */
+  private void removeCR(Container container) {
+    // Match on the container's own execution type, so that requests for
+    // GUARANTEED containers are removed as well as OPPORTUNISTIC ones.
+    List<? extends Collection<AMRMClient.ContainerRequest>>
+        matchingRequests = amClient.getMatchingRequests(
+            container.getPriority(), ResourceRequest.ANY,
+            container.getExecutionType(), container.getResource());
+    // Collect first, then remove, so the collections returned by the client
+    // are not modified while they are being iterated.
+    Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+    for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+      toRemove.addAll(rc);
+    }
+    for (AMRMClient.ContainerRequest cr : toRemove) {
+      amClient.removeContainerRequest(cr);
+    }
+  }
+
+  /**
+   * Reads the root queue metrics from the ResourceManager's scheduler into
+   * the per-test fields and prints them to stdout under the given label.
+   *
+   * @param msg label printed in the metrics header line.
+   */
+  private void updateMetrics(String msg) {
+    AbstractYarnScheduler scheduler = (AbstractYarnScheduler)
+        yarnCluster.getResourceManager().getResourceScheduler();
+
+    // refresh the snapshot fields
+    availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+    availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+    allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+    allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+
+    // assemble the detail line before printing the three-line report
+    StringBuilder detail = new StringBuilder();
+    detail.append(" : availMB=").append(availMB).append(", ");
+    detail.append("availVCores=").append(availVCores).append(", ");
+    detail.append("allocMB=").append(allocMB).append(", ");
+    detail.append("allocVCores=").append(allocVCores).append(", ");
+
+    System.out.println("## METRICS (" + msg + ")==>");
+    System.out.println(detail.toString());
+    System.out.println("<== ##");
+  }
+
+  /**
+   * Sleeps for the given time. If the thread is interrupted the sleep ends
+   * early and the interrupt flag is restored, so callers and the test runner
+   * can still observe the interruption.
+   *
+   * @param sleepTime time to sleep, in milliseconds.
+   */
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      // do not swallow the interrupt: re-assert it for the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index e403a12..f621aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -47,6 +47,24 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
}
/**
+ * Create new Instance.
+ * @param nodeId NodeId.
+ * @param httpAddress Http address.
+ * @param rackName Rack Name.
+ * @return RemoteNode instance.
+ */
+  @Private
+  @Unstable
+  public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+      String rackName) {
+    // create an empty record and fill in each field from the arguments
+    final RemoteNode record = Records.newRecord(RemoteNode.class);
+    record.setNodeId(nodeId);
+    record.setHttpAddress(httpAddress);
+    record.setRackName(rackName);
+    return record;
+  }
+
+ /**
* Get {@link NodeId}.
* @return NodeId.
*/
@@ -79,6 +97,22 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
public abstract void setHttpAddress(String httpAddress);
/**
+ * Get the Rack Name recorded for this remote node.
+ * @return Rack Name; the protobuf-backed implementation returns
+ * {@code null} when no rack name has been set.
+ */
+ @Private
+ @Unstable
+ public abstract String getRackName();
+
+ /**
+ * Set the Rack Name recorded for this remote node.
+ * @param rackName Rack Name; in the protobuf-backed implementation a
+ * {@code null} value clears any previously set rack name.
+ */
+ @Private
+ @Unstable
+ public abstract void setRackName(String rackName);
+
+ /**
* Use the underlying {@link NodeId} comparator.
* @param other RemoteNode.
* @return Comparison.
@@ -92,6 +126,7 @@ public int compareTo(RemoteNode other) {
public String toString() {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
+ "rackName=" + getRackName() + ", " +
"httpAddress=" + getHttpAddress() + "}";
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index 3e4fd4a..c2492cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -118,6 +118,25 @@ public void setHttpAddress(String httpAddress) {
}
@Override
+  public String getRackName() {
+    // read from the immutable proto when it is current, else the builder
+    RemoteNodeProtoOrBuilder source = viaProto ? proto : builder;
+    // an unset optional field is reported as null
+    return source.hasRackName() ? source.getRackName() : null;
+  }
+
+  @Override
+  public void setRackName(String rackName) {
+    maybeInitBuilder();
+    if (rackName != null) {
+      builder.setRackName(rackName);
+    } else {
+      // null means "no rack": drop any value set earlier
+      builder.clearRackName();
+    }
+  }
+
+ @Override
public int hashCode() {
+ // The hash is that of the protobuf message returned by getProto().
return getProto().hashCode();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 782dc02..ede4958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -45,11 +45,14 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -61,6 +64,10 @@
*/
public class OpportunisticContainerAllocator {
+ // NOTE(review): presumably identifiers for successive allocation passes
+ // (node-local, rack-local, off-switch); their use is outside this view.
+ private static final int NODE_LOCAL_LOOP = 0;
+ private static final int RACK_LOCAL_LOOP = 1;
+ private static final int OFF_SWITCH_LOOP = 2;
+
/**
* This class encapsulates application specific parameters used to build a
* Container.
@@ -70,6 +77,7 @@
private Resource minResource;
private Resource incrementResource;
private int containerTokenExpiryInterval;
+ // Max allocations per scheduler key per allocation round; starts at 1.
+ private int maxAllocationsPerSchedulerKeyPerRound = 1;
/**
* Return Max Resource.
@@ -135,6 +143,24 @@ public void setContainerTokenExpiryInterval(
int containerTokenExpiryInterval) {
this.containerTokenExpiryInterval = containerTokenExpiryInterval;
}
+
+ /**
+  * Return the maximum number of allocations made per scheduler key in a
+  * single allocation round.
+  * @return maxAllocationsPerSchedulerKeyPerRound.
+  */
+ public int getMaxAllocationsPerSchedulerKeyPerRound() {
+   return this.maxAllocationsPerSchedulerKeyPerRound;
+ }
+
+ /**
+  * Update the maximum number of allocations made per scheduler key in a
+  * single allocation round.
+  * @param maxAllocationsPerSchedulerKeyPerRound new limit; stored as given,
+  *        without validation.
+  */
+ public void setMaxAllocationsPerSchedulerKeyPerRound(
+     final int maxAllocationsPerSchedulerKeyPerRound) {
+   this.maxAllocationsPerSchedulerKeyPerRound =
+       maxAllocationsPerSchedulerKeyPerRound;
+ }
}
/**
@@ -188,6 +214,72 @@ public long generateContainerId() {
private final BaseContainerTokenSecretManager tokenSecretManager;
+ /**
+  * Holder pairing a {@link Container} with a resource name. Both references
+  * are fixed at construction time.
+  */
+ static class Allocation {
+   private final Container container;
+   private final String resourceName;
+
+   /**
+    * @param container container to hold.
+    * @param resourceName resource name to hold alongside the container.
+    */
+   Allocation(final Container container, final String resourceName) {
+     this.container = container;
+     this.resourceName = resourceName;
+   }
+
+   /** @return the container passed to the constructor. */
+   Container getContainer() {
+     return this.container;
+   }
+
+   /** @return the resource name passed to the constructor. */
+   String getResourceName() {
+     return this.resourceName;
+   }
+ }
+
+ /**
+  * A {@link ResourceRequest} together with per-location counters, kept in
+  * separate maps for rack names and node names.
+  */
+ static class EnrichedResourceRequest {
+   // Location name -> counter. A name is filed under rackLocations when it
+   // starts with "/", otherwise under nodeLocations (see addLocation).
+   private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
+   private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
+   private final ResourceRequest request;
+
+   /**
+    * @param request the request these location counters belong to.
+    */
+   EnrichedResourceRequest(ResourceRequest request) {
+     this.request = request;
+   }
+
+   /** @return the request passed to the constructor. */
+   ResourceRequest getRequest() {
+     return request;
+   }
+
+   /**
+    * Record a counter for a location, replacing any previous counter.
+    * Locations starting with "/" are stored as racks, all others as nodes.
+    * @param location rack or node name.
+    * @param count initial counter value; zero removes the location instead
+    *        of storing it.
+    */
+   void addLocation(String location, int count) {
+     Map<String, AtomicInteger> m = rackLocations;
+     if (!location.startsWith("/")) {
+       m = nodeLocations;
+     }
+     if (count == 0) {
+       m.remove(location);
+     } else {
+       m.put(location, new AtomicInteger(count));
+     }
+   }
+
+   /**
+    * Decrement the counter of a location and drop the entry once the
+    * counter reaches zero. The rack map is consulted first, then the node
+    * map; a location present in neither is ignored.
+    * @param location rack or node name.
+    */
+   void removeLocation(String location) {
+     Map<String, AtomicInteger> m = rackLocations;
+     AtomicInteger count = m.get(location);
+     if (count == null) {
+       m = nodeLocations;
+       count = m.get(location);
+     }
+
+     if (count != null) {
+       if (count.decrementAndGet() == 0) {
+         m.remove(location);
+       }
+     }
+   }
+
+   /** @return the key set of the node map (not a copy). */
+   Set<String> getNodeLocations() {
+     return nodeLocations.keySet();
+   }
+
+   /** @return the key set of the rack map (not a copy). */
+   Set<String> getRackLocations() {
+     return rackLocations.keySet();
+   }
+ }
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
@@ -223,37 +315,55 @@ public OpportunisticContainerAllocator(
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs);
- // Satisfy the outstanding OPPORTUNISTIC requests.
+ Set nodeBlackList = new HashSet<>(opportContext.getBlacklist());
List allocatedContainers = new ArrayList<>();
- for (SchedulerRequestKey schedulerKey :
- opportContext.getOutstandingOpReqs().descendingKeySet()) {
- // Allocated containers :
- // Key = Requested Capability,
- // Value = List of Containers of given cap (the actual container size
- // might be different than what is requested, which is why
- // we need the requested capability (key) to match against
- // the outstanding reqs)
- Map> allocated = allocate(rmIdentifier,
- opportContext, schedulerKey, applicationAttemptId, appSubmitter);
- for (Map.Entry> e : allocated.entrySet()) {
- opportContext.matchAllocationToOutstandingRequest(
- e.getKey(), e.getValue());
- allocatedContainers.addAll(e.getValue());
+
+ // Satisfy the outstanding OPPORTUNISTIC requests.
+ boolean continueLoop = true;
+ while (continueLoop) {
+ continueLoop = false;
+ List