diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 7acaf11..732662e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -110,6 +110,7 @@ protected AMRMClient(String name) { final List nodes; final List racks; final Priority priority; + final long allocationRequestId; final boolean relaxLocality; final String nodeLabelsExpression; final ExecutionTypeRequest executionTypeRequest; @@ -181,10 +182,43 @@ public ContainerRequest(Resource capability, String[] nodes, */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean relaxLocality, String nodeLabelsExpression) { - this(capability, nodes, racks, priority, relaxLocality, + this(capability, nodes, racks, priority, 0, relaxLocality, nodeLabelsExpression, ExecutionTypeRequest.newInstance()); } + + /** + * Instantiates a {@link ContainerRequest} with the given constraints. + * + * @param capability + * The {@link Resource} to be requested for each container. + * @param nodes + * Any hosts to request that the containers are placed on. + * @param racks + * Any racks to request that the containers are placed on. The + * racks corresponding to any hosts requested will be automatically + * added to this list. + * @param priority + * The priority at which to request the containers. Higher + * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. + * @param relaxLocality + * If true, containers for this request may be assigned on hosts + * and racks other than the ones explicitly requested. + * @param nodeLabelsExpression + * Set node labels to allocate resource, now we only support + * asking for only a single node label + */ + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression) { + this(capability, nodes, racks, priority, allocationRequestId, + relaxLocality, nodeLabelsExpression, + ExecutionTypeRequest.newInstance()); + } /** * Instantiates a {@link ContainerRequest} with the given constraints. @@ -200,6 +234,10 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, * @param priority * The priority at which to request the containers. Higher * priorities have lower numerical values. + * @param allocationRequestId + * The allocationRequestId of the request. To be used as a tracking + * id to match Containers allocated against this request. Will + * default to 0 if not specified. * @param relaxLocality * If true, containers for this request may be assigned on hosts * and racks other than the ones explicitly requested. @@ -210,7 +248,8 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, * Set the execution type of the container request. */ public ContainerRequest(Resource capability, String[] nodes, String[] racks, - Priority priority, boolean relaxLocality, String nodeLabelsExpression, + Priority priority, long allocationRequestId, boolean relaxLocality, + String nodeLabelsExpression, ExecutionTypeRequest executionTypeRequest) { // Validate request Preconditions.checkArgument(capability != null, @@ -223,6 +262,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, && (nodes == null || nodes.length == 0)), "Can't turn off locality relaxation on a " + "request with no location constraints"); + this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); @@ -247,6 +287,10 @@ public Resource getCapability() { public Priority getPriority() { return priority; } + + public long getAllocationRequestId() { + return allocationRequestId; + } public boolean getRelaxLocality() { return relaxLocality; @@ -264,6 +308,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); + sb.append("AllocationRequestId[").append(allocationRequestId).append("]"); sb.append("ExecutionTypeRequest[").append(executionTypeRequest) .append("]"); return sb.toString(); @@ -390,6 +435,10 @@ public abstract void requestContainerResourceChange( * Each collection in the list contains requests with identical * Resource size that fit in the given capability. In a * collection, requests will be returned in the same order as they were added. + * + * NOTE: This matches only requests that were made by the client WITHOUT the + * allocationRequestId specified. + * * @return Collection of request matching the parameters */ @InterfaceStability.Evolving @@ -407,7 +456,11 @@ public abstract void requestContainerResourceChange( * Each collection in the list contains requests with identical * Resource size that fit in the given capability. In a * collection, requests will be returned in the same order as they were added. - * specify an ExecutionType . + * specify an ExecutionType. + * + * NOTE: This matches only requests that were made by the client WITHOUT the + * allocationRequestId specified. + * * @param priority Priority * @param resourceName Location * @param executionType ExecutionType @@ -421,7 +474,23 @@ public abstract void requestContainerResourceChange( throw new UnsupportedOperationException("The sub-class extending" + " AMRMClient is expected to implement this !!"); } - + + /** + * Get outstanding ContainerRequests matching the given + * allocationRequestId. These ContainerRequests should have been added via + * addContainerRequest earlier in the lifecycle. For performance, + * the AMRMClient may return its internal collection directly without creating + * a copy. Users should not perform mutable operations on the return value. + * + * NOTE: This matches only requests that were made by the client WITH the + * allocationRequestId specified. + * + * @return Collection of request matching the parameters + */ + @InterfaceStability.Evolving + public abstract List> getMatchingRequests( + long allocationRequestId); + /** * Update application's blacklist with addition or removal resources. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 28d20c8..306a28f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -202,6 +202,10 @@ public void setHeartbeatInterval(int interval) { /** * Returns all matching ContainerRequests that match the given Priority, * ResourceName, ExecutionType and Capability. + * + * NOTE: This matches only requests that were made by the client WITHOUT the + * allocationRequestId specified. + * * @param priority Priority. * @param resourceName Location. * @param executionType ExecutionType. @@ -214,6 +218,21 @@ public void setHeartbeatInterval(int interval) { return client.getMatchingRequests(priority, resourceName, executionType, capability); } + + /** + * Returns all matching ContainerRequests that match the given + * AllocationRequestId. + * + * NOTE: This matches only requests that were made by the client WITH the + * allocationRequestId specified. + * + * @param allocationRequestId AllocationRequestId. + * @return All matching ContainerRequests + */ + public List> getMatchingRequests( + long allocationRequestId) { + return client.getMatchingRequests(allocationRequestId); + } /** * Registers this application master with the resource manager. On successful diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4145944..3f0005c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -108,10 +108,11 @@ ResourceRequest remoteRequest; LinkedHashSet containerRequests; - ResourceRequestInfo(Priority priority, String resourceName, - Resource capability, boolean relaxLocality) { + ResourceRequestInfo(Long allocationRequestId, Priority priority, + String resourceName, Resource capability, boolean relaxLocality) { remoteRequest = ResourceRequest.newInstance(priority, resourceName, capability, 0); + remoteRequest.setAllocationRequestId(allocationRequestId); remoteRequest.setRelaxLocality(relaxLocality); containerRequests = new LinkedHashSet(); } @@ -154,7 +155,7 @@ static boolean canFit(Resource arg0, Resource arg1) { return (mem0 <= mem1 && cpu0 <= cpu1); } - final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable(); + final Map> remoteRequests =new HashMap<>(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); @@ -263,10 +264,12 @@ public AllocateResponse allocate(float progressIndicator) for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - askList.add(ResourceRequest.newInstance(r.getPriority(), + ResourceRequest ask = ResourceRequest.newInstance(r.getPriority(), r.getResourceName(), r.getCapability(), r.getNumContainers(), r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest())); + r.getExecutionTypeRequest()); + ask.setAllocationRequestId(r.getAllocationRequestId()); + askList.add(ask); } List increaseList = new ArrayList<>(); List decreaseList = new ArrayList<>(); @@ -318,11 +321,14 @@ public AllocateResponse allocate(float progressIndicator) synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); - @SuppressWarnings("unchecked") - Iterator> reqIter = - remoteRequestsTable.iterator(); - while (reqIter.hasNext()) { - addResourceRequestToAsk(reqIter.next().remoteRequest); + for (RemoteRequestsTable remoteRequestsTable : + remoteRequests.values()) { + @SuppressWarnings("unchecked") + Iterator> reqIter = + remoteRequestsTable.iterator(); + while (reqIter.hasNext()) { + addResourceRequestToAsk(reqIter.next().remoteRequest); + } } change.putAll(this.pendingChange); } @@ -498,15 +504,16 @@ public synchronized void addContainerRequest(T req) { // check that specific and non-specific requests cannot be mixed within a // priority - checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST, - req.getRelaxLocality()); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), ANY_LIST, req.getRelaxLocality()); // check that specific rack cannot be mixed with specific node within a // priority. If node and its rack are both specified then they must be // in the same request. // For explicitly requested racks, we set locality relaxation to true - checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true); - checkLocalityRelaxationConflict(req.getPriority(), inferredRacks, - req.getRelaxLocality()); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), dedupedRacks, true); + checkLocalityRelaxationConflict(req.getAllocationRequestId(), + req.getPriority(), inferredRacks, req.getRelaxLocality()); // check if the node label expression specified is valid checkNodeLabelExpression(req); @@ -607,6 +614,24 @@ public synchronized int getClusterNodeCount() { return clusterNodeCount; } + + @Override + @SuppressWarnings("unchecked") + public List> getMatchingRequests(long + allocationRequestId) { + RemoteRequestsTable remoteRequestsTable = + remoteRequests.get(Long.valueOf(allocationRequestId)); + List> list = new LinkedList>(); + + Iterator> reqIter = + remoteRequestsTable.iterator(); + while (reqIter.hasNext()) { + ResourceRequestInfo resReqInfo = reqIter.next(); + list.add(resReqInfo.containerRequests); + } + return list; + } + @Override public synchronized List> getMatchingRequests( Priority priority, @@ -617,6 +642,7 @@ public synchronized int getClusterNodeCount() { } @Override + @SuppressWarnings("unchecked") public synchronized List> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, Resource capability) { @@ -626,9 +652,10 @@ public synchronized int getClusterNodeCount() { "The priority at which to request containers should not be null "); List> list = new LinkedList>(); - @SuppressWarnings("unchecked") + RemoteRequestsTable remoteRequestsTable = + remoteRequests.get(Long.valueOf(0)); List> matchingRequests = - this.remoteRequestsTable.getMatchingRequests(priority, resourceName, + remoteRequestsTable.getMatchingRequests(priority, resourceName, executionType, capability); // If no exact match. Container may be larger than what was requested. // get all resources <= capability. map is reverse sorted. @@ -664,23 +691,27 @@ public synchronized int getClusterNodeCount() { * ContainerRequests with locality relaxation cannot be made at the same * priority as ContainerRequests without locality relaxation. */ - private void checkLocalityRelaxationConflict(Priority priority, - Collection locations, boolean relaxLocality) { + private void checkLocalityRelaxationConflict(Long allocationReqId, + Priority priority, Collection locations, boolean relaxLocality) { // Locality relaxation will be set to relaxLocality for all implicitly // requested racks. Make sure that existing rack requests match this. - @SuppressWarnings("unchecked") - List allCapabilityMaps = - remoteRequestsTable.getAllResourceRequestInfos(priority, locations); - for (ResourceRequestInfo reqs : allCapabilityMaps) { - ResourceRequest remoteRequest = reqs.remoteRequest; - boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); - if (relaxLocality != existingRelaxLocality) { - throw new InvalidContainerRequestException("Cannot submit a " - + "ContainerRequest asking for location " - + remoteRequest.getResourceName() + " with locality relaxation " - + relaxLocality + " when it has already been requested" - + "with locality relaxation " + existingRelaxLocality); + RemoteRequestsTable remoteRequestsTable = + remoteRequests.get(allocationReqId); + if (remoteRequestsTable != null) { + @SuppressWarnings("unchecked") + List allCapabilityMaps = + remoteRequestsTable.getAllResourceRequestInfos(priority, locations); + for (ResourceRequestInfo reqs : allCapabilityMaps) { + ResourceRequest remoteRequest = reqs.remoteRequest; + boolean existingRelaxLocality = remoteRequest.getRelaxLocality(); + if (relaxLocality != existingRelaxLocality) { + throw new InvalidContainerRequestException("Cannot submit a " + + "ContainerRequest asking for location " + + remoteRequest.getResourceName() + " with locality relaxation " + + relaxLocality + " when it has already been requested" + + "with locality relaxation " + existingRelaxLocality); + } } } } @@ -742,10 +773,18 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void addResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req, boolean relaxLocality, String labelExpression) { + RemoteRequestsTable remoteRequestsTable = + remoteRequests.get(req.getAllocationRequestId()); + if (remoteRequestsTable == null) { + remoteRequestsTable = new RemoteRequestsTable(); + remoteRequests.put(Long.valueOf(req.getAllocationRequestId()), + remoteRequestsTable); + } @SuppressWarnings("unchecked") ResourceRequestInfo resourceRequestInfo = remoteRequestsTable - .addResourceRequest(priority, resourceName, - execTypeReq, capability, req, relaxLocality, labelExpression); + .addResourceRequest(req.getAllocationRequestId(), priority, + resourceName, execTypeReq, capability, req, relaxLocality, + labelExpression); // Note this down for next interaction with ResourceManager addResourceRequestToAsk(resourceRequestInfo.remoteRequest); @@ -761,28 +800,34 @@ private void addResourceRequest(Priority priority, String resourceName, private void decResourceRequest(Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, Resource capability, T req) { - @SuppressWarnings("unchecked") - ResourceRequestInfo resourceRequestInfo = - remoteRequestsTable.decResourceRequest(priority, resourceName, - execTypeReq, capability, req); - // send the ResourceRequest to RM even if is 0 because it needs to override - // a previously sent value. If ResourceRequest was not sent previously then - // sending 0 aught to be a no-op on RM - if (resourceRequestInfo != null) { - addResourceRequestToAsk(resourceRequestInfo.remoteRequest); - - // delete entry from map if no longer needed - if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { - this.remoteRequestsTable.remove(priority, resourceName, - execTypeReq.getExecutionType(), capability); - } + RemoteRequestsTable remoteRequestsTable = + remoteRequests.get(req.getAllocationRequestId()); + if (remoteRequestsTable != null) { + @SuppressWarnings("unchecked") + ResourceRequestInfo resourceRequestInfo = + remoteRequestsTable.decResourceRequest(priority, resourceName, + execTypeReq, capability, req); + // send the ResourceRequest to RM even if is 0 because it needs to + // override + // a previously sent value. If ResourceRequest was not sent previously + // then + // sending 0 aught to be a no-op on RM + if (resourceRequestInfo != null) { + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); + + // delete entry from map if no longer needed + if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { + remoteRequestsTable.remove(priority, resourceName, + execTypeReq.getExecutionType(), capability); + } - if (LOG.isDebugEnabled()) { - LOG.debug("AFTER decResourceRequest:" + " applicationId=" - + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() - + " #asks=" + ask.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("AFTER decResourceRequest:" + " applicationId=" + + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 853a512..110ca79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -264,15 +264,16 @@ ResourceRequestInfo remove(Priority priority, String resourceName, } @SuppressWarnings("unchecked") - ResourceRequestInfo addResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, Resource capability, T req, - boolean relaxLocality, String labelExpression) { + ResourceRequestInfo addResourceRequest(Long allocationRequestId, + Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, + Resource capability, T req, boolean relaxLocality, + String labelExpression) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { resourceRequestInfo = - new ResourceRequestInfo(priority, resourceName, capability, - relaxLocality); + new ResourceRequestInfo(allocationRequestId, priority, resourceName, + capability, relaxLocality); put(priority, resourceName, execTypeReq.getExecutionType(), capability, resourceRequestInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 57cdbfb..37188fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -364,26 +364,32 @@ public void testAMRMClientMatchingFitExecType() ContainerRequest storedGuarContainer2 = new ContainerRequest(capability2, nodes, racks, priority); ContainerRequest storedOpportContainer1 = - new ContainerRequest(capability1, nodes, racks, priority, true, null, + new ContainerRequest(capability1, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer2 = - new ContainerRequest(capability2, nodes, racks, priority, true, null, + new ContainerRequest(capability2, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer3 = - new ContainerRequest(capability3, nodes, racks, priority, true, null, + new ContainerRequest(capability3, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer4 = - new ContainerRequest(capability4, nodes, racks, priority, true, null, + new ContainerRequest(capability4, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer5 = - new ContainerRequest(capability5, nodes, racks, priority, true, null, + new ContainerRequest(capability5, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer6 = - new ContainerRequest(capability6, nodes, racks, priority, true, null, + new ContainerRequest(capability6, nodes, racks, priority, + 0, true, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); ContainerRequest storedOpportContainer7 = new ContainerRequest(capability7, nodes, racks, priority2, - false, null, + 0, false, null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); amClient.addContainerRequest(storedGuarContainer1); amClient.addContainerRequest(storedGuarContainer2); @@ -541,11 +547,13 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.addContainerRequest(storedContainer3); // test addition and storage - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + RemoteRequestsTable remoteRequestsTable = amClient + .remoteRequests.get(Long.valueOf(0)); + int containersRequestedAny = remoteRequestsTable.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); - containersRequestedAny = amClient.remoteRequestsTable.get(priority1, + containersRequestedAny = remoteRequestsTable.get(priority1, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); @@ -584,7 +592,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); assertTrue(matches.isEmpty()); // 0 requests left. everything got cleaned up - assertTrue(amClient.remoteRequestsTable.isEmpty()); + assertTrue(amClient.remoteRequests.get(Long.valueOf(0)).isEmpty()); // go through an exemplary allocation, matching and release cycle amClient.addContainerRequest(storedContainer1); @@ -628,7 +636,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { assertEquals(0, amClient.ask.size()); assertEquals(0, allocResponse.getAllocatedContainers().size()); // 0 requests left. everything got cleaned up - assertTrue(amClient.remoteRequestsTable.isEmpty()); + assertTrue(remoteRequestsTable.isEmpty()); amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -1048,14 +1056,16 @@ private void testAllocation(final AMRMClientImpl amClient) new ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); - - int containersRequestedNode = amClient.remoteRequestsTable.get(priority, + + RemoteRequestsTable remoteRequestsTable = amClient + .remoteRequests.get(Long.valueOf(0)); + int containersRequestedNode = remoteRequestsTable.get(priority, node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedRack = amClient.remoteRequestsTable.get(priority, + int containersRequestedRack = remoteRequestsTable.get(priority, rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + int containersRequestedAny = remoteRequestsTable.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index 2db33c1..0eb16a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -61,7 +61,7 @@ public void testOpportunisticAndGuaranteedRequests() { verifyResourceRequest(client, request, ResourceRequest.ANY, true); ContainerRequest request2 = new ContainerRequest(capability, new String[] {"host1", "host2"}, - new String[] {"/rack2"}, Priority.newInstance(1), true, null, + new String[] {"/rack2"}, Priority.newInstance(1), 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true)); client.addContainerRequest(request2); @@ -274,8 +274,9 @@ private void verifyResourceRequest( AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality, ExecutionType executionType) { - ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(), - location, executionType, request.getCapability()).remoteRequest; + ResourceRequest ask = client.remoteRequests.get(Long.valueOf(0)) + .get(request.getPriority(),location, executionType, + request.getCapability()).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 71321e3..cda1816 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -364,12 +364,12 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); @@ -379,21 +379,23 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); - int containersRequestedNode = amClient.remoteRequestsTable.get(priority, + RemoteRequestsTable remoteRequestsTable = amClient + .remoteRequests.get(Long.valueOf(0)); + int containersRequestedNode = remoteRequestsTable.get(priority, node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedRack = amClient.remoteRequestsTable.get(priority, + int containersRequestedRack = remoteRequestsTable.get(priority, rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); - int containersRequestedAny = amClient.remoteRequestsTable.get(priority, + int containersRequestedAny = remoteRequestsTable.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = - amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY, + remoteRequestsTable.get(priority2, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); @@ -455,7 +457,7 @@ public void testAMRMClient() throws Exception { new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); assertEquals(4, amClient.ask.size()); @@ -467,7 +469,7 @@ public void testAMRMClient() throws Exception { nodes, racks, priority)); amClient.addContainerRequest( new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - true, null, + 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); @@ -488,7 +490,7 @@ public AllocateResponse answer(InvocationOnMock invocation) priority)); amc.removeContainerRequest( new AMRMClient.ContainerRequest(capability, null, null, - priority2, true, null, + priority2, 0, true, null, ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true))); throw new Exception(); @@ -569,7 +571,7 @@ public void testAMOpportunistic() throws Exception { ExecutionTypeRequest execTypeRequest = ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); ContainerRequest containerRequest = new AMRMClient.ContainerRequest( - capability, nodes, racks, priority, true, null, execTypeRequest); + capability, nodes, racks, priority, 0, true, null, execTypeRequest); amClient.addContainerRequest(containerRequest); // Wait until the container is allocated diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 969fb70..e2d4e46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -252,9 +252,9 @@ public void testNMClient() racks, priority)); } - int containersRequestedAny = rmClient.remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) - .remoteRequest.getNumContainers(); + int containersRequestedAny = rmClient.remoteRequests.get(Long.valueOf(0)) + .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, + capability).remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0;