diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 3ec0899..d0c81d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -66,6 +68,14 @@ return client; } + @VisibleForTesting + @Public + public static AMRMClient createAMRMClient( + ApplicationMasterProtocol protocol) { + AMRMClient client = new AMRMClientImpl(protocol); + return client; + } + private NMTokenCache nmTokenCache; @Private @@ -392,6 +402,15 @@ public abstract void requestContainerResourceChange( Priority priority, String resourceName, Resource capability); + + /** + * Performs the same function as the above but also allows the user to + * specify an ExecutionType + * @return Collection of request matching the parameters + */ + public abstract List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + Resource capability); /** * Update application's blacklist with addition or removal resources. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 286ca28..0beb693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; @@ -154,6 +155,13 @@ public void setHeartbeatInterval(int interval) { Resource capability) { return client.getMatchingRequests(priority, resourceName, capability); } + + public List> getMatchingRequests( + Priority priority, String resourceName, Resource capability, + ExecutionType executionType) { + return client.getMatchingRequests(priority, resourceName, executionType, + capability); + } /** * Registers this application master with the resource manager. On successful diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 46ddc4d..c623a4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; @@ -157,10 +158,8 @@ static boolean canFit(Resource arg0, Resource arg1) { //Value->Map //Key->Resource Capability //Value->ResourceRequest - protected final - Map>> - remoteRequestsTable = - new TreeMap>>(); + protected final Map>>> remoteRequestsTable = new TreeMap<>(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); @@ -185,6 +184,11 @@ public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); } + public AMRMClientImpl(ApplicationMasterProtocol protocol) { + super(AMRMClientImpl.class.getName()); + this.rmClient = protocol; + } + @Override protected void serviceInit(Configuration conf) throws Exception { RackResolver.init(conf); @@ -195,8 +199,10 @@ protected void serviceInit(Configuration conf) throws Exception { protected void serviceStart() throws Exception { final YarnConfiguration conf = new YarnConfiguration(getConfig()); try { - rmClient = - ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); + if (rmClient == null) { + rmClient = ClientRMProxy.createRMProxy( + conf, ApplicationMasterProtocol.class); + } } catch (IOException e) { throw new YarnRuntimeException(e); } @@ -263,7 +269,8 @@ public AllocateResponse allocate(float progressIndicator) // RPC layer is using it to send info across askList.add(ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression())); + r.getRelaxLocality(), r.getNodeLabelExpression(), + r.getExecutionType())); } List increaseList = new ArrayList<>(); List decreaseList = new ArrayList<>(); @@ -315,11 +322,15 @@ public AllocateResponse allocate(float progressIndicator) synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); - for (Map> rr : remoteRequestsTable - .values()) { - for (Map capabalities : rr.values()) { - for (ResourceRequestInfo request : capabalities.values()) { - addResourceRequestToAsk(request.remoteRequest); + for (Map>> rre : remoteRequestsTable.values()) { + for (Map> + execRr : rre.values()) { + for (Map capabalities : execRr + .values()) { + for (ResourceRequestInfo request : capabalities.values()) { + addResourceRequestToAsk(request.remoteRequest); + } } } } @@ -517,26 +528,27 @@ public synchronized void addContainerRequest(T req) { + joiner.join(req.getNodes())); } for (String node : dedupedNodes) { - addResourceRequest(req.getPriority(), node, req.getCapability(), req, - true, req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), node, req.getCapability(), + req.getExecutionType(), req, true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { - addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - true, req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), rack, req.getCapability(), + req.getExecutionType(), req, true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { - addResourceRequest(req.getPriority(), rack, req.getCapability(), req, - req.getRelaxLocality(), req.getNodeLabelExpression()); + addResourceRequest(req.getPriority(), rack, req.getCapability(), + req.getExecutionType(), req, req.getRelaxLocality(), + req.getNodeLabelExpression()); } - // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression()); + req.getCapability(), req.getExecutionType(), req, + req.getRelaxLocality(), req.getNodeLabelExpression()); } @Override @@ -552,16 +564,18 @@ public synchronized void removeContainerRequest(T req) { // Update resource requests if (req.getNodes() != null) { for (String node : new HashSet(req.getNodes())) { - decResourceRequest(req.getPriority(), node, req.getCapability(), req); + decResourceRequest(req.getPriority(), node, req.getCapability(), + req.getExecutionType(), req); } } for (String rack : allRacks) { - decResourceRequest(req.getPriority(), rack, req.getCapability(), req); + decResourceRequest(req.getPriority(), rack, req.getCapability(), + req.getExecutionType(), req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getCapability(), req); + req.getCapability(), req.getExecutionType(), req); } @Override @@ -601,47 +615,57 @@ public synchronized Resource getAvailableResources() { public synchronized int getClusterNodeCount() { return clusterNodeCount; } - + @Override public synchronized List> getMatchingRequests( - Priority priority, - String resourceName, - Resource capability) { + Priority priority, + String resourceName, + Resource capability) { + return getMatchingRequests(priority, resourceName, + ExecutionType.GUARANTEED, capability); + } + + @Override + public synchronized List> getMatchingRequests( + Priority priority, String resourceName, ExecutionType executionType, + Resource capability) { Preconditions.checkArgument(capability != null, "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, "The priority at which to request containers should not be null "); List> list = new LinkedList>(); - Map> remoteRequests = - this.remoteRequestsTable.get(priority); + Map>> + remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { return list; } - TreeMap reqMap = remoteRequests - .get(resourceName); - if (reqMap == null) { + Map> reqExecMap = + remoteRequests.get(resourceName); + if (reqExecMap == null) { return list; } + TreeMap reqMap = + reqExecMap.get(executionType); ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); if (resourceRequestInfo != null && !resourceRequestInfo.containerRequests.isEmpty()) { list.add(resourceRequestInfo.containerRequests); return list; } - + // no exact match. Container may be larger than what was requested. - // get all resources <= capability. map is reverse sorted. - SortedMap tailMap = - reqMap.tailMap(capability); - for(Map.Entry entry : tailMap.entrySet()) { + // get all resources <= capability. map is reverse sorted. + SortedMap tailMap = + reqMap.tailMap(capability); + for (Map.Entry entry : + tailMap.entrySet()) { if (canFit(entry.getKey(), capability) && !entry.getValue().containerRequests.isEmpty()) { // match found that fits in the larger resource list.add(entry.getValue().containerRequests); } } - // no match found return list; } @@ -663,23 +687,28 @@ public synchronized int getClusterNodeCount() { return racks; } - + /** * ContainerRequests with locality relaxation cannot be made at the same * priority as ContainerRequests without locality relaxation. */ private void checkLocalityRelaxationConflict(Priority priority, Collection locations, boolean relaxLocality) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); + Map>> + remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { return; } // Locality relaxation will be set to relaxLocality for all implicitly // requested racks. Make sure that existing rack requests match this. for (String location : locations) { - TreeMap reqs = - remoteRequests.get(location); + Map> reqExecMap = + remoteRequests.get(location); + if (reqExecMap == null) { + // Should not happen.. + continue; + } + for (TreeMap reqs : reqExecMap.values()) { if (reqs != null && !reqs.isEmpty()) { boolean existingRelaxLocality = reqs.values().iterator().next().remoteRequest.getRelaxLocality(); @@ -687,10 +716,12 @@ private void checkLocalityRelaxationConflict(Priority priority, throw new InvalidContainerRequestException("Cannot submit a " + "ContainerRequest asking for location " + location + " with locality relaxation " + relaxLocality + " when it has " - + "already been requested with locality relaxation " + existingRelaxLocality); + + "already been requested with locality relaxation " + + existingRelaxLocality); } } } + } } /** @@ -747,27 +778,31 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { ask.add(remoteRequest); } - private void - addResourceRequest(Priority priority, String resourceName, - Resource capability, T req, boolean relaxLocality, - String labelExpression) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); + private void addResourceRequest(Priority priority, String resourceName, + Resource capability, ExecutionType execType, T req, + boolean relaxLocality, String labelExpression) { + Map>> + remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { - remoteRequests = - new HashMap>(); + remoteRequests = new HashMap<>(); this.remoteRequestsTable.put(priority, remoteRequests); if (LOG.isDebugEnabled()) { LOG.debug("Added priority=" + priority); } } - TreeMap reqMap = - remoteRequests.get(resourceName); + Map> reqExecMap = + remoteRequests.get(resourceName); + if (reqExecMap == null) { + reqExecMap = new HashMap<>(); + remoteRequests.put(resourceName, reqExecMap); + } + + TreeMap reqMap = reqExecMap.get(execType); if (reqMap == null) { // capabilities are stored in reverse sorted order. smallest last. reqMap = new TreeMap( new ResourceReverseMemoryThenCpuComparator()); - remoteRequests.put(resourceName, reqMap); + reqExecMap.put(execType, reqMap); } ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); if (resourceRequestInfo == null) { @@ -776,7 +811,8 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { relaxLocality); reqMap.put(capability, resourceRequestInfo); } - + + resourceRequestInfo.remoteRequest.setExecutionType(execType); resourceRequestInfo.remoteRequest.setNumContainers( resourceRequestInfo.remoteRequest.getNumContainers() + 1); @@ -802,10 +838,11 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void decResourceRequest(Priority priority, String resourceName, - Resource capability, + Resource capability, + ExecutionType execType, T req) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); + Map>> + remoteRequests = this.remoteRequestsTable.get(priority); if(remoteRequests == null) { if (LOG.isDebugEnabled()) { @@ -815,14 +852,25 @@ private void decResourceRequest(Priority priority, return; } - Map reqMap = remoteRequests.get(resourceName); - if (reqMap == null) { + Map> reqExecMap = + remoteRequests.get(resourceName); + if (reqExecMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("Not decrementing resource as " + resourceName + " is not present in request table"); } return; } + + TreeMap reqMap = reqExecMap.get(execType); + if (reqMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not decrementing resource as " + execType + + " is not present in request table"); + } + return; + } + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java new file mode 100644 index 0000000..83130a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** + * Base test case to be used for Testing frameworks that use AMRMProxy + */ +public abstract class BaseAMRMProxyE2ETest { + + protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, + ApplicationId appId, MiniYARNCluster cluster, + final Configuration yarnConf) + throws IOException, InterruptedException, YarnException { + + UserGroupInformation user = null; + + // Get the AMRMToken from AMRMProxy + + ApplicationReport report = rmClient.getApplicationReport(appId); + + user = UserGroupInformation.createProxyUser( + report.getCurrentApplicationAttemptId().toString(), + UserGroupInformation.getCurrentUser()); + + ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster + .getNodeManager(0).getNMContext().getContainerManager(); + + AMRMProxyTokenSecretManager amrmTokenSecretManager = + containerManager.getAMRMProxyService().getSecretManager(); + org.apache.hadoop.security.token.Token token = + amrmTokenSecretManager + .createAndGetAMRMToken(report.getCurrentApplicationAttemptId()); + + SecurityUtil.setTokenService(token, + containerManager.getAMRMProxyService().getBindAddress()); + user.addToken(token); + + // Start Application Master + + return user + .doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(yarnConf, + ApplicationMasterProtocol.class); + } + }); + } + + protected AllocateRequest createAllocateRequest(List listNode) { + // The test needs AMRMClient to create a real allocate request + AMRMClientImpl amClient = + new AMRMClientImpl<>(); + + Resource capability = Resource.newInstance(1024, 2); + Priority priority = Priority.newInstance(1); + List nodeReports = listNode; + String node = nodeReports.get(0).getNodeId().getHost(); + String[] nodes = new String[] { node }; + + AMRMClient.ContainerRequest storedContainer1 = + new AMRMClient.ContainerRequest(capability, nodes, null, priority); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer1); + + List resourceAsk = new ArrayList<>(); + for (ResourceRequest rr : amClient.ask) { + resourceAsk.add(rr); + } + + ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest + .newInstance(new ArrayList<>(), new ArrayList<>()); + + int responseId = 1; + + return AllocateRequest.newInstance(responseId, 0, resourceAsk, + new ArrayList<>(), resourceBlacklistRequest); + } + + protected ApplicationAttemptId createApp(YarnClient yarnClient, + MiniYARNCluster yarnCluster, Configuration conf) throws Exception { + + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + + appContext.setApplicationName("Test"); + + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + + appContext.setQueue("default"); + + ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( + Collections. emptyMap(), + new HashMap(), Arrays.asList("sleep", "10000"), + new HashMap(), null, + new HashMap()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + + SubmitApplicationRequest appRequest = + Records.newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + yarnClient.submitApplication(appContext); + + RMAppAttempt appAttempt = null; + ApplicationAttemptId attemptId = null; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport + .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + attemptId = + appReport.getCurrentApplicationAttemptId(); + appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + Thread.sleep(1000); + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken().setService( + ClientRMProxy.getAMRMTokenService(conf)); + return attemptId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 75b49d0..8981100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NMToken; @@ -414,10 +415,12 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { // test addition and storage int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); containersRequestedAny = amClient.remoteRequestsTable.get(priority1) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); List> matches = amClient.getMatchingRequests(priority, node, capability); @@ -920,11 +923,14 @@ private void testAllocation(final AMRMClientImpl amClient) new ContainerRequest(capability, nodes, racks, priority)); int containersRequestedNode = amClient.remoteRequestsTable.get(priority) - .get(node).get(capability).remoteRequest.getNumContainers(); + .get(node).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); int containersRequestedRack = amClient.remoteRequestsTable.get(priority) - .get(rack).get(capability).remoteRequest.getNumContainers(); + .get(rack).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedNode); assertEquals(2, containersRequestedRack); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index cb8c86a..45ae983 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -26,26 +26,67 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; +import org.junit.Assert; import org.junit.Test; public class TestAMRMClientContainerRequest { + + @Test + public void testOpportunisticAndGuaranteedUniquePriority() { + AMRMClientImpl client = + new AMRMClientImpl(); + + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + client.init(conf); + + Resource capability = Resource.newInstance(1024, 1); + ContainerRequest request = + new ContainerRequest(capability, new String[] {"host1", "host2"}, + new String[] {"/rack2"}, Priority.newInstance(1)); + client.addContainerRequest(request); + verifyResourceRequest(client, request, "host1", true); + verifyResourceRequest(client, request, "host2", true); + verifyResourceRequest(client, request, "/rack1", true); + verifyResourceRequest(client, request, "/rack2", true); + verifyResourceRequest(client, request, ResourceRequest.ANY, true); + ContainerRequest request2 = + new ContainerRequest(capability, new String[] {"host1", "host2"}, + new String[] {"/rack2"}, Priority.newInstance(1), true, null, + ExecutionType.OPPORTUNISTIC); + client.addContainerRequest(request2); + verifyResourceRequest(client, request, "host1", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "host2", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "/rack1", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, "/rack2", true, + ExecutionType.OPPORTUNISTIC); + verifyResourceRequest(client, request, ResourceRequest.ANY, true, + ExecutionType.OPPORTUNISTIC); + } + @Test public void testFillInRacks() { AMRMClientImpl client = new AMRMClientImpl(); - + Configuration conf = new Configuration(); conf.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); client.init(conf); - + Resource capability = Resource.newInstance(1024, 1); ContainerRequest request = new ContainerRequest(capability, new String[] {"host1", "host2"}, @@ -57,7 +98,7 @@ public void testFillInRacks() { verifyResourceRequest(client, request, "/rack2", true); verifyResourceRequest(client, request, ResourceRequest.ANY, true); } - + @Test public void testDisableLocalityRelaxation() { AMRMClientImpl client = @@ -67,7 +108,7 @@ public void testDisableLocalityRelaxation() { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); client.init(conf); - + Resource capability = Resource.newInstance(1024, 1); ContainerRequest nodeLevelRequest = new ContainerRequest(capability, new String[] {"host1", "host2"}, @@ -78,30 +119,30 @@ public void testDisableLocalityRelaxation() { verifyResourceRequest(client, nodeLevelRequest, "/rack1", false); verifyResourceRequest(client, nodeLevelRequest, "host1", true); verifyResourceRequest(client, nodeLevelRequest, "host2", true); - + // Make sure we don't get any errors with two node-level requests at the // same priority ContainerRequest nodeLevelRequest2 = new ContainerRequest(capability, new String[] {"host2", "host3"}, null, Priority.newInstance(1), false); client.addContainerRequest(nodeLevelRequest2); - + AMRMClient.ContainerRequest rackLevelRequest = new AMRMClient.ContainerRequest(capability, null, new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), false); client.addContainerRequest(rackLevelRequest); - + verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false); verifyResourceRequest(client, rackLevelRequest, "/rack3", true); verifyResourceRequest(client, rackLevelRequest, "/rack4", true); - + // Make sure we don't get any errors with two rack-level requests at the // same priority AMRMClient.ContainerRequest rackLevelRequest2 = new AMRMClient.ContainerRequest(capability, null, new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), false); client.addContainerRequest(rackLevelRequest2); - + ContainerRequest bothLevelRequest = new ContainerRequest(capability, new String[] {"host3", "host4"}, new String[] {"rack1", "/otherrack"}, @@ -115,7 +156,7 @@ public void testDisableLocalityRelaxation() { true); verifyResourceRequest(client, bothLevelRequest, "host3", true); verifyResourceRequest(client, bothLevelRequest, "host4", true); - + // Make sure we don't get any errors with two both-level requests at the // same priority ContainerRequest bothLevelRequest2 = @@ -124,7 +165,7 @@ public void testDisableLocalityRelaxation() { Priority.newInstance(3), false); client.addContainerRequest(bothLevelRequest2); } - + @Test (expected = InvalidContainerRequestException.class) public void testDifferentLocalityRelaxationSamePriority() { AMRMClientImpl client = @@ -134,7 +175,7 @@ public void testDifferentLocalityRelaxationSamePriority() { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); client.init(conf); - + Resource capability = Resource.newInstance(1024, 1); ContainerRequest request1 = new ContainerRequest(capability, new String[] {"host1", "host2"}, @@ -145,7 +186,7 @@ public void testDifferentLocalityRelaxationSamePriority() { null, Priority.newInstance(1), true); client.addContainerRequest(request2); } - + @Test public void testInvalidValidWhenOldRemoved() { AMRMClientImpl client = @@ -155,36 +196,36 @@ public void testInvalidValidWhenOldRemoved() { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); client.init(conf); - + Resource capability = Resource.newInstance(1024, 1); ContainerRequest request1 = new ContainerRequest(capability, new String[] {"host1", "host2"}, null, Priority.newInstance(1), false); client.addContainerRequest(request1); - + client.removeContainerRequest(request1); ContainerRequest request2 = new ContainerRequest(capability, new String[] {"host3"}, null, Priority.newInstance(1), true); client.addContainerRequest(request2); - + client.removeContainerRequest(request2); - + ContainerRequest request3 = new ContainerRequest(capability, new String[] {"host1", "host2"}, null, Priority.newInstance(1), false); client.addContainerRequest(request3); - + client.removeContainerRequest(request3); - + ContainerRequest request4 = new ContainerRequest(capability, null, new String[] {"rack1"}, Priority.newInstance(1), true); client.addContainerRequest(request4); } - + @Test (expected = InvalidContainerRequestException.class) public void testLocalityRelaxationDifferentLevels() { AMRMClientImpl client = @@ -194,7 +235,7 @@ public void testLocalityRelaxationDifferentLevels() { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); client.init(conf); - + Resource capability = Resource.newInstance(1024, 1); ContainerRequest request1 = new ContainerRequest(capability, new String[] {"host1", "host2"}, @@ -205,7 +246,7 @@ public void testLocalityRelaxationDifferentLevels() { new String[] {"rack1"}, Priority.newInstance(1), true); client.addContainerRequest(request2); } - + private static class MyResolver implements DNSToSwitchMapping { @Override @@ -220,12 +261,21 @@ public void reloadCachedMappings() {} public void reloadCachedMappings(List names) { } } - + private void verifyResourceRequest( AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality) { - ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) - .get(location).get(request.getCapability()).remoteRequest; + verifyResourceRequest(client, request, location, expectedRelaxLocality, + ExecutionType.GUARANTEED); + } + + private void verifyResourceRequest( + AMRMClientImpl client, ContainerRequest request, + String location, boolean expectedRelaxLocality, + ExecutionType executionType) { + ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority()) + .get(location).get(executionType).get(request.getCapability()) + .remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java index f1e3f03..394ef5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java @@ -19,20 +19,12 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -40,43 +32,22 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.ClientRMProxy; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; -public class TestAMRMProxy { +public class TestAMRMProxy extends BaseAMRMProxyE2ETest { private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class); @@ -107,7 +78,8 @@ public void testAMRMProxyE2E() throws Exception { // Submit application - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -201,7 +173,8 @@ public void testE2ETokenRenewal() throws Exception { // Submit - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -270,7 +243,8 @@ public void testE2ETokenSwap() throws Exception { rmClient.init(yarnConf); rmClient.start(); - ApplicationId appId = createApp(rmClient, cluster); + ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf); + ApplicationId appId = appAttmptId.getApplicationId(); client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); @@ -290,124 +264,4 @@ public void testE2ETokenSwap() throws Exception { cluster.stop(); } } - - protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, - ApplicationId appId, MiniYARNCluster cluster, - final Configuration yarnConf) - throws IOException, InterruptedException, YarnException { - - UserGroupInformation user = null; - - // Get the AMRMToken from AMRMProxy - - ApplicationReport report = rmClient.getApplicationReport(appId); - - user = UserGroupInformation.createProxyUser( - report.getCurrentApplicationAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - - ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster - .getNodeManager(0).getNMContext().getContainerManager(); - - AMRMProxyTokenSecretManager amrmTokenSecretManager = - containerManager.getAMRMProxyService().getSecretManager(); - org.apache.hadoop.security.token.Token token = - amrmTokenSecretManager - .createAndGetAMRMToken(report.getCurrentApplicationAttemptId()); - - SecurityUtil.setTokenService(token, - containerManager.getAMRMProxyService().getBindAddress()); - user.addToken(token); - - // Start Application Master - - return user - .doAs(new PrivilegedExceptionAction() { - @Override - public ApplicationMasterProtocol run() throws Exception { - return ClientRMProxy.createRMProxy(yarnConf, - ApplicationMasterProtocol.class); - } - }); - } - - protected AllocateRequest createAllocateRequest(List listNode) { - // The test needs AMRMClient to create a real allocate request - AMRMClientImpl amClient = - new AMRMClientImpl(); - - Resource capability = Resource.newInstance(1024, 2); - Priority priority = Priority.newInstance(1); - List nodeReports = listNode; - String node = nodeReports.get(0).getNodeId().getHost(); - String[] nodes = new String[] { node }; - - ContainerRequest storedContainer1 = - new ContainerRequest(capability, nodes, null, priority); - amClient.addContainerRequest(storedContainer1); - amClient.addContainerRequest(storedContainer1); - - List resourceAsk = new ArrayList(); - for (ResourceRequest rr : amClient.ask) { - resourceAsk.add(rr); - } - - ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest - .newInstance(new ArrayList(), new ArrayList()); - - int responseId = 1; - - return AllocateRequest.newInstance(responseId, 0, resourceAsk, - new ArrayList(), resourceBlacklistRequest); - } - - protected ApplicationId createApp(YarnClient yarnClient, - MiniYARNCluster yarnCluster) throws Exception { - - ApplicationSubmissionContext appContext = - yarnClient.createApplication().getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); - - appContext.setApplicationName("Test"); - - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(0); - appContext.setPriority(pri); - - appContext.setQueue("default"); - - ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( - Collections. emptyMap(), - new HashMap(), Arrays.asList("sleep", "10000"), - new HashMap(), null, - new HashMap()); - appContext.setAMContainerSpec(amContainer); - appContext.setResource(Resource.newInstance(1024, 1)); - - SubmitApplicationRequest appRequest = - Records.newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - - yarnClient.submitApplication(appContext); - - RMAppAttempt appAttempt = null; - while (true) { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport - .getYarnApplicationState() == YarnApplicationState.ACCEPTED) { - ApplicationAttemptId attemptId = - appReport.getCurrentApplicationAttemptId(); - appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() - .get(attemptId.getApplicationId()).getCurrentAppAttempt(); - while (true) { - if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - break; - } - } - break; - } - } - Thread.sleep(1000); - return appId; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index b4dcf66..c49c719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,19 +22,30 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -42,12 +53,23 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validates End2End Distributed Scheduling flow which includes the AM @@ -56,11 +78,69 @@ * the NM and the DistributedSchedulingProtocol used by the framework to talk * to the DistributedSchedulingService running on the RM. */ -public class TestDistributedScheduling extends TestAMRMProxy { +public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { private static final Log LOG = LogFactory.getLog(TestDistributedScheduling.class); + protected MiniYARNCluster cluster; + protected YarnClient rmClient; + protected ApplicationMasterProtocol client; + protected Configuration conf; + protected Configuration yarnConf; + protected ApplicationAttemptId attemptId; + protected ApplicationId appId; + + @Before + public void doBefore() throws Exception { + cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); + + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); + cluster.init(conf); + cluster.start(); + yarnConf = cluster.getConfig(); + + // the client has to connect to AMRMProxy + yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); + rmClient = YarnClient.createYarnClient(); + rmClient.init(yarnConf); + rmClient.start(); + + // Submit application + attemptId = createApp(rmClient, cluster, conf); + appId = attemptId.getApplicationId(); + client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); + } + + @After + public void doAfter() throws Exception { + if (client != null) { + try { + client.finishApplicationMaster(FinishApplicationMasterRequest + .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + rmClient.killApplication(attemptId.getApplicationId()); + attemptId = null; + } catch (Exception e) { + } + } + if (rmClient != null) { + try { + rmClient.stop(); + } catch (Exception e) { + } + } + if (cluster != null) { + try { + cluster.stop(); + } catch (Exception e) { + } + } + } + /** * Validates if Allocate Requests containing only OPPORTUNISTIC container * requests are satisfied instantly. @@ -69,239 +149,359 @@ */ @Test(timeout = 60000) public void testOpportunisticExecutionTypeRequestE2E() throws Exception { - MiniYARNCluster cluster = - new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); - YarnClient rmClient = null; - ApplicationMasterProtocol client; - try { - Configuration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); - cluster.init(conf); - cluster.start(); - final Configuration yarnConf = cluster.getConfig(); - - // the client has to connect to AMRMProxy - - yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); - rmClient = YarnClient.createYarnClient(); - rmClient.init(yarnConf); - rmClient.start(); - - // Submit application - - ApplicationId appId = createApp(rmClient, cluster); - - client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); - - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - RMApp rmApp = - cluster.getResourceManager().getRMContext().getRMApps().get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - - // Replace 'ANY' requests with OPPORTUNISTIC aks and remove - // everything else - List newAskList = new ArrayList<>(); - for (ResourceRequest rr : request.getAskList()) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); - newAskList.add(newRR); - } + LOG.info("testDistributedSchedulingE2E - Register"); + + RegisterApplicationMasterResponse responseRegister = + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + + Assert.assertNotNull(responseRegister); + Assert.assertNotNull(responseRegister.getQueue()); + Assert.assertNotNull(responseRegister.getApplicationACLs()); + Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); + Assert + .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); + Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); + Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); + + RMApp rmApp = + cluster.getResourceManager().getRMContext().getRMApps().get(appId); + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + + LOG.info("testDistributedSchedulingE2E - Allocate"); + + AllocateRequest request = + createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); + + // Replace 'ANY' requests with OPPORTUNISTIC aks and remove + // everything else + List newAskList = new ArrayList<>(); + for (ResourceRequest rr : request.getAskList()) { + if (ResourceRequest.ANY.equals(rr.getResourceName())) { + ResourceRequest newRR = ResourceRequest.newInstance(rr + .getPriority(), rr.getResourceName(), + rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + newAskList.add(newRR); } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); + } + request.setAskList(newAskList); + + AllocateResponse allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + + // Ensure that all the requests are satisfied immediately + Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); + + // Verify that the allocated containers are OPPORTUNISTIC + for (Container allocatedContainer : allocResponse + .getAllocatedContainers()) { + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier( + allocatedContainer.getContainerToken()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + containerTokenIdentifier.getExecutionType()); + } + + LOG.info("testDistributedSchedulingE2E - Finish"); + } + + /** + * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC + * container requests works as expected. + * + * @throws Exception + */ + @Test(timeout = 60000) + public void testMixedExecutionTypeRequestE2E() throws Exception { + LOG.info("testDistributedSchedulingE2E - Register"); + + RegisterApplicationMasterResponse responseRegister = + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + + Assert.assertNotNull(responseRegister); + Assert.assertNotNull(responseRegister.getQueue()); + Assert.assertNotNull(responseRegister.getApplicationACLs()); + Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); + Assert + .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); + Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); + Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); + + RMApp rmApp = + cluster.getResourceManager().getRMContext().getRMApps().get(appId); + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + + LOG.info("testDistributedSchedulingE2E - Allocate"); + + AllocateRequest request = + createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); + List askList = request.getAskList(); + List newAskList = new ArrayList<>(askList); + + // Duplicate all ANY requests marking them as opportunistic + for (ResourceRequest rr : askList) { + if (ResourceRequest.ANY.equals(rr.getResourceName())) { + ResourceRequest newRR = ResourceRequest.newInstance(rr + .getPriority(), rr.getResourceName(), + rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + newAskList.add(newRR); } + } + request.setAskList(newAskList); + + AllocateResponse allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + + // Ensure that all the requests are satisfied immediately + Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); + + // Verify that the allocated containers are OPPORTUNISTIC + for (Container allocatedContainer : allocResponse + .getAllocatedContainers()) { + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier( + allocatedContainer.getContainerToken()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + containerTokenIdentifier.getExecutionType()); + } - LOG.info("testDistributedSchedulingE2E - Finish"); + request.setAskList(new ArrayList()); + request.setResponseId(request.getResponseId() + 1); - FinishApplicationMasterResponse responseFinish = - client.finishApplicationMaster(FinishApplicationMasterRequest - .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + Thread.sleep(1000); - Assert.assertNotNull(responseFinish); + // RM should allocate GUARANTEED containers within 2 calls to allocate() + allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - } finally { - if (rmClient != null) { - rmClient.stop(); - } - cluster.stop(); + // Verify that the allocated containers are GUARANTEED + for (Container allocatedContainer : allocResponse + .getAllocatedContainers()) { + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier( + allocatedContainer.getContainerToken()); + Assert.assertEquals(ExecutionType.GUARANTEED, + containerTokenIdentifier.getExecutionType()); } + + LOG.info("testDistributedSchedulingE2E - Finish"); } + /** - * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC - * container requests works as expected. + * Validates if AMRMClient can be used with Distributed Scheduling turned on * * @throws Exception */ @Test(timeout = 60000) - public void testMixedExecutionTypeRequestE2E() throws Exception { - MiniYARNCluster cluster = - new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); - YarnClient rmClient = null; - ApplicationMasterProtocol client; - + @SuppressWarnings("unchecked") + public void testAMRMClient() throws Exception { + AMRMClientImpl amClient = null; try { - Configuration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); - cluster.init(conf); - cluster.start(); - final Configuration yarnConf = cluster.getConfig(); - - // the client has to connect to AMRMProxy - - yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); - rmClient = YarnClient.createYarnClient(); - rmClient.init(yarnConf); - rmClient.start(); - - // Submit application - - ApplicationId appId = createApp(rmClient, cluster); - - client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); - - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - RMApp rmApp = - cluster.getResourceManager().getRMContext().getRMApps().get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - List askList = request.getAskList(); - List newAskList = new ArrayList<>(askList); - - // Duplicate all ANY requests marking them as opportunistic - for (ResourceRequest rr : askList) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); - newAskList.add(newRR); - } - } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); - } - - request.setAskList(new ArrayList()); - request.setResponseId(request.getResponseId() + 1); - Thread.sleep(1000); + Priority priority = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + Resource capability = Resource.newInstance(1024, 1); + + List nodeReports = rmClient.getNodeReports(NodeState.RUNNING); + String node = nodeReports.get(0).getNodeId().getHost(); + String rack = nodeReports.get(0).getRackName(); + String[] nodes = new String[]{node}; + String[] racks = new String[]{rack}; + + // start am rm client + amClient = (AMRMClientImpl) AMRMClient.createAMRMClient(client); + amClient.init(yarnConf); + amClient.start(); + amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, ""); + + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, + true, null, ExecutionType.OPPORTUNISTIC)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, + true, null, ExecutionType.OPPORTUNISTIC)); + + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, + true, null, ExecutionType.OPPORTUNISTIC)); + + int containersRequestedNode = amClient.remoteRequestsTable.get(priority) + .get(node).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); + int containersRequestedRack = amClient.remoteRequestsTable.get(priority) + .get(rack).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); + int containersRequestedAny = amClient.remoteRequestsTable.get(priority) + .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED) + .get(capability).remoteRequest.getNumContainers(); + int oppContainersRequestedAny = + amClient.remoteRequestsTable.get(priority2).get(ResourceRequest.ANY) + .get(ExecutionType.OPPORTUNISTIC).get(capability).remoteRequest + .getNumContainers(); + + assertEquals(2, containersRequestedNode); + assertEquals(2, containersRequestedRack); + assertEquals(2, containersRequestedAny); + assertEquals(1, oppContainersRequestedAny); + + assertEquals(4, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 10; + Set releases = new TreeSet<>(); + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache + ()); + HashMap receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < + (containersRequestedAny + oppContainersRequestedAny) + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += allocResponse.getAllocatedContainers() + .size(); + for (Container container : allocResponse.getAllocatedContainers()) { + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } - // RM should allocate GUARANTEED containers within 2 calls to allocate() - allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } - // Verify that the allocated containers are GUARANTEED - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.GUARANTEED, - containerTokenIdentifier.getExecutionType()); + if (allocatedContainerCount < containersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } } - LOG.info("testDistributedSchedulingE2E - Finish"); + assertEquals(allocatedContainerCount, + containersRequestedAny + oppContainersRequestedAny); + for (ContainerId rejectContainerId : releases) { + amClient.releaseAssignedContainer(rejectContainerId); + } + assertEquals(3, amClient.release.size()); + assertEquals(0, amClient.ask.size()); + + // need to tell the AMRMClient that we dont need these resources anymore + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, + true, null, ExecutionType.OPPORTUNISTIC)); + assertEquals(4, amClient.ask.size()); + + // test RPC exception handling + amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, + nodes, racks, priority)); + amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, + nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, + true, null, ExecutionType.OPPORTUNISTIC)); + + final AMRMClient amc = amClient; + ApplicationMasterProtocol realRM = amClient.rmClient; + try { + ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol + .class); + when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer( + new Answer() { + public AllocateResponse answer(InvocationOnMock invocation) + throws Exception { + amc.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, + racks, priority)); + amc.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, + priority)); + amc.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, + priority2, true, null, ExecutionType.OPPORTUNISTIC)); + throw new Exception(); + } + }); + amClient.rmClient = mockRM; + amClient.allocate(0.1f); + } catch (Exception ioe) { + } finally { + amClient.rmClient = realRM; + } - FinishApplicationMasterResponse responseFinish = - client.finishApplicationMaster(FinishApplicationMasterRequest - .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + assertEquals(3, amClient.release.size()); + assertEquals(6, amClient.ask.size()); + + iterationsLeft = 3; + // do a few iterations to ensure RM is not going send new containers + while (iterationsLeft-- > 0) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + assertEquals(0, allocResponse.getAllocatedContainers().size()); + if (allocResponse.getCompletedContainersStatuses().size() > 0) { + for (ContainerStatus cStatus : allocResponse + .getCompletedContainersStatuses()) { + if (releases.contains(cStatus.getContainerId())) { + assertEquals(cStatus.getState(), ContainerState.COMPLETE); + assertEquals(-100, cStatus.getExitStatus()); + releases.remove(cStatus.getContainerId()); + } + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); - Assert.assertNotNull(responseFinish); + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); } finally { - if (rmClient != null) { - rmClient.stop(); + if (amClient != null && amClient.getServiceState() == Service.STATE + .STARTED) { + amClient.stop(); } - cluster.stop(); } } - @Ignore - @Override - public void testAMRMProxyE2E() throws Exception { } - - @Ignore - @Override - public void testE2ETokenRenewal() throws Exception { } - - @Ignore - @Override - public void testE2ETokenSwap() throws Exception { } + private void sleep(int sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index cd04130..ec78eda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -252,8 +253,8 @@ public void testNMClient() } int containersRequestedAny = rmClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).remoteRequest - .getNumContainers(); + .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability) + .remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 53ae2cd..a864990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -186,6 +186,7 @@ public String toString() { + ", # Containers: " + getNumContainers() + ", Location: " + getResourceName() + ", Relax Locality: " + getRelaxLocality() + + ", Execution type: " + getExecutionType() + ", Node Label Expression: " + getNodeLabelExpression() + "}"; }