diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 748e7f5..b971f9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -88,7 +88,7 @@ private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); private static final List ANY_LIST = Collections.singletonList(ResourceRequest.ANY); - + private int lastResponseId = 0; protected String appHostName; @@ -98,25 +98,25 @@ protected ApplicationMasterProtocol rmClient; protected Resource clusterAvailableResources; protected int clusterNodeCount; - + // blacklistedNodes is required for keeping history of blacklisted nodes that // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get // current blacklisted nodes and send back to RM. - protected final Set blacklistedNodes = new HashSet(); - protected final Set blacklistAdditions = new HashSet(); - protected final Set blacklistRemovals = new HashSet(); - + protected final Set blacklistedNodes = new HashSet<>(); + protected final Set blacklistAdditions = new HashSet<>(); + protected final Set blacklistRemovals = new HashSet<>(); + static class ResourceRequestInfo { ResourceRequest remoteRequest; LinkedHashSet containerRequests; - + ResourceRequestInfo(Long allocationRequestId, Priority priority, String resourceName, Resource capability, boolean relaxLocality) { remoteRequest = ResourceRequest.newBuilder().priority(priority) .resourceName(resourceName).capability(capability).numContainers(0) .allocationRequestId(allocationRequestId) .relaxLocality(relaxLocality).build(); - containerRequests = new LinkedHashSet(); + containerRequests = new LinkedHashSet<>(); } } @@ -141,11 +141,11 @@ public int compare(Resource arg0, Resource arg1) { } return -1; } - if(mem0 < mem1) { + if(mem0 < mem1) { return 1; } return -1; - } + } } static boolean canFit(Resource arg0, Resource arg1) { @@ -153,20 +153,20 @@ static boolean canFit(Resource arg0, Resource arg1) { long mem1 = arg1.getMemorySize(); long cpu0 = arg0.getVirtualCores(); long cpu1 = arg1.getVirtualCores(); - + return (mem0 <= mem1 && cpu0 <= cpu1); } private final Map> remoteRequests = new HashMap<>(); - protected final Set ask = new TreeSet( + protected final Set ask = new TreeSet<>( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); - protected final Set release = new TreeSet(); + protected final Set release = new TreeSet<>(); // pendingRelease holds history of release requests. // request is removed only if RM sends completedContainer. // How it different from release? --> release is for per allocate() request. - protected Set pendingRelease = new TreeSet(); + protected Set pendingRelease = new TreeSet<>(); // change map holds container resource change requests between two allocate() // calls, and are cleared after each successful allocate() call. protected final Map= 0, "Progress indicator should not be negative"); @@ -258,8 +258,8 @@ public AllocateResponse allocate(float progressIndicator) List askList = null; List releaseList = null; AllocateRequest allocateRequest = null; - List blacklistToAdd = new ArrayList(); - List blacklistToRemove = new ArrayList(); + List blacklistToAdd = new ArrayList<>(); + List blacklistToRemove = new ArrayList<>(); Map> oldChange = new HashMap<>(); try { @@ -268,7 +268,7 @@ public AllocateResponse allocate(float progressIndicator) // Save the current change for recovery oldChange.putAll(change); List updateList = createUpdateList(); - releaseList = new ArrayList(release); + releaseList = new ArrayList<>(release); // optimistically clear this collection assuming no RPC failure ask.clear(); release.clear(); @@ -276,7 +276,7 @@ public AllocateResponse allocate(float progressIndicator) blacklistToAdd.addAll(blacklistAdditions); blacklistToRemove.addAll(blacklistRemovals); - + ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(blacklistToAdd, blacklistToRemove); @@ -413,7 +413,7 @@ public AllocateResponse allocate(float progressIndicator) } private List cloneAsks() { - List askList = new ArrayList(ask.size()); + List askList = new ArrayList<>(ask.size()); for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across @@ -500,12 +500,12 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); } } - + @Override public synchronized void addContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); - Set dedupedRacks = new HashSet(); + Set dedupedRacks = new HashSet<>(); if (req.getRacks() != null) { dedupedRacks.addAll(req.getRacks()); if(req.getRacks().size() != dedupedRacks.size()) { @@ -521,8 +521,8 @@ public synchronized void addContainerRequest(T req) { // priority checkLocalityRelaxationConflict(req.getAllocationRequestId(), req.getPriority(), ANY_LIST, req.getRelaxLocality()); - // check that specific rack cannot be mixed with specific node within a - // priority. If node and its rack are both specified then they must be + // check that specific rack cannot be mixed with specific node within a + // priority. If node and its rack are both specified then they must be // in the same request. // For explicitly requested racks, we set locality relaxation to true checkLocalityRelaxationConflict(req.getAllocationRequestId(), @@ -533,11 +533,11 @@ public synchronized void addContainerRequest(T req) { checkNodeLabelExpression(req); if (req.getNodes() != null) { - HashSet dedupedNodes = new HashSet(req.getNodes()); + HashSet dedupedNodes = new HashSet<>(req.getNodes()); if(dedupedNodes.size() != req.getNodes().size()) { Joiner joiner = Joiner.on(','); LOG.warn("ContainerRequest has duplicate nodes: " - + joiner.join(req.getNodes())); + + joiner.join(req.getNodes())); } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, @@ -568,7 +568,7 @@ public synchronized void addContainerRequest(T req) { public synchronized void removeContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); - Set allRacks = new HashSet(); + Set allRacks = new HashSet<>(); if (req.getRacks() != null) { allRacks.addAll(req.getRacks()); } @@ -576,7 +576,7 @@ public synchronized void removeContainerRequest(T req) { // Update resource requests if (req.getNodes() != null) { - for (String node : new HashSet(req.getNodes())) { + for (String node : new HashSet<>(req.getNodes())) { decResourceRequest(req.getPriority(), node, req.getExecutionTypeRequest(), req.getCapability(), req); } @@ -644,12 +644,12 @@ public synchronized void releaseAssignedContainer(ContainerId containerId) { release.add(containerId); pendingChange.remove(containerId); } - + @Override public synchronized Resource getAvailableResources() { return clusterAvailableResources; } - + @Override public synchronized int getClusterNodeCount() { return clusterNodeCount; @@ -691,26 +691,28 @@ public synchronized int getClusterNodeCount() { "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, "The priority at which to request containers should not be null "); - List> list = new LinkedList>(); - + List> list = new LinkedList<>(); RemoteRequestsTable remoteRequestsTable = getTable(0); - List> matchingRequests = - remoteRequestsTable.getMatchingRequests(priority, resourceName, - executionType, capability); - // If no exact match. Container may be larger than what was requested. - // get all resources <= capability. map is reverse sorted. - for (ResourceRequestInfo resReqInfo : matchingRequests) { - if (canFit(resReqInfo.remoteRequest.getCapability(), capability) && - !resReqInfo.containerRequests.isEmpty()) { - list.add(resReqInfo.containerRequests); + + if (remoteRequestsTable != null) { + List> matchingRequests = + remoteRequestsTable.getMatchingRequests(priority, resourceName, + executionType, capability); + // If no exact match. Container may be larger than what was requested. + // get all resources <= capability. map is reverse sorted. + for (ResourceRequestInfo resReqInfo : matchingRequests) { + if (canFit(resReqInfo.remoteRequest.getCapability(), capability) && + !resReqInfo.containerRequests.isEmpty()) { + list.add(resReqInfo.containerRequests); + } } } // no match found - return list; + return list; } - + private Set resolveRacks(List nodes) { - Set racks = new HashSet(); + Set racks = new HashSet<>(); if (nodes != null) { for (String node : nodes) { // Ensure node requests are accompanied by requests for @@ -723,7 +725,7 @@ public synchronized int getClusterNodeCount() { } } } - + return racks; } @@ -754,16 +756,16 @@ private void checkLocalityRelaxationConflict(Long allocationReqId, } } } - + /** * Valid if a node label expression specified on container request is valid or * not - * + * * @param containerRequest */ private void checkNodeLabelExpression(T containerRequest) { String exp = containerRequest.getNodeLabelExpression(); - + if (null == exp || exp.isEmpty()) { return; } @@ -825,13 +827,13 @@ private void validateContainerExecTypeChangeRequest( private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. - // A ResourceRequest is removed from the remoteRequestTable. A 0 container + // A ResourceRequest is removed from the remoteRequestTable. A 0 container // request is added to 'ask' to notify the RM about not needing it any more. - // Before the call to allocate, the user now requests more containers. If + // Before the call to allocate, the user now requests more containers. If // the locations of the 0 size request and the new request are the same // (with the difference being only container count), then the set comparator - // will consider both to be the same and not add the new request to ask. So - // we need to check for the "same" request being present and remove it and + // will consider both to be the same and not add the new request to ask. So + // we need to check for the "same" request being present and remove it and // then add it back. The comparator is container count agnostic. // This should happen only rarely but we do need to guard against it. if(ask.contains(remoteRequest)) { @@ -846,7 +848,7 @@ private void addResourceRequest(Priority priority, String resourceName, RemoteRequestsTable remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable == null) { - remoteRequestsTable = new RemoteRequestsTable(); + remoteRequestsTable = new RemoteRequestsTable<>(); putTable(req.getAllocationRequestId(), remoteRequestsTable); } @SuppressWarnings("unchecked") @@ -862,7 +864,7 @@ private void addResourceRequest(Priority priority, String resourceName, LOG.debug("addResourceRequest:" + " applicationId=" + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + resourceRequestInfo.remoteRequest.getNumContainers() + + resourceRequestInfo.remoteRequest.getNumContainers() + " #asks=" + ask.size()); } } @@ -906,15 +908,15 @@ private void decResourceRequest(Priority priority, String resourceName, @Override public synchronized void updateBlacklist(List blacklistAdditions, List blacklistRemovals) { - + if (blacklistAdditions != null) { this.blacklistAdditions.addAll(blacklistAdditions); this.blacklistedNodes.addAll(blacklistAdditions); - // if some resources are also in blacklistRemovals updated before, we + // if some resources are also in blacklistRemovals updated before, we // should remove them here. this.blacklistRemovals.removeAll(blacklistAdditions); } - + if (blacklistRemovals != null) { this.blacklistRemovals.addAll(blacklistRemovals); this.blacklistedNodes.removeAll(blacklistRemovals); @@ -922,7 +924,7 @@ public synchronized void updateBlacklist(List blacklistAdditions, // them here. this.blacklistAdditions.removeAll(blacklistRemovals); } - + if (blacklistAdditions != null && blacklistRemovals != null && blacklistAdditions.removeAll(blacklistRemovals)) { // we allow resources to appear in addition list and removal list in the @@ -934,7 +936,7 @@ public synchronized void updateBlacklist(List blacklistAdditions, private void updateAMRMToken(Token token) throws IOException { org.apache.hadoop.security.token.Token amrmToken = - new org.apache.hadoop.security.token.Token(token + new org.apache.hadoop.security.token.Token<>(token .getIdentifier().array(), token.getPassword().array(), new Text( token.getKind()), new Text(token.getService())); // Preserve the token service sent by the RM when adding the token diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 5c32764..422348c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -102,7 +102,7 @@ private List nodeReports = null; private ApplicationAttemptId attemptId = null; private int nodeCount = 3; - + static final int rolling_interval_sec = 13; static final long am_expire_ms = 4000; @@ -121,7 +121,7 @@ public TestAMRMClient(String schedulerName) { @Parameterized.Parameters public static Collection data() { - List list = new ArrayList(2); + List list = new ArrayList<>(2); list.add(new Object[] {CapacityScheduler.class.getName()}); list.add(new Object[] {FairScheduler.class.getName()}); return list; @@ -175,7 +175,7 @@ private void createClusterAndStartApplication() throws Exception { racks = new String[]{ rack }; // submit new app - ApplicationSubmissionContext appContext = + ApplicationSubmissionContext appContext = yarnClient.createApplication().getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); // set the application name @@ -229,7 +229,7 @@ private void createClusterAndStartApplication() throws Exception { UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); } - + @After public void teardown() throws YarnException, IOException { yarnClient.killApplication(attemptId.getApplicationId()); @@ -242,7 +242,7 @@ public void teardown() throws YarnException, IOException { yarnCluster.stop(); } } - + @Test (timeout=60000) public void testAMRMClientMatchingFit() throws YarnException, IOException { AMRMClient amClient = null; @@ -252,7 +252,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + Resource capability1 = Resource.newInstance(1024, 2); Resource capability2 = Resource.newInstance(1024, 1); Resource capability3 = Resource.newInstance(1000, 2); @@ -261,19 +261,19 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { Resource capability6 = Resource.newInstance(2000, 1); Resource capability7 = Resource.newInstance(2000, 1); - ContainerRequest storedContainer1 = + ContainerRequest storedContainer1 = new ContainerRequest(capability1, nodes, racks, priority); - ContainerRequest storedContainer2 = + ContainerRequest storedContainer2 = new ContainerRequest(capability2, nodes, racks, priority); - ContainerRequest storedContainer3 = + ContainerRequest storedContainer3 = new ContainerRequest(capability3, nodes, racks, priority); - ContainerRequest storedContainer4 = + ContainerRequest storedContainer4 = new ContainerRequest(capability4, nodes, racks, priority); - ContainerRequest storedContainer5 = + ContainerRequest storedContainer5 = new ContainerRequest(capability5, nodes, racks, priority); - ContainerRequest storedContainer6 = + ContainerRequest storedContainer6 = new ContainerRequest(capability6, nodes, racks, priority); - ContainerRequest storedContainer7 = + ContainerRequest storedContainer7 = new ContainerRequest(capability7, nodes, racks, priority2, false); amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer2); @@ -294,7 +294,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { amClient.addContainerRequest(storedContainer11); amClient.addContainerRequest(storedContainer33); amClient.addContainerRequest(storedContainer43); - + // test matching of containers List> matches; ContainerRequest storedRequest; @@ -324,7 +324,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { storedRequest = iter.next(); assertEquals(storedContainer33, storedRequest); amClient.removeContainerRequest(storedContainer33); - + // exact matching with order maintained Resource testCapability2 = Resource.newInstance(2000, 1); matches = amClient.getMatchingRequests(priority, node, testCapability2); @@ -339,12 +339,12 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { } } amClient.removeContainerRequest(storedContainer6); - + // matching with larger container. all requests returned Resource testCapability3 = Resource.newInstance(4000, 4); matches = amClient.getMatchingRequests(priority, node, testCapability3); assert(matches.size() == 4); - + Resource testCapability4 = Resource.newInstance(1024, 2); matches = amClient.getMatchingRequests(priority, node, testCapability4); assert(matches.size() == 2); @@ -354,14 +354,14 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { ContainerRequest testRequest = testSet.iterator().next(); assertTrue(testRequest != storedContainer4); assertTrue(testRequest != storedContainer5); - assert(testRequest == storedContainer2 || + assert(testRequest == storedContainer2 || testRequest == storedContainer3); } - + Resource testCapability5 = Resource.newInstance(512, 4); matches = amClient.getMatchingRequests(priority, node, testCapability5); assert(matches.size() == 0); - + // verify requests without relaxed locality are only returned at specific // locations Resource testCapability7 = Resource.newInstance(2000, 1); @@ -370,7 +370,7 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException { assert(matches.size() == 0); matches = amClient.getMatchingRequests(priority2, node, testCapability7); assert(matches.size() == 1); - + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -513,27 +513,27 @@ public void testAMRMClientMatchingFitExecType() } } } - + private void verifyMatches( List> matches, int matchSize) { assertEquals(1, matches.size()); assertEquals(matches.get(0).size(), matchSize); } - + @Test (timeout=60000) public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { AMRMClientImpl amClient = null; try { // start am rm client - amClient = new AMRMClientImpl(); + amClient = new AMRMClientImpl<>(); amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + Resource capability = Resource.newInstance(1024, 2); - ContainerRequest storedContainer1 = + ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, null, priority); amClient.addContainerRequest(storedContainer1); @@ -550,12 +550,12 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); - + // inferred rack match no longer valid after request is removed amClient.removeContainerRequest(storedContainer1); matches = amClient.getMatchingRequests(priority, rack, capability); assertTrue(matches.isEmpty()); - + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -577,20 +577,20 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + Priority priority1 = Records.newRecord(Priority.class); priority1.setPriority(2); - - ContainerRequest storedContainer1 = + + ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, racks, priority); - ContainerRequest storedContainer2 = + ContainerRequest storedContainer2 = new ContainerRequest(capability, nodes, racks, priority); - ContainerRequest storedContainer3 = + ContainerRequest storedContainer3 = new ContainerRequest(capability, null, null, priority1); amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer3); - + // test addition and storage RemoteRequestsTable remoteRequestsTable = amClient.getTable(0); @@ -602,20 +602,20 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); - List> matches = + List> matches = amClient.getMatchingRequests(priority, node, capability); verifyMatches(matches, 2); matches = amClient.getMatchingRequests(priority, rack, capability); verifyMatches(matches, 2); - matches = + matches = amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); verifyMatches(matches, 2); matches = amClient.getMatchingRequests(priority1, rack, capability); assertTrue(matches.isEmpty()); - matches = + matches = amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); verifyMatches(matches, 1); - + // test removal amClient.removeContainerRequest(storedContainer3); matches = amClient.getMatchingRequests(priority, node, capability); @@ -625,20 +625,20 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { verifyMatches(matches, 1); matches = amClient.getMatchingRequests(priority, rack, capability); verifyMatches(matches, 1); - + // test matching of containers ContainerRequest storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); amClient.removeContainerRequest(storedContainer1); - matches = + matches = amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); assertTrue(matches.isEmpty()); - matches = + matches = amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); assertTrue(matches.isEmpty()); // 0 requests left. everything got cleaned up assertTrue(amClient.getTable(0).isEmpty()); - + // go through an exemplary allocation, matching and release cycle amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer3); @@ -651,15 +651,15 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { AllocateResponse allocResponse = amClient.allocate(0.1f); assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); for(Container container : allocResponse.getAllocatedContainers()) { - ContainerRequest expectedRequest = + ContainerRequest expectedRequest = container.getPriority().equals(storedContainer1.getPriority()) ? storedContainer1 : storedContainer3; - matches = amClient.getMatchingRequests(container.getPriority(), - ResourceRequest.ANY, + matches = amClient.getMatchingRequests(container.getPriority(), + ResourceRequest.ANY, container.getResource()); // test correct matched container is returned verifyMatches(matches, 1); @@ -674,7 +674,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { triggerSchedulingWithNMHeartBeat(); } } - + assertEquals(2, allocatedContainerCount); AllocateResponse allocResponse = amClient.allocate(0.1f); assertEquals(0, amClient.release.size()); @@ -682,7 +682,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { assertEquals(0, allocResponse.getAllocatedContainers().size()); // 0 requests left. everything got cleaned up assertTrue(remoteRequestsTable.isEmpty()); - + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -724,19 +724,19 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { amClient.init(conf); amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - + assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - - ContainerRequest storedContainer1 = + + ContainerRequest storedContainer1 = new ContainerRequest(capability, nodes, racks, priority); amClient.addContainerRequest(storedContainer1); assertEquals(3, amClient.ask.size()); assertEquals(0, amClient.release.size()); - - List localNodeBlacklist = new ArrayList(); + + List localNodeBlacklist = new ArrayList<>(); localNodeBlacklist.add(node); - + // put node in black list, so no container assignment amClient.updateBlacklist(localNodeBlacklist, null); @@ -747,19 +747,19 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { // Remove node from blacklist, so get assigned with 2 amClient.updateBlacklist(null, localNodeBlacklist); - ContainerRequest storedContainer2 = + ContainerRequest storedContainer2 = new ContainerRequest(capability, nodes, racks, priority); amClient.addContainerRequest(storedContainer2); allocatedContainerCount = getAllocatedContainersNumber(amClient, DEFAULT_ITERATION); assertEquals(2, allocatedContainerCount); - + // Test in case exception in allocate(), blacklist is kept assertTrue(amClient.blacklistAdditions.isEmpty()); assertTrue(amClient.blacklistRemovals.isEmpty()); - + // create a invalid ContainerRequest - memory value is minus - ContainerRequest invalidContainerRequest = + ContainerRequest invalidContainerRequest = new ContainerRequest(Resource.newInstance(-1024, 1), nodes, racks, priority); amClient.addContainerRequest(invalidContainerRequest); @@ -777,7 +777,7 @@ public void testAllocationWithBlacklist() throws YarnException, IOException { } } } - + @Test (timeout=60000) public void testAMRMClientWithBlacklist() throws YarnException, IOException { AMRMClientImpl amClient = null; @@ -790,35 +790,35 @@ public void testAMRMClientWithBlacklist() throws YarnException, IOException { amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); String[] nodes = {"node1", "node2", "node3"}; - + // Add nodes[0] and nodes[1] - List nodeList01 = new ArrayList(); + List nodeList01 = new ArrayList<>(); nodeList01.add(nodes[0]); nodeList01.add(nodes[1]); amClient.updateBlacklist(nodeList01, null); assertEquals(2, amClient.blacklistAdditions.size()); assertEquals(0, amClient.blacklistRemovals.size()); - + // Add nodes[0] again, verify it is not added duplicated. - List nodeList02 = new ArrayList(); + List nodeList02 = new ArrayList<>(); nodeList02.add(nodes[0]); nodeList02.add(nodes[2]); amClient.updateBlacklist(nodeList02, null); assertEquals(3, amClient.blacklistAdditions.size()); assertEquals(0, amClient.blacklistRemovals.size()); - - // Add nodes[1] and nodes[2] to removal list, + + // Add nodes[1] and nodes[2] to removal list, // Verify addition list remove these two nodes. - List nodeList12 = new ArrayList(); + List nodeList12 = new ArrayList<>(); nodeList12.add(nodes[1]); nodeList12.add(nodes[2]); amClient.updateBlacklist(null, nodeList12); assertEquals(1, amClient.blacklistAdditions.size()); assertEquals(2, amClient.blacklistRemovals.size()); - - // Add nodes[1] again to addition list, + + // Add nodes[1] again to addition list, // Verify removal list will remove this node. - List nodeList1 = new ArrayList(); + List nodeList1 = new ArrayList<>(); nodeList1.add(nodes[1]); amClient.updateBlacklist(nodeList1, null); assertEquals(2, amClient.blacklistAdditions.size()); @@ -839,10 +839,10 @@ private int getAllocatedContainersNumber( AllocateResponse allocResponse = amClient.allocate(0.1f); assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); - + if(allocatedContainerCount == 0) { // let NM heartbeat to RM and trigger allocations triggerSchedulingWithNMHeartBeat(); @@ -882,7 +882,7 @@ private void initAMRMClientAndTest(boolean useAllocReqId) //setting an instance NMTokenCache amClient.setNMTokenCache(new NMTokenCache()); //asserting we are not using the singleton instance cache - Assert.assertNotSame(NMTokenCache.getSingleton(), + Assert.assertNotSame(NMTokenCache.getSingleton(), amClient.getNMTokenCache()); amClient.init(conf); @@ -905,11 +905,11 @@ private void initAMRMClientAndTest(boolean useAllocReqId) } } } - + @Test(timeout=30000) public void testAskWithNodeLabels() { AMRMClientImpl client = - new AMRMClientImpl(); + new AMRMClientImpl<>(); // add exp=x to ANY client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, @@ -926,7 +926,7 @@ public void testAskWithNodeLabels() { assertEquals(1, client.ask.size()); assertEquals("a", client.ask.iterator().next() .getNodeLabelExpression()); - + // add exp=x to ANY, rack and node, only resource request has ANY resource // name will be assigned the label expression // add exp=x then add exp=a to ANY in same priority, only exp=a should kept @@ -953,7 +953,7 @@ public void testAskWithNodeLabels() { } } } - + private void verifyAddRequestFailed(AMRMClient client, ContainerRequest request) { try { @@ -963,11 +963,11 @@ private void verifyAddRequestFailed(AMRMClient client, } fail(); } - + @Test(timeout=30000) public void testAskWithInvalidNodeLabels() { AMRMClientImpl client = - new AMRMClientImpl(); + new AMRMClientImpl<>(); // specified exp with more than one node labels verifyAddRequestFailed(client, @@ -1467,10 +1467,10 @@ private void startContainer(AllocateResponse allocResponse, private void testAllocation(final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request - + assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - + amClient.addContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); amClient.addContainerRequest( @@ -1490,18 +1490,18 @@ private void testAllocation(final AMRMClientImpl amClient) // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; int iterationsLeft = 3; - Set releases = new TreeSet(); - + Set releases = new TreeSet<>(); + amClient.getNMTokenCache().clearCache(); assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap receivedNMTokens = new HashMap(); - + HashMap receivedNMTokens = new HashMap<>(); + while (allocatedContainerCount < containersRequestedAny && iterationsLeft-- > 0) { AllocateResponse allocResponse = amClient.allocate(0.1f); assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); - + assertEquals(nodeCount, amClient.getClusterNodeCount()); allocatedContainerCount += allocResponse.getAllocatedContainers().size(); for(Container container : allocResponse.getAllocatedContainers()) { @@ -1509,7 +1509,7 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } - + for (NMToken token : allocResponse.getNMTokens()) { String nodeID = token.getNodeId().toString(); if (receivedNMTokens.containsKey(nodeID)) { @@ -1517,21 +1517,21 @@ private void testAllocation(final AMRMClientImpl amClient) } receivedNMTokens.put(nodeID, token.getToken()); } - + if(allocatedContainerCount < containersRequestedAny) { // let NM heartbeat to RM and trigger allocations triggerSchedulingWithNMHeartBeat(); } } - + // Should receive atleast 1 token assertTrue(receivedNMTokens.size() > 0 && receivedNMTokens.size() <= nodeCount); - + assertEquals(allocatedContainerCount, containersRequestedAny); assertEquals(2, releases.size()); assertEquals(0, amClient.ask.size()); - + // need to tell the AMRMClient that we dont need these resources anymore amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); @@ -1541,7 +1541,7 @@ private void testAllocation(final AMRMClientImpl amClient) // send 0 container count request for resources that are no longer needed ResourceRequest snoopRequest = amClient.ask.iterator().next(); assertEquals(0, snoopRequest.getNumContainers()); - + // test RPC exception handling amClient.addContainerRequest(new ContainerRequest(capability, nodes, racks, priority)); @@ -1549,7 +1549,7 @@ private void testAllocation(final AMRMClientImpl amClient) racks, priority)); snoopRequest = amClient.ask.iterator().next(); assertEquals(2, snoopRequest.getNumContainers()); - + ApplicationMasterProtocol realRM = amClient.rmClient; try { ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); @@ -1558,7 +1558,7 @@ private void testAllocation(final AMRMClientImpl amClient) public AllocateResponse answer(InvocationOnMock invocation) throws Exception { amClient.removeContainerRequest( - new ContainerRequest(capability, nodes, + new ContainerRequest(capability, nodes, racks, priority)); amClient.removeContainerRequest( new ContainerRequest(capability, nodes, racks, priority)); @@ -1575,7 +1575,7 @@ public AllocateResponse answer(InvocationOnMock invocation) assertEquals(2, amClient.release.size()); assertEquals(3, amClient.ask.size()); snoopRequest = amClient.ask.iterator().next(); - // verify that the remove request made in between makeRequest and allocate + // verify that the remove request made in between makeRequest and allocate // has not been lost assertEquals(0, snoopRequest.getNumContainers()); @@ -1645,7 +1645,7 @@ private void testAllocRequestId( // RM should allocate container within 2 calls to allocate() List allocatedContainers = new ArrayList<>(); int iterationsLeft = 5; - Set releases = new TreeSet(); + Set releases = new TreeSet<>(); while (allocatedContainers.size() < containersRequestedAny && iterationsLeft-- > 0) { @@ -1735,7 +1735,7 @@ public void testWaitFor() throws InterruptedException { } } } - + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); @@ -1787,26 +1787,26 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, // can do the allocate call with latest AMRMToken AllocateResponse response = amClient.allocate(0.1f); - + // Verify latest AMRMToken can be used to send allocation request. UserGroupInformation testUser1 = UserGroupInformation.createRemoteUser("testUser1"); - - AMRMTokenIdentifierForTest newVersionTokenIdentifier = + + AMRMTokenIdentifierForTest newVersionTokenIdentifier = new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message"); - + assertEquals("Message is changed after set to newVersionTokenIdentifier", "message", newVersionTokenIdentifier.getMessage()); - org.apache.hadoop.security.token.Token newVersionToken = - new org.apache.hadoop.security.token.Token ( - newVersionTokenIdentifier.getBytes(), + org.apache.hadoop.security.token.Token newVersionToken = + new org.apache.hadoop.security.token.Token<> ( + newVersionTokenIdentifier.getBytes(), amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier), newVersionTokenIdentifier.getKind(), new Text()); - + SecurityUtil.setTokenService(newVersionToken, yarnCluster .getResourceManager().getApplicationMasterService().getBindAddress()); testUser1.addToken(newVersionToken); - + AllocateRequest request = Records.newRecord(AllocateRequest.class); request.setResponseId(response.getResponseId()); testUser1.doAs(new PrivilegedAction() { @@ -1869,6 +1869,25 @@ public ApplicationMasterProtocol run() { } } + @Test (timeout=60000) + public void testGetMatchingRequestBeforeContainerRequest() + throws YarnException, IOException { + AMRMClient amClient = null; + + // start am rm client + amClient = AMRMClient.createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + // query matching request before container request. + List> matches; + Resource testCapability1 = Resource.newInstance(1024, 2); + matches = amClient.getMatchingRequests(priority, node, testCapability1); + + assertTrue(matches.isEmpty()); + } + @SuppressWarnings("unchecked") private org.apache.hadoop.security.token.Token getAMRMToken() throws IOException {