diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index 7102f7b..e7458cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -87,6 +87,12 @@ public int hashCode() {
   }
 
   @Override
+  public String toString() {
+    return "UpdateContainerError{reason=" + getReason() + ", " +
+        "req=" + getUpdateContainerRequest() + "}";
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
index 200dea3..925a7979 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -150,15 +150,28 @@ public int hashCode() {
     ContainerId cId = getContainerId();
     ExecutionType execType = getExecutionType();
     Resource capability = getCapability();
+    ContainerUpdateType updateType = getContainerUpdateType();
     result =
         prime * result + ((capability == null) ? 0 : capability.hashCode());
     result = prime * result + ((cId == null) ? 0 : cId.hashCode());
     result = prime * result + getContainerVersion();
     result =
         prime * result + ((execType == null) ? 0 : execType.hashCode());
+    result = prime * result + ((updateType == null) ?
+        0 : updateType.hashCode());
     return result;
   }
 
   @Override
+  public String toString() {
+    return "UpdateReq{" +
+        "containerId=" + getContainerId() + ", " +
+        "containerVersion=" + getContainerVersion() + ", " +
+        "targetExecType=" + getExecutionType() + ", " +
+        "targetCapability=" + getCapability() + ", " +
+        "updateType=" + getContainerUpdateType() + ", " +
+        "}";
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
@@ -197,6 +210,14 @@ public boolean equals(Object obj) {
     } else if (!execType.equals(other.getExecutionType())) {
       return false;
     }
+    ContainerUpdateType updateType = getContainerUpdateType();
+    if (updateType == null) {
+      if (other.getContainerUpdateType() != null) {
+        return false;
+      }
+    } else if (!updateType.equals(other.getContainerUpdateType())) {
+      return false;
+    }
     return true;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 52155f5..37ae746 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -525,6 +525,9 @@ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
   public abstract void requestContainerResourceChange(
       Container container, Resource capability);
 
+  public abstract void requestContainerUpdate(
+      Container container, Resource capability, ExecutionType targetExectype);
+
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use
    * the container or wants to give up the container then it can release them.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 44fc1e0..adaf726 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -169,15 +169,16 @@ static boolean canFit(Resource arg0, Resource arg1) {
   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   // change map holds container resource change requests between two allocate()
   // calls, and are cleared after each successful allocate() call.
-  protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
-      new HashMap<>();
+  protected final Map<ContainerId, SimpleEntry<Container,
+      UpdateContainerRequest>> change = new HashMap<>();
   // pendingChange map holds history of container resource change requests in
   // case AM needs to reregister with the ResourceManager.
   // Change requests are removed from this map if RM confirms the change
   // through allocate response, or if RM confirms that the container has been
   // completed.
-  protected final Map<ContainerId, SimpleEntry<Container, Resource>>
-      pendingChange = new HashMap<>();
+  protected final Map<ContainerId, SimpleEntry<Container,
+      UpdateContainerRequest>> pendingChange =
+      new HashMap<>();
 
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
@@ -259,7 +260,7 @@ public AllocateResponse allocate(float progressIndicator)
     AllocateRequest allocateRequest = null;
     List<String> blacklistToAdd = new ArrayList<String>();
     List<String> blacklistToRemove = new ArrayList<String>();
-    Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
+    Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange =
         new HashMap<>();
     try {
       synchronized (this) {
@@ -374,14 +375,15 @@ public AllocateResponse allocate(float progressIndicator)
         //
         // Only insert entries from the cached oldChange map
         // that do not exist in the current change map:
-        for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+        for (Map.Entry<ContainerId, SimpleEntry<Container,
+            UpdateContainerRequest>> entry :
             oldChange.entrySet()) {
           ContainerId oldContainerId = entry.getKey();
           Container oldContainer = entry.getValue().getKey();
-          Resource oldResource = entry.getValue().getValue();
+          UpdateContainerRequest oldupdate = entry.getValue().getValue();
          if (change.get(oldContainerId) == null) {
             change.put(
-                oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
+                oldContainerId, new SimpleEntry<>(oldContainer, oldupdate));
           }
         }
         blacklistAdditions.addAll(blacklistToAdd);
@@ -394,19 +396,30 @@ public AllocateResponse allocate(float progressIndicator)
 
   private List<UpdateContainerRequest> createUpdateList() {
     List<UpdateContainerRequest> updateList = new ArrayList<>();
-    for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
-        change.entrySet()) {
-      Resource targetCapability = entry.getValue().getValue();
-      Resource currCapability = entry.getValue().getKey().getResource();
-      int version = entry.getValue().getKey().getVersion();
-      ContainerUpdateType updateType =
-          ContainerUpdateType.INCREASE_RESOURCE;
-      if (Resources.fitsIn(targetCapability, currCapability)) {
-        updateType = ContainerUpdateType.DECREASE_RESOURCE;
+    for (Map.Entry<ContainerId, SimpleEntry<Container,
+        UpdateContainerRequest>> entry : change.entrySet()) {
+      Resource targetCapability = entry.getValue().getValue().getCapability();
+      if (targetCapability != null) {
+        Resource currCapability = entry.getValue().getKey().getResource();
+        int version = entry.getValue().getKey().getVersion();
+        ContainerUpdateType updateType =
+            ContainerUpdateType.INCREASE_RESOURCE;
+        if (Resources.fitsIn(targetCapability, currCapability)) {
+          updateType = ContainerUpdateType.DECREASE_RESOURCE;
+        }
+        updateList.add(
+            UpdateContainerRequest.newInstance(version, entry.getKey(),
+                updateType, targetCapability, null));
+      } else {
+        // TODO: Validate
+        ExecutionType targetExecType =
+            entry.getValue().getValue().getExecutionType();
+        updateList.add(
+            UpdateContainerRequest.newInstance(
+                entry.getValue().getKey().getVersion(), entry.getKey(),
+                ContainerUpdateType.UPDATE_EXECUTION_TYPE, null,
+                targetExecType));
       }
-      updateList.add(
-          UpdateContainerRequest.newInstance(version, entry.getKey(),
-              updateType, targetCapability, null));
     }
     return updateList;
   }
@@ -591,21 +604,42 @@ public synchronized void removeContainerRequest(T req) {
   }
 
   @Override
-  public synchronized void requestContainerResourceChange(
-      Container container, Resource capability) {
-    validateContainerResourceChangeRequest(
-        container.getId(), container.getResource(), capability);
+  public void requestContainerResourceChange(Container container,
+      Resource capability) {
+    requestContainerUpdate(container, UpdateContainerRequest.newInstance(
+        container.getVersion(), container.getId(), null, capability, null));
+  }
+
+  @Override
+  public void requestContainerUpdate(Container container, Resource capability,
+      ExecutionType targetExectype) {
+    LOG.info("Requesting Container update : " +
+        "container=" + container + ", " +
+        "targetCapability=" + capability + ", " +
capability + ", " + + "targetExecType=" + targetExectype); + requestContainerUpdate(container, UpdateContainerRequest.newInstance( + container.getVersion(), container.getId(), null, + capability, targetExectype)); + } + + private synchronized void requestContainerUpdate( + Container container, UpdateContainerRequest updateContainerRequest) { + if (updateContainerRequest.getCapability() != null) { + validateContainerResourceChangeRequest( + container.getId(), container.getResource(), + updateContainerRequest.getCapability()); + } if (change.get(container.getId()) == null) { change.put(container.getId(), - new SimpleEntry<>(container, capability)); + new SimpleEntry<>(container, updateContainerRequest)); } else { - change.get(container.getId()).setValue(capability); + change.get(container.getId()).setValue(updateContainerRequest); } if (pendingChange.get(container.getId()) == null) { pendingChange.put(container.getId(), - new SimpleEntry<>(container, capability)); + new SimpleEntry<>(container, updateContainerRequest)); } else { - pendingChange.get(container.getId()).setValue(capability); + pendingChange.get(container.getId()).setValue(updateContainerRequest); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index 802c207..118cf36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -54,6 +55,9 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -66,13 +70,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; /** * Class that tests the allocation of OPPORTUNISTIC containers through the @@ -83,7 +91,6 @@ private static MiniYARNCluster yarnCluster = null; private static YarnClient yarnClient = null; private static List nodeReports = null; - private static ApplicationAttemptId attemptId = null; private static int nodeCount = 3; private static final int ROLLING_INTERVAL_SEC = 13; @@ -92,12 +99,22 @@ private static Resource capability; private static Priority priority; 
   private static Priority priority2;
+  private static Priority priority3;
+  private static Priority priority4;
   private static String node;
   private static String rack;
   private static String[] nodes;
   private static String[] racks;
   private final static int DEFAULT_ITERATION = 3;
 
+  // Per test..
+  private ApplicationAttemptId attemptId = null;
+  private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
+  private long availMB;
+  private int availVCores;
+  private long allocMB;
+  private int allocVCores;
+
   @BeforeClass
   public static void setup() throws Exception {
     // start minicluster
@@ -106,7 +123,7 @@ public static void setup() throws Exception {
         YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
         ROLLING_INTERVAL_SEC);
     conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
-    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000);
     // set the minimum allocation so that resource decrease can go under 1024
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setBoolean(
@@ -129,7 +146,9 @@ public static void setup() throws Exception {
 
     priority = Priority.newInstance(1);
     priority2 = Priority.newInstance(2);
-    capability = Resource.newInstance(1024, 1);
+    priority3 = Priority.newInstance(3);
+    priority4 = Priority.newInstance(4);
+    capability = Resource.newInstance(512, 1);
 
     node = nodeReports.get(0).getNodeId().getHost();
     rack = nodeReports.get(0).getRackName();
@@ -193,10 +212,35 @@ public void startApp() throws Exception {
     UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
     appAttempt.getAMRMToken()
         .setService(ClientRMProxy.getAMRMTokenService(conf));
+
+    // start am rm client
+    amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+        .<AMRMClient.ContainerRequest>createAMRMClient();
+
+    //setting an instance NMTokenCache
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    amClient.init(conf);
+    amClient.start();
+
+    amClient.registerApplicationMaster("Host", 10000, "");
   }
 
   @After
   public void cancelApp() throws YarnException, IOException {
+    try {
+      amClient
+          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+              null);
+    } finally {
+      if (amClient != null &&
+          amClient.getServiceState() == Service.STATE.STARTED) {
+        amClient.stop();
+      }
+    }
     yarnClient.killApplication(attemptId.getApplicationId());
     attemptId = null;
   }
@@ -214,43 +258,224 @@ public static void tearDown() {
   }
 
   @Test(timeout = 60000)
-  public void testAMRMClient() throws YarnException, IOException {
-    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
-    try {
-      // start am rm client
-      amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+  public void testPromotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
 
-      //setting an instance NMTokenCache
-      amClient.setNMTokenCache(new NMTokenCache());
-      //asserting we are not using the singleton instance cache
-      Assert.assertNotSame(NMTokenCache.getSingleton(),
-          amClient.getNMTokenCache());
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
 
-      amClient.init(conf);
-      amClient.start();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
 
-      amClient.registerApplicationMaster("Host", 10000, "");
+    assertEquals(1, oppContainersRequestedAny);
 
-      testOpportunisticAllocation(
-          (AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
 
-      testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+    int iterationsLeft = 50;
 
-      amClient
-          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
-              null);
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
 
-    } finally {
-      if (amClient != null &&
-          amClient.getServiceState() == Service.STATE.STARTED) {
-        amClient.stop();
+    updateMetrics("Before Opp Allocation");
+
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainers.put(container.getId(), container);
+          removeCR(amClient, container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+    updateMetrics("After Opp Allocation / Before Promotion");
+
+    amClient.requestContainerUpdate(
+        allocatedOpportContainers.values().iterator().next(),
+        null, ExecutionType.GUARANTEED);
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
       }
     }
+
+    updateMetrics("After Promotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedOpportContainers.keySet()) {
+      Container orig = allocatedOpportContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.GUARANTEED,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
   }
 
-  private void testAllocation(
-      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
-      throws YarnException, IOException {
+  @Test(timeout = 60000)
+  public void testDemotionFromAcquired() throws YarnException, IOException {
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority3));
+
+    int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
+
+    assertEquals(1, guarContainersRequestedAny);
+
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    Assert.assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+    HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+    updateMetrics("Before Guar Allocation");
+
+    while (allocatedContainerCount < guarContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+          allocatedGuarContainers.put(container.getId(), container);
+          removeCR(amClient, container);
+        }
+      }
+
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        receivedNMTokens.put(nodeID, token.getToken());
+      }
+
+      if (allocatedContainerCount < guarContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(guarContainersRequestedAny, allocatedContainerCount);
+    assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size());
+
+    updateMetrics("After Guar Allocation / Before Demotion");
+
+    amClient.requestContainerUpdate(
+        allocatedGuarContainers.values().iterator().next(),
+        null, ExecutionType.OPPORTUNISTIC);
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+
+    updateMetrics("After Demotion");
+
+    assertEquals(1, updatedContainers.size());
+    for (ContainerId cId : allocatedGuarContainers.keySet()) {
+      Container orig = allocatedGuarContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+    amClient.ask.clear();
+  }
+
+  @Test(timeout = 60000)
+  public void testMixedAllocationAndRelease() throws YarnException,
+      IOException {
     // setup container request
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
@@ -274,6 +499,28 @@ private void testAllocation(
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
+    int containersRequestedNode = amClient.getTable(0).get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.getTable(0).get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.getTable(0).get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(4, containersRequestedNode);
+    assertEquals(4, containersRequestedRack);
+    assertEquals(4, containersRequestedAny);
+    assertEquals(2, oppContainersRequestedAny);
+
+    assertEquals(4, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
     amClient.removeContainerRequest(
         new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
     amClient.removeContainerRequest(
@@ -284,16 +531,16 @@ private void testAllocation(
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
-    int containersRequestedNode = amClient.getTable(0).get(priority,
+    containersRequestedNode = amClient.getTable(0).get(priority,
         node, ExecutionType.GUARANTEED, capability).remoteRequest
         .getNumContainers();
-    int containersRequestedRack = amClient.getTable(0).get(priority,
+    containersRequestedRack = amClient.getTable(0).get(priority,
         rack, ExecutionType.GUARANTEED, capability).remoteRequest
         .getNumContainers();
-    int containersRequestedAny = amClient.getTable(0).get(priority,
+    containersRequestedAny = amClient.getTable(0).get(priority,
         ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
         .remoteRequest.getNumContainers();
-    int oppContainersRequestedAny =
+    oppContainersRequestedAny =
         amClient.getTable(0).get(priority2, ResourceRequest.ANY,
             ExecutionType.OPPORTUNISTIC, capability).remoteRequest
             .getNumContainers();
@@ -309,7 +556,7 @@ private void testAllocation(
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;
     int allocatedOpportContainerCount = 0;
-    int iterationsLeft = 10;
+    int iterationsLeft = 50;
    Set<ContainerId> releases = new TreeSet<>();
 
     amClient.getNMTokenCache().clearCache();
@@ -324,8 +571,8 @@ private void testAllocation(
       assertEquals(0, amClient.ask.size());
       assertEquals(0, amClient.release.size());
 
-      allocatedContainerCount += allocResponse.getAllocatedContainers()
-          .size();
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
       for (Container container : allocResponse.getAllocatedContainers()) {
         if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
           allocatedOpportContainerCount++;
@@ -345,9 +592,9 @@ private void testAllocation(
         }
       }
 
-    assertEquals(allocatedContainerCount,
-        containersRequestedAny + oppContainersRequestedAny);
-    assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
+    assertEquals(containersRequestedAny + oppContainersRequestedAny,
+        allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
     for (ContainerId rejectContainerId : releases) {
       amClient.releaseAssignedContainer(rejectContainerId);
     }
@@ -395,26 +642,25 @@ private void testAllocation(
   /**
    * Tests allocation with requests comprising only opportunistic containers.
    */
-  private void testOpportunisticAllocation(
-      final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
-      throws YarnException, IOException {
+  @Test(timeout = 60000)
+  public void testOpportunisticAllocation() throws YarnException, IOException {
     // setup container request
     assertEquals(0, amClient.ask.size());
     assertEquals(0, amClient.release.size());
 
     amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
            true, null,
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
     amClient.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
+        new AMRMClient.ContainerRequest(capability, null, null, priority3, 0,
            true, null,
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
     int oppContainersRequestedAny =
-        amClient.getTable(0).get(priority, ResourceRequest.ANY,
+        amClient.getTable(0).get(priority3, ResourceRequest.ANY,
             ExecutionType.OPPORTUNISTIC, capability).remoteRequest
             .getNumContainers();
 
@@ -456,9 +702,44 @@ private void testOpportunisticAllocation(
       }
     }
 
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
     assertEquals(1, receivedNMTokens.values().size());
   }
 
+  private void removeCR(AMRMClientImpl<AMRMClient.ContainerRequest> amClient,
+      Container container) {
+    List<? extends Collection<AMRMClient.ContainerRequest>>
+        matchingRequests = amClient.getMatchingRequests(container
+            .getPriority(),
+        ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
+        container.getResource());
+    Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>();
+    for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) {
+      for (AMRMClient.ContainerRequest cr : rc) {
+        toRemove.add(cr);
+      }
+    }
+    for (AMRMClient.ContainerRequest cr : toRemove) {
+      amClient.removeContainerRequest(cr);
+    }
+  }
+
+  private void updateMetrics(String msg) {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler)yarnCluster.getResourceManager()
+            .getResourceScheduler();
+    availMB = scheduler.getRootQueueMetrics().getAvailableMB();
+    availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+    allocMB = scheduler.getRootQueueMetrics().getAllocatedMB();
+    allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+    System.out.println("## METRICS (" + msg + ")==>");
+    System.out.println(" : availMB=" + availMB + ", " +
+        "availVCores=" + availVCores + ", " +
+        "allocMB=" + allocMB + ", " +
+        "allocVCores=" + allocVCores + ", ");
+    System.out.println("<== ##");
+  }
+
   private void sleep(int sleepTime) {
     try {
       Thread.sleep(sleepTime);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index c0d52a6..de3fc47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -282,8 +282,8 @@ public synchronized void setAllocatedContainers(
       final List<Container> containers) {
     if (containers == null)
       return;
-    // this looks like a bug because it results in append and not set
     initLocalNewContainerList();
+    allocatedContainers.clear();
     allocatedContainers.addAll(containers);
   }
 
@@ -299,6 +299,7 @@ public synchronized void setUpdatedContainers(
     if (containers == null)
       return;
     initLocalUpdatedContainerList();
+    updatedContainers.clear();
     updatedContainers.addAll(containers);
   }
 
@@ -315,6 +316,7 @@ public synchronized void setCompletedContainersStatuses(
     if (containers == null)
       return;
     initLocalFinishedContainerList();
+    completedContainersStatuses.clear();
     completedContainersStatuses.addAll(containers);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 16436bd..3c1fcbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -192,7 +192,8 @@ public OpportunisticContainerAllocator(
 
   /**
    * Allocate OPPORTUNISTIC containers.
-   * @param request AllocateRequest
+   * @param blackList Resource BlackList Request
+   * @param oppResourceReqs Opportunistic Resource Requests
    * @param applicationAttemptId ApplicationAttemptId
    * @param opportContext App specific OpportunisticContainerContext
    * @param rmIdentifier RM Identifier
@@ -200,32 +201,24 @@ public OpportunisticContainerAllocator(
    * @return List of Containers.
    * @throws YarnException YarnException
    */
-  public List<Container> allocateContainers(
-      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+  public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+      List<ResourceRequest> oppResourceReqs,
+      ApplicationAttemptId applicationAttemptId,
       OpportunisticContainerContext opportContext, long rmIdentifier,
       String appSubmitter) throws YarnException {
-    // Update released containers.
-    List<ContainerId> releasedContainers = request.getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      opportContext.getContainersAllocated().removeAll(releasedContainers);
-    }
 
     // Update black list.
-    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
-    if (rbr != null) {
-      opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
-      opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    if (blackList != null) {
+      opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+      opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
     }
 
     // Add OPPORTUNISTIC requests to the outstanding ones.
-    opportContext.addToOutstandingReqs(request.getAskList());
+    opportContext.addToOutstandingReqs(oppResourceReqs);
 
     // Satisfy the outstanding OPPORTUNISTIC requests.
     List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority :
+    for (SchedulerRequestKey schedulerKey :
         opportContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
       //  Value = List of Containers of given cap (the actual container size
       //          might be different than what is requested, which is why
@@ -234,7 +227,7 @@ public OpportunisticContainerAllocator(
       //          we need the requested capability (key) to match against
       //          the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          opportContext, priority, applicationAttemptId, appSubmitter);
+          opportContext, schedulerKey, applicationAttemptId, appSubmitter);
       for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
         opportContext.matchAllocationToOutstandingRequest(
             e.getKey(), e.getValue());
@@ -246,19 +239,22 @@ public OpportunisticContainerAllocator(
   }
 
   private Map<Resource, List<Container>> allocate(long rmIdentifier,
-      OpportunisticContainerContext appContext, Priority priority,
+      OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
       ApplicationAttemptId appAttId, String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
     for (ResourceRequest anyAsk :
-        appContext.getOutstandingOpReqs().get(priority).values()) {
+        appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+      if (!containers.isEmpty()) {
+        LOG.info("Opportunistic allocation requested for ["
+            + "priority=" + anyAsk.getPriority()
+            + ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+            + ", num_containers=" + anyAsk.getNumContainers()
+            + ", capability=" + anyAsk.getCapability() + "]"
+            + " allocated = " + containers.get(anyAsk.getCapability()).size());
+      }
     }
     return containers;
   }
@@ -316,13 +312,21 @@ private Container buildContainer(long rmIdentifier,
     // before accepting an ask)
     Resource capability = normalizeCapability(appParams, rr);
 
+    return createContainer(
+        rmIdentifier, appParams.getContainerTokenExpiryInterval(),
+        SchedulerRequestKey.create(rr), userName, node, cId, capability);
+  }
+
+  private Container createContainer(long rmIdentifier, long tokenExpiry,
+      SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
+      ContainerId cId, Resource capability) {
     long currTime = System.currentTimeMillis();
     ContainerTokenIdentifier containerTokenIdentifier =
         new ContainerTokenIdentifier(
             cId, 0, node.getNodeId().toString(), userName,
-            capability, currTime + appParams.containerTokenExpiryInterval,
+            capability, currTime + tokenExpiry,
             tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
-            rr.getPriority(), currTime,
+            schedulerKey.getPriority(), currTime,
             null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
             ExecutionType.OPPORTUNISTIC);
     byte[] pwd =
@@ -331,9 +335,9 @@ private Container buildContainer(long rmIdentifier,
             containerTokenIdentifier);
     Container container = BuilderUtils.newContainer(
         cId, node.getNodeId(), node.getHttpAddress(),
-        capability, rr.getPriority(), containerToken,
+        capability, schedulerKey.getPriority(), containerToken,
         containerTokenIdentifier.getExecutionType(),
-        rr.getAllocationRequestId());
+        schedulerKey.getAllocationRequestId());
     return container;
   }
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 725e2d9..6148c04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,14 +18,15 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
@@ -52,9 +54,11 @@
   private static final Logger LOG = LoggerFactory
       .getLogger(OpportunisticContainerContext.class);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
+  public static ContainerId UNDEFINED =
+      ContainerId.newContainerId(
+          ApplicationAttemptId.newInstance(
+              ApplicationId.newInstance(-1, -1), -1), -1);
+
   private AllocationParams appParams =
       new AllocationParams();
   private ContainerIdGenerator containerIdGenerator =
@@ -69,12 +73,22 @@
   // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
-  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+  private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
-  public Set<ContainerId> getContainersAllocated() {
-    return containersAllocated;
-  }
+  // Keep track of containers that are undergoing promotion
+  private final Map<SchedulerRequestKey, Map<Resource,
+      Map<NodeId, Set<ContainerId>>>> outstandingPromotions = new HashMap<>();
+
+  // A promotion request is sent as just another request, but it
+  // is tracked using an allocationRequestId, which is < 0. This will
+  // ensure that these requests are processed before normal requests
+  private final Map<SchedulerRequestKey, SchedulerRequestKey>
+      outstandingPromotionKeys = new HashMap<>();
+
+  private final Set<ContainerId> outstandingDemotions = new HashSet<>();
+
+  private AtomicLong promotionCounter = new AtomicLong(-1);
 
   public AllocationParams getAppParams() {
     return appParams;
@@ -119,20 +133,123 @@ public void updateAllocationParams(Resource minResource, Resource maxResource,
     return blacklist;
   }
 
-  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+  public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       getOutstandingOpReqs() {
     return outstandingOpReqs;
   }
 
-  public void updateCompletedContainers(AllocateResponse allocateResponse) {
-    for (ContainerStatus cs :
-        allocateResponse.getCompletedContainersStatuses()) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
+  public synchronized boolean isBeingPromoted(Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingPromotions.get(SchedulerRequestKey.extractFrom(container));
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          return containerIds.contains(container.getId());
+        }
       }
     }
+    return false;
+  }
+
+  public synchronized boolean checkAndAddToOutstandingDemotions(
+      Container container) {
+    if (isBeingPromoted(container)
+        || outstandingDemotions.contains(container.getId())) {
+      return false;
+    }
+    outstandingDemotions.add(container.getId());
+    return true;
   }
 
+  public SchedulerRequestKey checkAndAddToOutstandingPromotions(
+      Container container) {
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.extractFrom(container);
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingPromotions.get(schedulerKey);
+    if (resourceMap == null) {
+      resourceMap = new HashMap<>();
+      outstandingPromotions.put(schedulerKey, resourceMap);
+    }
+    Map<NodeId, Set<ContainerId>> locationMap =
+        resourceMap.get(container.getResource());
+    if (locationMap == null) {
+      locationMap = new HashMap<>();
+      resourceMap.put(container.getResource(), locationMap);
+    }
+    Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+    if (containerIds == null) {
+      containerIds = new HashSet<>();
+      locationMap.put(container.getNodeId(), containerIds);
+    }
+    if (containerIds.contains(container.getId())
+        || outstandingDemotions.contains(container.getId())) {
+      return null;
+    }
+    containerIds.add(container.getId());
+    SchedulerRequestKey promotionKey = new SchedulerRequestKey(
+        container.getPriority(), promotionCounter.decrementAndGet());
+    outstandingPromotionKeys.put(promotionKey, schedulerKey);
+    return promotionKey;
+  }
+
+  public void completeContainerExecTypeUpdate(Container container) {
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.extractFrom(container);
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingPromotions.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          containerIds.remove(container.getId());
+          if (containerIds.isEmpty()) {
+            locationMap.remove(container.getNodeId());
+          }
+        }
+        if (locationMap.isEmpty()) {
+          resourceMap.remove(container.getResource());
+        }
+      }
+      if (resourceMap.isEmpty()) {
+        outstandingPromotions.remove(schedulerKey);
+      }
+    }
+    outstandingDemotions.remove(container.getId());
+  }
+
+  public ContainerId matchContainerToOutstandingPromotionReq(
+      Container container) {
+    ContainerId retVal = null;
+    SchedulerRequestKey promotionKey =
+        SchedulerRequestKey.extractFrom(container);
+    SchedulerRequestKey schedulerKey =
+        outstandingPromotionKeys.get(promotionKey);
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingPromotions.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          retVal = containerIds.iterator().next();
+          outstandingPromotionKeys.remove(promotionKey);
+        }
+      }
+    }
+    // Allocation happened on NM on the same host, but not on the NM
+    // we need.. We need to signal that this container has to be released.
+    if (schedulerKey != null && retVal == null) {
+      return UNDEFINED;
+    }
+    return retVal;
+  }
 
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
    * (Priority, ResourceName, Capability) and adds to the outstanding
@@ -144,7 +261,7 @@ public void updateCompletedContainers(AllocateResponse allocateResponse) {
    */
   public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
     for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
 
       // TODO: Extend for Node/Rack locality. We only handle ANY requests now
       if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
@@ -156,10 +273,10 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
       }
 
       Map<Resource, ResourceRequest> reqMap =
-          outstandingOpReqs.get(priority);
+          outstandingOpReqs.get(schedulerKey);
       if (reqMap == null) {
         reqMap = new HashMap<>();
-        outstandingOpReqs.put(priority, reqMap);
+        outstandingOpReqs.put(schedulerKey, reqMap);
       }
 
       ResourceRequest resourceRequest = reqMap.get(request.getCapability());
@@ -171,7 +288,9 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
             resourceRequest.getNumContainers() + request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+        LOG.info("# of outstandingOpReqs in ANY (at "
+            + "priority = " + schedulerKey.getPriority()
+            + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
            + ", with capability = " + request.getCapability() + " ) : "
            + resourceRequest.getNumContainers());
       }
@@ -187,9 +306,10 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
   public void matchAllocationToOutstandingRequest(Resource capability,
       List<Container> allocatedContainers) {
     for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
+      SchedulerRequestKey schedulerKey =
+          SchedulerRequestKey.extractFrom(c);
       Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
+          outstandingOpReqs.get(schedulerKey);
 
       if (asks == null) {
         continue;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
similarity index 92%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 4b640ae..9b7edbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+package org.apache.hadoop.yarn.server.scheduler;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -53,7 +53,7 @@ public static SchedulerRequestKey extractFrom(Container container) {
         container.getAllocationRequestId());
   }
 
-  private SchedulerRequestKey(Priority priority, long allocationRequestId) {
+  SchedulerRequestKey(Priority priority, long allocationRequestId) {
     this.priority = priority;
     this.allocationRequestId = allocationRequestId;
   }
@@ -119,4 +119,12 @@ public int hashCode() {
         getAllocationRequestId() >>> 32));
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "SchedulerRequestKey{" +
+        "priority=" + priority +
+        ", allocationRequestId=" + allocationRequestId +
+        '}';
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 0f47c93..a9b5ed4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -227,10 +227,10 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
             .partitionAskList(request.getAllocateRequest().getAskList());
 
     // Allocate OPPORTUNISTIC containers.
-    request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
     List<Container> allocatedContainers =
         containerAllocator.allocateContainers(
-            request.getAllocateRequest(), applicationAttemptId,
+            request.getAllocateRequest().getResourceBlacklistRequest(),
+            partitionedAsks.getOpportunistic(), applicationAttemptId,
             oppContainerContext, rmIdentifier, appSubmitter);
 
     // Prepare request for sending to RM for scheduling GUARANTEED containers.
@@ -252,18 +252,11 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
       nodeTokens.put(nmToken.getNodeId(), nmToken);
     }
 
-    oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
-
     // Check if we have NM tokens for all the allocated containers. If not
     // generate one and update the response.
     updateAllocateResponse(
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Number of opportunistic containers currently" +
-          "allocated by application: " + oppContainerContext
-          .getContainersAllocated().size());
-    }
     return dsResp;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 3d7b2b1..ad01f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -405,7 +405,6 @@ public AllocateResponse allocate(AllocateRequest request)
 
       ApplicationAttemptId appAttemptId =
           amrmTokenIdentifier.getApplicationAttemptId();
-      ApplicationId applicationId = appAttemptId.getApplicationId();
 
       this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -422,8 +421,10 @@ public AllocateResponse allocate(AllocateRequest request)
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
-            "AM is not registered for known application attempt: " + appAttemptId
-                + " or RM had restarted after AM registered . AM should re-register.";
+            "AM is not registered for known application attempt: "
+                + appAttemptId
+                + " or RM had restarted after AM registered. "
+                + " AM should re-register.";
         throw new ApplicationMasterNotRegisteredException(message);
       }
 
@@ -438,185 +439,10 @@ public AllocateResponse allocate(AllocateRequest request)
         throw new InvalidApplicationMasterRequestException(message);
       }
 
-      //filter illegal progress values
-      float filteredProgress = request.getProgress();
-      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
-          || filteredProgress < 0) {
-        request.setProgress(0);
-      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
-        request.setProgress(1);
-      }
-
-      // Send the status update to the appAttempt.
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptStatusupdateEvent(appAttemptId, request
-              .getProgress()));
-
-      List<ResourceRequest> ask = request.getAskList();
-      List<ContainerId> release = request.getReleaseList();
-
-      ResourceBlacklistRequest blacklistRequest =
-          request.getResourceBlacklistRequest();
-      List<String> blacklistAdditions =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
-      List<String> blacklistRemovals =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-      RMApp app =
-          this.rmContext.getRMApps().get(applicationId);
-
-      // set label expression for Resource Requests if resourceName=ANY
-      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
-      for (ResourceRequest req : ask) {
-        if (null == req.getNodeLabelExpression()
-            && ResourceRequest.ANY.equals(req.getResourceName())) {
-          req.setNodeLabelExpression(asc.getNodeLabelExpression());
-        }
-      }
-
-      Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
-
-      // sanity check
-      try {
-        RMServerUtils.normalizeAndValidateRequests(ask,
-            maximumCapacity, app.getQueue(),
-            rScheduler, rmContext);
-      } catch (InvalidResourceRequestException e) {
-        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
-        throw e;
-      }
-
-      try {
-        RMServerUtils.validateBlacklistRequest(blacklistRequest);
-      } catch (InvalidResourceBlacklistRequestException e) {
-        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
-        throw e;
-      }
-
-      // In the case of work-preserving AM restart, it's possible for the
-      // AM to release containers from the earlier attempt.
-      if (!app.getApplicationSubmissionContext()
-          .getKeepContainersAcrossApplicationAttempts()) {
-        try {
-          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-        } catch (InvalidContainerReleaseException e) {
-          LOG.warn("Invalid container release by application " + appAttemptId,
-              e);
-          throw e;
-        }
-      }
-
-      // Split Update Resource Requests into increase and decrease.
-      // No Exceptions are thrown here. All update errors are aggregated
-      // and returned to the AM.
-      List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerError> updateContainerErrors =
-          RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
-              request, maximumCapacity, increaseResourceReqs,
-              decreaseResourceReqs);
-
-      // Send new requests to appAttempt.
-      Allocation allocation;
-      RMAppAttemptState state =
-          app.getRMAppAttempt(appAttemptId).getAppAttemptState();
-      if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
-          state.equals(RMAppAttemptState.FINISHING) ||
-          app.isAppFinalStateStored()) {
-        LOG.warn(appAttemptId + " is in " + state +
-            " state, ignore container allocate request.");
-        allocation = EMPTY_ALLOCATION;
-      } else {
-        allocation =
-            this.rScheduler.allocate(appAttemptId, ask, release,
-                blacklistAdditions, blacklistRemovals,
-                increaseResourceReqs, decreaseResourceReqs);
-      }
-
-      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
-        LOG.info("blacklist are updated in Scheduler." +
+ - "blacklistAdditions: " + blacklistAdditions + ", " + - "blacklistRemovals: " + blacklistRemovals); - } - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = + AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); - if (allocation.getNMTokens() != null && - !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - - // Notify the AM of container update errors - if (!updateContainerErrors.isEmpty()) { - allocateResponse.setUpdateErrors(updateContainerErrors); - } - // update the response with the deltas of node status changes - List updatedNodes = new ArrayList(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { - List updatedNodeReports = new ArrayList(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = - rScheduler.getNodeReport(rmNode.getNodeID()); - Resource used = BuilderUtils.newResource(0, 0); - int numContainers = 0; - if (schedulerNodeReport != null) { - used = schedulerNodeReport.getUsedResource(); - numContainers = schedulerNodeReport.getNumContainers(); - } - NodeId nodeId = rmNode.getNodeID(); - NodeReport report = - BuilderUtils.newNodeReport(nodeId, rmNode.getState(), - rmNode.getHttpAddress(), rmNode.getRackName(), used, - rmNode.getTotalCapability(), numContainers, - rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); - - updatedNodeReports.add(report); - } - allocateResponse.setUpdatedNodes(updatedNodeReports); - } - - allocateResponse.setAllocatedContainers(allocation.getContainers()); - allocateResponse.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - allocateResponse.setResponseId(lastResponse.getResponseId() + 1); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); - - // Handling increased/decreased containers - List updatedContainers = new ArrayList<>(); - if (allocation.getIncreasedContainers() != null) { - for (Container c : allocation.getIncreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.INCREASE_RESOURCE, c)); - } - } - if (allocation.getDecreasedContainers() != null) { - for (Container c : allocation.getDecreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.DECREASE_RESOURCE, c)); - } - } - - allocateResponse.setUpdatedContainers(updatedContainers); - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add collector address for this application - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); - } - - // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Set application priority - allocateResponse.setApplicationPriority(app - .getApplicationPriority()); + allocate(amrmTokenIdentifier.getApplicationAttemptId(), + request, response); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = @@ -624,21 +450,24 @@ public AllocateResponse allocate(AllocateRequest request) if (nextMasterKey != null && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier - .getKeyId()) { + .getKeyId()) { + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttemptImpl appAttemptImpl 
= (RMAppAttemptImpl)appAttempt; Token amrmToken = appAttempt.getAMRMToken(); if (nextMasterKey.getMasterKey().getKeyId() != appAttemptImpl.getAMRMTokenKeyId()) { LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + applicationId); + + " to application: " + appAttemptId.getApplicationId()); amrmToken = rmContext.getAMRMTokenSecretManager() .createAndGetAMRMToken(appAttemptId); appAttemptImpl.setAMRMToken(amrmToken); } - allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token - .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() - .toString(), amrmToken.getPassword(), amrmToken.getService() - .toString())); + response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); } /* @@ -646,11 +475,225 @@ public AllocateResponse allocate(AllocateRequest request) * need to worry about unregister call occurring in between (which * removes the lock object). */ - lock.setAllocateResponse(allocateResponse); - return allocateResponse; + response.setResponseId(lastResponse.getResponseId() + 1); + lock.setAllocateResponse(response); + return response; } } + protected void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { + + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY + || filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + + // Send the status update to the appAttempt. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptStatusupdateEvent(appAttemptId, request + .getProgress())); + + List ask = request.getAskList(); + List release = request.getReleaseList(); + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; + List blacklistRemovals = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + + // set label expression for Resource Requests if resourceName=ANY + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression() + && ResourceRequest.ANY.equals(req.getResourceName())) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + + Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); + + // sanity check + try { + RMServerUtils.normalizeAndValidateRequests(ask, + maximumCapacity, app.getQueue(), + rScheduler, rmContext); + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid resource ask by application " + appAttemptId, e); + throw e; + } + + try { + RMServerUtils.validateBlacklistRequest(blacklistRequest); + } catch (InvalidResourceBlacklistRequestException e) { + LOG.warn("Invalid blacklist request by application " + appAttemptId, e); + throw e; + } + + // In the case of work-preserving AM restart, it's possible for the + // AM to release containers from the earlier attempt. 
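+      // Such releases would not pass validation against the current
+      // attempt's containers, so the release-request check below is applied
+      // only when containers are not kept across application attempts.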
+ if (!app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + try { + RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); + } catch (InvalidContainerReleaseException e) { + LOG.warn("Invalid container release by application " + appAttemptId, + e); + throw e; + } + } + + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List increaseResourceReqs = new ArrayList<>(); + List decreaseResourceReqs = new ArrayList<>(); + List updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests( + rmContext, request, maximumCapacity, + increaseResourceReqs, decreaseResourceReqs); + + // Send new requests to appAttempt. + Allocation allocation; + RMAppAttemptState state = + app.getRMAppAttempt(appAttemptId).getAppAttemptState(); + if (state.equals(RMAppAttemptState.FINAL_SAVING) || + state.equals(RMAppAttemptState.FINISHING) || + app.isAppFinalStateStored()) { + LOG.warn(appAttemptId + " is in " + state + + " state, ignore container allocate request."); + allocation = EMPTY_ALLOCATION; + } else { + allocation = + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs); + } + + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("blacklist are updated in Scheduler." + + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); + + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } + + // Notify the AM of container update errors + addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + + // update the response with the deltas of node status changes + List updatedNodes = new ArrayList(); + if(app.pullRMNodeUpdates(updatedNodes) > 0) { + List updatedNodeReports = new ArrayList(); + for(RMNode rmNode: updatedNodes) { + SchedulerNodeReport schedulerNodeReport = + rScheduler.getNodeReport(rmNode.getNodeID()); + Resource used = BuilderUtils.newResource(0, 0); + int numContainers = 0; + if (schedulerNodeReport != null) { + used = schedulerNodeReport.getUsedResource(); + numContainers = schedulerNodeReport.getNumContainers(); + } + NodeId nodeId = rmNode.getNodeID(); + NodeReport report = + BuilderUtils.newNodeReport(nodeId, rmNode.getState(), + rmNode.getHttpAddress(), rmNode.getRackName(), used, + rmNode.getTotalCapability(), numContainers, + rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), + rmNode.getNodeLabels()); + + updatedNodeReports.add(report); + } + allocateResponse.setUpdatedNodes(updatedNodeReports); + } + + addToAllocatedContainers(allocateResponse, allocation.getContainers()); + + allocateResponse.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + // Handling increased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + allocation.getDecreasedContainers()); + + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add collector address for this application 
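+    // (only populated when timeline service v2 is enabled)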
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getCollectorAddr()); + } + + // add preemption to the allocateResponse message (if any) + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Set application priority + allocateResponse.setApplicationPriority(app + .getApplicationPriority()); + } + + protected void addToUpdateContainerErrors(AllocateResponse allocateResponse, + List updateContainerErrors) { + if (!updateContainerErrors.isEmpty()) { + if (allocateResponse.getUpdateErrors() != null + && !allocateResponse.getUpdateErrors().isEmpty()) { + updateContainerErrors = new ArrayList<>(updateContainerErrors); + updateContainerErrors.addAll(allocateResponse.getUpdateErrors()); + } + allocateResponse.setUpdateErrors(updateContainerErrors); + } + } + + protected void addToUpdatedContainers(AllocateResponse allocateResponse, + ContainerUpdateType updateType, List updatedContainers) { + if (updatedContainers != null && updatedContainers.size() > 0) { + ArrayList containersToSet = new ArrayList<>(); + if (allocateResponse.getUpdatedContainers() != null && + !allocateResponse.getUpdatedContainers().isEmpty()) { + containersToSet.addAll(allocateResponse.getUpdatedContainers()); + } + for (Container updatedContainer : updatedContainers) { + containersToSet.add( + UpdatedContainer.newInstance(updateType, updatedContainer)); + } + allocateResponse.setUpdatedContainers(containersToSet); + } + } + + protected void addToAllocatedContainers(AllocateResponse allocateResponse, + List allocatedContainers) { + if (allocateResponse.getAllocatedContainers() != null + && !allocateResponse.getAllocatedContainers().isEmpty()) { + allocatedContainers = new ArrayList<>(allocatedContainers); + allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + } + allocateResponse.setAllocatedContainers(allocatedContainers); + } + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 7814b84..e726e8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -26,7 +26,14 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; @@ -70,9 +77,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; - import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; @@ -80,6 +88,9 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils + .RECORD_FACTORY; + /** * The OpportunisticContainerAllocatorAMService is started instead of the * ApplicationMasterService if opportunistic scheduling is enabled for the YARN @@ -218,8 +229,9 @@ public long generateContainerId() { } @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { + protected void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. OpportunisticContainerAllocator.PartitionedResourceRequests @@ -227,40 +239,185 @@ public AllocateResponse allocate(AllocateRequest request) throws oppContainerAllocator.partitionAskList(request.getAskList()); // Allocate OPPORTUNISTIC containers. - request.setAskList(partitionedAsks.getOpportunistic()); - final ApplicationAttemptId appAttemptId = getAppAttemptId(); - SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) - rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler)rmContext.getScheduler()) + .getApplicationAttempt(appAttemptId); OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); List oppContainers = - oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, - ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + oppContainerAllocator.allocateContainers( + request.getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, + ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + addToAllocatedContainers(allocateResponse, oppContainers); } + handleExecutionTypeUpdates(appAttempt, request, allocateResponse, + oppCtx, partitionedAsks.getGuaranteed()); + + + List promotedContainers = + appAttempt.pullContainersWithUpdatedExecType(); + addToUpdatedContainers(allocateResponse, + ContainerUpdateType.UPDATE_EXECUTION_TYPE, promotedContainers); + // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); - AllocateResponse allocateResp = super.allocate(request); - // Add allocated OPPORTUNISTIC containers to the AllocateResponse. 
- if (!oppContainers.isEmpty()) { - allocateResp.getAllocatedContainers().addAll(oppContainers); + super.allocate(appAttemptId, request, allocateResponse); + } + + private void handleExecutionTypeUpdates( + SchedulerApplicationAttempt appAttempt, AllocateRequest request, + AllocateResponse allocateResponse, OpportunisticContainerContext oppCntxt, + List existingGuaranteedReqs) { + List promotionRequests = new ArrayList<>(); + List demotionRequests = new ArrayList<>(); + + List updateContainerErrors = RMServerUtils + .validateAndSplitUpdateExecutionTypeRequests(rmContext, + request, promotionRequests, demotionRequests); + + if (!promotionRequests.isEmpty()) { + LOG.info("Promotion Update requests : " + promotionRequests); } + if (!demotionRequests.isEmpty()) { + LOG.info("Demotion Update requests : " + demotionRequests); + } + + List resourceReqsForPromotion = + createResourceReqsForPromotion(oppCntxt, promotionRequests, + updateContainerErrors); - // Update opportunistic container context with the allocated GUARANTEED - // containers. - oppCtx.updateCompletedContainers(allocateResp); + if (!resourceReqsForPromotion.isEmpty() && LOG.isDebugEnabled()) { + LOG.debug("Generated Resource Requests for promotion : " + + resourceReqsForPromotion); + } - // Add all opportunistic containers - return allocateResp; + handleDemotionRequests(appAttempt, demotionRequests, updateContainerErrors); + addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + existingGuaranteedReqs.addAll(resourceReqsForPromotion); + } + + private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt, + List demotionRequests, + List updateContainerErrors) { + OpportunisticContainerContext oppCntxt = + appAttempt.getOpportunisticContainerContext(); + for (UpdateContainerRequest uReq : demotionRequests) { + RMContainer rmContainer = + rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + if (rmContainer != null) { + if (oppCntxt.checkAndAddToOutstandingDemotions( + rmContainer.getContainer())) { + RMContainer demotedRMContainer = createDemotedRMContainer + (appAttempt, oppCntxt, rmContainer); + appAttempt.addToNewlyDemotedContainers( + uReq.getContainerId(), demotedRMContainer); + } else { + updateContainerErrors.add(UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot demote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + } + + private RMContainer createDemotedRMContainer( + SchedulerApplicationAttempt appAttempt, + OpportunisticContainerContext oppCntxt, + RMContainer rmContainer) { + SchedulerRequestKey sk = + SchedulerRequestKey.extractFrom(rmContainer.getContainer()); + Container demotedContainer = BuilderUtils.newContainer( + ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), + oppCntxt.getContainerIdGenerator().generateContainerId()), + rmContainer.getContainer().getNodeId(), + rmContainer.getContainer().getNodeHttpAddress(), + rmContainer.getContainer().getResource(), + sk.getPriority(), null, ExecutionType.OPPORTUNISTIC, + sk.getAllocationRequestId()); + demotedContainer.setVersion(rmContainer.getContainer().getVersion()); + return createRmContainer(demotedContainer, false); + } + + public List createResourceReqsForPromotion( + OpportunisticContainerContext oppCntxt, + List updateContainerRequests, + List updateContainerErrors) { + List retList = new ArrayList<>(); + for (UpdateContainerRequest uReq : updateContainerRequests) { + RMContainer rmContainer = + 
rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + // Check if this is a container update + // And not in the middle of a Demotion + if (rmContainer != null) { + // Check if this is an executionType change request + // If so, fix the rr to make it look like a normal rr + // with relaxLocality=false and numContainers=1 + SchedulerNode schedulerNode = rmContext.getScheduler() + .getSchedulerNode(rmContainer.getContainer().getNodeId()); + + // Add only if no outstanding promote requests exist. + SchedulerRequestKey schedulerKey = oppCntxt + .checkAndAddToOutstandingPromotions(rmContainer.getContainer()); + if (schedulerKey != null) { + long promotionReqId = schedulerKey.getAllocationRequestId(); + // Create a new Ask + retList.add( + createResourceReqForPromotion(promotionReqId, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, + rmContainer.getContainer().getNodeId().getHost())); + + // TODO: The below are also required now.. Since the Schedulers + // actually update demand only for * requests. + + // Add rack local ask + retList.add( + createResourceReqForPromotion(promotionReqId, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, schedulerNode.getRackName())); + + // Add ANY ask + retList.add( + createResourceReqForPromotion(promotionReqId, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, ResourceRequest.ANY)); + } else { + updateContainerErrors.add(UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot promote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + return retList; + } + + private static ResourceRequest createResourceReqForPromotion(long allocReqId, + ResourceRequest rr, RMContainer rmContainer, String resourceName) { + rr.setResourceName(resourceName); + rr.setNumContainers(1); + rr.setRelaxLocality(false); + rr.setPriority(rmContainer.getContainer().getPriority()); + rr.setAllocationRequestId(allocReqId); + rr.setCapability(rmContainer.getContainer().getResource()); + rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression()); + rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true)); + return rr; } @Override @@ -304,24 +461,31 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( } private void handleNewContainers(List allocContainers, - boolean isRemotelyAllocated) { + boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler) rmContext.getScheduler()) - .getCurrentAttemptForContainer(container.getId()); - RMContainer rmContainer = new RMContainerImpl(container, - appAttempt.getApplicationAttemptId(), container.getNodeId(), - appAttempt.getUser(), rmContext, isRemotelyAllocated); - appAttempt.addRMContainer(container.getId(), rmContainer); - ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( - container.getNodeId()).allocateContainer(rmContainer); + RMContainer rmContainer = + createRmContainer(container, isRemotelyAllocated); rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.ACQUIRED)); } } + private RMContainer createRmContainer( + Container container, boolean isRemotelyAllocated) { + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new 
RMContainerImpl(container, + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, isRemotelyAllocated); + appAttempt.addRMContainer(container.getId(), rmContainer); + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( + container.getNodeId()).allocateContainer(rmContainer); + return rmContainer; + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index a0cdf68..e0c7676 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -80,7 +82,7 @@ */ public class RMServerUtils { - private static final String UPDATE_OUTSTANDING_ERROR = + public static final String UPDATE_OUTSTANDING_ERROR = "UPDATE_OUTSTANDING_ERROR"; private static final String INCORRECT_CONTAINER_VERSION_ERROR = "INCORRECT_CONTAINER_VERSION_ERROR"; @@ -123,6 +125,48 @@ } /** + * + * @param rmContext RM context + * @param request Allocate Request + * @param promoteExecTypeReqs Promotion requests + * @param demoteExecTypeReqs Demotion requests + * @return List of container update Errors + */ + public static List + validateAndSplitUpdateExecutionTypeRequests(RMContext rmContext, + AllocateRequest request, List promoteExecTypeReqs, + List demoteExecTypeReqs) { + List errors = new ArrayList<>(); + Set outstandingUpdate = new HashSet<>(); + for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { + if (updateReq.getContainerUpdateType() == + ContainerUpdateType.UPDATE_EXECUTION_TYPE) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = validateContainerIdAndVersion(outstandingUpdate, + updateReq, rmContainer); + if (msg == null) { + ExecutionType original = rmContainer.getExecutionType(); + ExecutionType target = updateReq.getExecutionType(); + if (target != original) { + if (target == ExecutionType.GUARANTEED && + original == ExecutionType.OPPORTUNISTIC) { + promoteExecTypeReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else if (target == ExecutionType.OPPORTUNISTIC && + original == ExecutionType.GUARANTEED) { + demoteExecTypeReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } + } + } + checkAndcreateUpdateError(errors, updateReq, msg); + } + } + return errors; + } + + /** * Check if we have: * - Request for same containerId and different target resource * - If targetResources violates 
maximum/minimumAllocation @@ -131,7 +175,7 @@ * @param maximumAllocation Maximum Allocation * @param increaseResourceReqs Increase Resource Request * @param decreaseResourceReqs Decrease Resource Request - * @return List of container Errors + * @return List of container update Errors */ public static List validateAndSplitUpdateResourceRequests(RMContext rmContext, @@ -141,59 +185,76 @@ List errors = new ArrayList<>(); Set outstandingUpdate = new HashSet<>(); for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { - RMContainer rmContainer = rmContext.getScheduler().getRMContainer( - updateReq.getContainerId()); - String msg = null; - if (rmContainer == null) { - msg = INVALID_CONTAINER_ID; - } - // Only allow updates if the requested version matches the current - // version - if (msg == null && updateReq.getContainerVersion() != - rmContainer.getContainer().getVersion()) { - msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" - + updateReq.getContainerVersion() + "|" - + rmContainer.getContainer().getVersion(); - } - // No more than 1 container update per request. - if (msg == null && - outstandingUpdate.contains(updateReq.getContainerId())) { - msg = UPDATE_OUTSTANDING_ERROR; - } - if (msg == null) { - Resource original = rmContainer.getContainer().getResource(); - Resource target = updateReq.getCapability(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, false)) { - decreaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); + if (updateReq.getContainerUpdateType() != + ContainerUpdateType.UPDATE_EXECUTION_TYPE) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = validateContainerIdAndVersion(outstandingUpdate, + updateReq, rmContainer); + if (msg == null) { + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReq.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, false)) { + decreaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; - } - } else { - // This is an increase request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, true)) { - increaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); - } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + // This is an increase request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, true)) { + increaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } } - } - if (msg != null) { - UpdateContainerError updateError = RECORD_FACTORY - .newRecordInstance(UpdateContainerError.class); - updateError.setReason(msg); - updateError.setUpdateContainerRequest(updateReq); - errors.add(updateError); + checkAndcreateUpdateError(errors, updateReq, msg); } } return errors; } + private static void checkAndcreateUpdateError( + List errors, UpdateContainerRequest updateReq, + String msg) { + if (msg != null) { + UpdateContainerError updateError = RECORD_FACTORY + .newRecordInstance(UpdateContainerError.class); + updateError.setReason(msg); + 
updateError.setUpdateContainerRequest(updateReq);
+      errors.add(updateError);
+    }
+  }
+
+  private static String validateContainerIdAndVersion(
+      Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    String msg = null;
+    if (rmContainer == null) {
+      msg = INVALID_CONTAINER_ID;
+    }
+    // Only allow updates if the requested version matches the current
+    // version
+    if (msg == null && updateReq.getContainerVersion() !=
+        rmContainer.getContainer().getVersion()) {
+      msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
+          + updateReq.getContainerVersion() + "|"
+          + rmContainer.getContainer().getVersion();
+    }
+    // No more than 1 container update per request.
+    if (msg == null &&
+        outstandingUpdate.contains(updateReq.getContainerId())) {
+      msg = UPDATE_OUTSTANDING_ERROR;
+    }
+    return msg;
+  }
+
   /**
    * Utility method to validate a list of resource requests, by ensuring that the
    * requested memory/vcore is non-negative and not greater than max
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index a244ad8..020764b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,14 +25,13 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 0cfa1d3..8824730 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -53,7 +53,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
 .RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import
org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -108,6 +108,8 @@ // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED) + .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED, + RMContainerEventType.ACQUIRED) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, @@ -125,6 +127,8 @@ .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.ACQUIRED) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) @@ -163,13 +167,13 @@ private final WriteLock writeLock; private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; - private final Container container; private final RMContext rmContext; private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; private final String user; private final String nodeLabelExpression; + private Container container; private Resource reservedResource; private NodeId reservedNode; private SchedulerRequestKey reservedSchedulerKey; @@ -276,6 +280,10 @@ public Container getContainer() { return this.container; } + public synchronized void setContainer(Container container) { + this.container = container; + } + @Override public RMContainerState getState() { this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java index d7d1e94..80e7c0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java @@ -21,7 +21,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * The event signifying that a container has been reserved. 
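Taken together, the pieces above (the UPDATE_EXECUTION_TYPE validation in RMServerUtils, the promotion/demotion handling in OpportunisticContainerAllocatorAMService, and the relaxed RMContainerImpl ACQUIRED transitions) let an AM drive a container promotion through the normal allocate call. The following is a minimal AM-side sketch, illustrative only and not part of this patch: the class, method, and parameter names are hypothetical, and it assumes the UpdateContainerRequest/AllocateRequest factory signatures used here.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;

    public class PromotionRequestSketch {
      // Builds an AllocateRequest asking the RM to promote a running
      // OPPORTUNISTIC container to GUARANTEED. The version sent must match
      // the RM's view of the container, otherwise the request comes back
      // as an UpdateContainerError with INCORRECT_CONTAINER_VERSION_ERROR.
      static AllocateRequest buildPromotionRequest(Container container,
          int lastResponseId, float progress) {
        UpdateContainerRequest promote = UpdateContainerRequest.newInstance(
            container.getVersion(),                    // current version
            container.getId(),                         // container to update
            ContainerUpdateType.UPDATE_EXECUTION_TYPE, // change exec type only
            null,                                      // capability unchanged
            ExecutionType.GUARANTEED);                 // target execution type
        List<ResourceRequest> noAsks = Collections.emptyList();
        List<ContainerId> noReleases = Collections.emptyList();
        AllocateRequest request = AllocateRequest.newInstance(
            lastResponseId, progress, noAsks, noReleases, null);
        request.setUpdateRequests(Collections.singletonList(promote));
        return request;
      }
    }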
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index feb20ee..30f7ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -965,6 +967,7 @@ public void recoverContainer(RMContainer rmContainer) { public ResourceRequest cloneResourceRequest(ResourceRequest request) { ResourceRequest newRequest = ResourceRequest.newBuilder() .priority(request.getPriority()) + .allocationRequestId(request.getAllocationRequestId()) .resourceName(request.getResourceName()) .capability(request.getCapability()) .numContainers(1) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b3ef471..f2cdc65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -47,14 +48,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; import 
org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -73,12 +75,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -135,6 +136,9 @@ private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); protected List newlyAllocatedContainers = new ArrayList<>(); + protected Map newlyPromotedContainers = new HashMap<>(); + protected Map newlyDemotedContainers = new HashMap<>(); + protected List tempContainerToKill = new ArrayList<>(); protected Map newlyDecreasedContainers = new HashMap<>(); protected Map newlyIncreasedContainers = new HashMap<>(); protected Set updatedNMTokens = new HashSet<>(); @@ -637,10 +641,10 @@ public Resource getCurrentConsumption() { } private Container updateContainerAndNMToken(RMContainer rmContainer, - boolean newContainer, boolean increasedContainer) { + ContainerUpdateType updateType) { Container container = rmContainer.getContainer(); ContainerType containerType = ContainerType.TASK; - if (!newContainer) { + if (updateType != null) { container.setVersion(container.getVersion() + 1); } // The working knowledge is that masterContainer for AM is null as it @@ -664,12 +668,14 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, return null; } - if (newContainer) { + if (updateType == null || + ContainerUpdateType.UPDATE_EXECUTION_TYPE == updateType) { rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } else { rmContainer.handle(new RMContainerUpdatesAcquiredEvent( - rmContainer.getContainerId(), increasedContainer)); + rmContainer.getContainerId(), + ContainerUpdateType.INCREASE_RESOURCE == updateType)); } return container; } @@ -701,8 +707,8 @@ private void updateNMToken(Container container) { Iterator i = newlyAllocatedContainers.iterator(); while (i.hasNext()) { RMContainer rmContainer = i.next(); - Container updatedContainer = updateContainerAndNMToken(rmContainer, - true, false); + Container updatedContainer = + updateContainerAndNMToken(rmContainer, null); // Only add container to return list when it's not null. // updatedContainer could be null when generate token failed, it can be // caused by DNS resolving failed. 
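The newlyPromotedContainers/newlyDemotedContainers maps populated in the hunk below eventually surface to the AM as UpdatedContainer records in the AllocateResponse. A short sketch of how an AM might consume them follows; it is illustrative, the class and method names are hypothetical, and UpdatedContainer.getUpdateType() is assumed to mirror the newInstance(updateType, container) factory used in this patch.

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
    import org.apache.hadoop.yarn.api.records.UpdatedContainer;

    public class ExecTypeUpdateSketch {
      // Scans an AllocateResponse for containers whose execution type was
      // just promoted or demoted. The returned Container carries the new
      // executionType and an incremented version (updateContainerAndNMToken
      // bumps the version for every non-null update type).
      static void handleExecTypeUpdates(AllocateResponse response) {
        for (UpdatedContainer updated : response.getUpdatedContainers()) {
          if (updated.getUpdateType()
              == ContainerUpdateType.UPDATE_EXECUTION_TYPE) {
            Container container = updated.getContainer();
            System.out.println("Container " + container.getId() + " is now "
                + container.getExecutionType() + " (version "
                + container.getVersion() + ")");
          }
        }
      }
    }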
@@ -715,9 +721,120 @@ private void updateNMToken(Container container) {
     } finally {
       writeLock.unlock();
     }
+  }
+  public void addToNewlyDemotedContainers(ContainerId containerId,
+      RMContainer rmContainer) {
+    newlyDemotedContainers.put(containerId, rmContainer);
   }
-
+
+  protected synchronized void addToNewlyAllocatedContainers(
+      RMContainer rmContainer) {
+    if (oppContainerContext == null) {
+      newlyAllocatedContainers.add(rmContainer);
+      return;
+    }
+    ContainerId matchedContainerId =
+        oppContainerContext.matchContainerToOutstandingPromotionReq(
+            rmContainer.getContainer());
+    if (matchedContainerId != null) {
+      if (OpportunisticContainerContext.UNDEFINED == matchedContainerId) {
+        // This is a spurious allocation (relaxLocality = false
+        // resulted in the Container being allocated on an NM on the same host
+        // but not on the NM running the container to be updated. Can
+        // happen if more than one NM exists on the same host.. usually
+        // occurs when using MiniYARNCluster to test).
+        tempContainerToKill.add(rmContainer);
+      } else {
+        newlyPromotedContainers.put(matchedContainerId, rmContainer);
+      }
+    } else {
+      newlyAllocatedContainers.add(rmContainer);
+    }
+  }
+
+  /**
+   * A container is promoted if its executionType is changed from
+   * OPPORTUNISTIC to GUARANTEED. It is demoted if the change is from
+   * GUARANTEED to OPPORTUNISTIC.
+   * @return Newly Promoted and Demoted containers
+   */
+  public List<Container> pullContainersWithUpdatedExecType() {
+    List<Container> updatedContainers = new ArrayList<>();
+    if (oppContainerContext == null) {
+      return updatedContainers;
+    }
+    try {
+      writeLock.lock();
+      for (Map<ContainerId, RMContainer> newlyUpdatedContainers :
+          Arrays.asList(newlyPromotedContainers, newlyDemotedContainers)) {
+        Iterator<Map.Entry<ContainerId, RMContainer>> i =
+            newlyUpdatedContainers.entrySet().iterator();
+        while (i.hasNext()) {
+          Map.Entry<ContainerId, RMContainer> entry = i.next();
+          ContainerId matchedContainerId = entry.getKey();
+          RMContainer rmContainer = entry.getValue();
+
+          // swap containers
+          RMContainer existingRMContainer = swapContainer(
+              rmContainer, matchedContainerId);
+          oppContainerContext.completeContainerExecTypeUpdate(
+              existingRMContainer.getContainer());
+          Container updatedContainer = updateContainerAndNMToken(
+              existingRMContainer, ContainerUpdateType.UPDATE_EXECUTION_TYPE);
+          updatedContainers.add(updatedContainer);
+
+          // Mark container for release (set RRs to null, so RM does not think
+          // it is a recoverable container)
+          ((RMContainerImpl) rmContainer).setResourceRequests(null);
+          tempContainerToKill.add(rmContainer);
+          i.remove();
+        }
+      }
+      // Release all temporary containers
+      Iterator<RMContainer> tempIter = tempContainerToKill.iterator();
+      while (tempIter.hasNext()) {
+        RMContainer c = tempIter.next();
+        ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
+            SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
+                SchedulerUtils.UPDATED_CONTAINER),
+            RMContainerEventType.KILL);
+        tempIter.remove();
+      }
+      return updatedContainers;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private RMContainer swapContainer(RMContainer rmContainer, ContainerId
+      matchedContainerId) {
+    RMContainer existingRMContainer =
+        getRMContainer(matchedContainerId);
+    if (existingRMContainer != null) {
+      // Swap updated container with the existing container
+      Container updatedContainer = rmContainer.getContainer();
+
+      Container newContainer = Container.newInstance(matchedContainerId,
+          existingRMContainer.getContainer().getNodeId(),
+          existingRMContainer.getContainer().getNodeHttpAddress(),
+          updatedContainer.getResource(),
+
existingRMContainer.getContainer().getPriority(), null,
+          updatedContainer.getExecutionType());
+      newContainer.setAllocationRequestId(
+          existingRMContainer.getContainer().getAllocationRequestId());
+      newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+      rmContainer.getContainer().setResource(
+          existingRMContainer.getContainer().getResource());
+      rmContainer.getContainer().setExecutionType(
+          existingRMContainer.getContainer().getExecutionType());
+
+      ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+    }
+    return existingRMContainer;
+  }
+
   private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
     try {
@@ -730,7 +847,8 @@ private void updateNMToken(Container container) {
       while (i.hasNext()) {
         RMContainer rmContainer = i.next().getValue();
         Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            false, increase);
+            increase ? ContainerUpdateType.INCREASE_RESOURCE :
+                ContainerUpdateType.DECREASE_RESOURCE);
         if (updatedContainer != null) {
           returnContainerList.add(updatedContainer);
           i.remove();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 995a7b0..c1dca89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.ImmutableSet;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 6f905b9..57e5f27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -57,6 +57,9 @@
   public static final String RELEASED_CONTAINER =
       "Container released by application";
+
+  public static final String UPDATED_CONTAINER =
+      "Temporary container killed by application for ExecutionType update";
   public static final String LOST_CONTAINER =
       "Container released on a *lost* node";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 74a64c1..0dc527f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 3e8282f..c12bc6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java index 8b4907b..159fb09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java @@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * Contexts for a container inside scheduler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 498f34f..604a92b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; @@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -553,7 +554,7 @@ public void apply(Resource cluster, // Update this application for the allocated container if (!allocation.isIncreasedAllocation()) { // Allocate a new container - newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(rmContainer); liveContainers.put(containerId, rmContainer); // Deduct pending resource requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index d79fcaf..344daf2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 39f4a3d..7ecf8aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -454,7 +454,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); // Add it to allContainers list. 
-    newlyAllocatedContainers.add(rmContainer);
+    addToNewlyAllocatedContainers(rmContainer);
     liveContainers.put(container.getId(), rmContainer);

     // Update consumption and track allocations
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a27a222..d983ea0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -26,7 +26,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

 import java.util.Collection;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index d275bfd..9d6fbb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -33,10 +33,12 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
 import java.util.List;

 public class FifoAppAttempt extends FiCaSchedulerApp {
@@ -74,7 +76,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       updateAMContainerDiagnostics(AMState.ASSIGNED, null);

       // Add it to allContainers list.
-      newlyAllocatedContainers.add(rmContainer);
+      addToNewlyAllocatedContainers(rmContainer);
       ContainerId containerId = container.getId();
       liveContainers.put(containerId, rmContainer);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index b5122c0..81c1f30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -77,7 +77,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -709,7 +709,7 @@ private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application,
     // Inform the application
     RMContainer rmContainer = application.allocate(type, node, schedulerKey,
         request, container);
-    
+
     // Inform the node
     node.allocateContainer(rmContainer);

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
index f87f764..86843b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

 import java.util.Iterator;
 import java.util.List;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index e70c3e0..3288d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -61,7 +61,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 593de08..fbeca7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -251,6 +251,13 @@ public AllocateResponse sendContainerResizingRequest(
     return allocate(req);
   }

+  public AllocateResponse sendContainerUpdateRequest(
+      List<UpdateContainerRequest> updateRequests) throws Exception {
+    final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null,
+        null, updateRequests);
+    return allocate(req);
+  }
+
   public AllocateResponse allocate(AllocateRequest allocateRequest)
       throws Exception {
     UserGroupInformation ugi =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 32cdb1b..2d76127 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -195,6 +195,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,

+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      boolean isHealthy) throws Exception {
+    return nodeHeartbeat(updatedStats, Collections.emptyList(),
+        isHealthy, ++responseId);
+  }
+
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
       List<Container> increasedConts, boolean isHealthy, int resId)
       throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
index 35218bd..31b372e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
@@ -31,7 +31,7 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .TestUtils;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 73d9e5c..8d45a51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -34,11 +34,15 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -64,8 +68,11 @@
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -75,13 +82,17 @@
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;

 /**
@@ -91,8 +102,10 @@
   private static final int GB = 1024;

-  @Test(timeout = 60000)
-  public void testNodeRemovalDuringAllocate() throws Exception {
+  private MockRM rm;
+
+  @Before
+  public void createAndStartRM() {
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
     YarnConfiguration conf = new YarnConfiguration(csConf);
@@ -102,8 +115,445 @@ public void testNodeRemovalDuringAllocate() throws Exception {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
-    MockRM rm = new MockRM(conf);
+    rm = new MockRM(conf);
     rm.start();
+  }
+
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testContainerPromoteAndDemoteBeforeContainerStart()
+      throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode3)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode4)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
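+    // (These events would normally be delivered by the RM's event
+    // dispatcher; the test hands them to the service directly so that its
+    // view of schedulable nodes is populated before allocation.)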
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
+    // All nodes 1 - 4 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+    MockNM sameHostDiffNode = null;
+    for (NodeId n : nodes.keySet()) {
+      if (n.getHost().equals(allocNode.getNodeId().getHost()) &&
+          n.getPort() != allocNode.getNodeId().getPort()) {
+        sameHostDiffNode = nodes.get(n);
+      }
+    }
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    // Node on same host should not result in allocation
+    sameHostDiffNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Send Promotion req again with incorrect version...
+    // this should also result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(1,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Ensure after correct node heartbeats, we should get the allocation
+    allocNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
+
+    // Verify Metrics After OPP allocation :
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    Thread.sleep(200);
+
+    // Verify that the container is still in ACQUIRED state wrt the RM.
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+    // Now demote the container back..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(),
+            uc.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC)));
+    // This should happen in the same heartbeat..
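+    // (Unlike promotion, which had to wait for a heartbeat from the node
+    // hosting the container, demotion frees resources rather than claiming
+    // them, so the RM can confirm it in the same allocate call.)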
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
+
+    // Verify Metrics After OPP allocation :
+    // Everything should have reverted to what it was
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+  }
+
+  @Test(timeout = 60000)
+  public void testContainerPromoteAfterContainerStart() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
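+    // (The RUNNING status in the NM heartbeat above is what moves the
+    // RMContainer out of ACQUIRED and into RUNNING.)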
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+                container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(uc.getId(), container.getId());
+    Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
+
+    // Verify that the Container is still in RUNNING state wrt RM..
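+    // (Promotion only changes the ExecutionType and bumps the container
+    // version; it does not restart the container or reset its state.)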
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify Metrics After OPP allocation :
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 6144, 6, 2048, 2, 2);
+  }
+
+  @Test(timeout = 600000)
+  public void testContainerPromoteAfterContainerComplete() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
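+    // (Sanity check that the container is live before the NM reports it
+    // COMPLETE below; the promotion request that follows must then target
+    // a container the RM no longer tracks.)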
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+                container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Container Completed in the NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container has been removed..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+            container.getId().getApplicationAttemptId()).getRMContainer(
+                container.getId());
+    Assert.assertNull(rmContainer);
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req... this should result in update error
+    // Since the container doesn't exist anymore..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(1,
+        allocateResponse.getCompletedContainersStatuses().size());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getCompletedContainersStatuses().get(0)
+            .getContainerId());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INVALID_CONTAINER_ID",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Verify Metrics After OPP allocation (Nothing should change again)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+  }
+
+  private void verifyMetrics(QueueMetrics metrics, long availableMB,
+      int availableVirtualCores, long allocatedMB,
+      int allocatedVirtualCores, int allocatedContainers) {
+    Assert.assertEquals(availableMB, metrics.getAvailableMB());
+    Assert.assertEquals(availableVirtualCores,
+        metrics.getAvailableVirtualCores());
+    Assert.assertEquals(allocatedMB, metrics.getAllocatedMB());
+    Assert.assertEquals(allocatedVirtualCores,
+        metrics.getAllocatedVirtualCores());
+    Assert.assertEquals(allocatedContainers,
+        metrics.getAllocatedContainers());
+  }
+
+  @Test(timeout = 60000)
+  public void testNodeRemovalDuringAllocate() throws Exception {
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 0281c19..981c292 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -36,8 +36,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -68,7 +67,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 881004c..41079fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -35,8 +35,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 7f9c719..468e760 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.junit.Assert;
 import org.junit.Test;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 3cb668c..ff5dc02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -41,6 +41,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.After;
 import org.junit.Test;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 0aeedce..4b90ad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -120,7 +120,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 2ce5fcb..c0fa43d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -75,7 +75,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 3e05456..c9eb8b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -60,7 +60,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index b982fab..e81ffbd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -53,7 +53,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index 5964d2f..4cc99b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -30,7 +30,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 61c5743..46187d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -38,7 +38,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .TestUtils;