diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 6dcecde09aa..c05f7acfd28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -1037,6 +1037,11 @@ private void updateAMRMToken(Token token) throws IOException { return remoteRequests.get(Long.valueOf(allocationRequestId)); } + @VisibleForTesting + Map, List> getOutstandingSchedRequests() { + return outstandingSchedRequests; + } + RemoteRequestsTable putTable(long allocationRequestId, RemoteRequestsTable table) { return remoteRequests.put(Long.valueOf(allocationRequestId), table); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOutstandingSchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOutstandingSchedulingRequest.java new file mode 100644 index 00000000000..26a85ec63d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOutstandingSchedulingRequest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Thread.sleep; + +/** + * Test Outstanding Scheduling Requests. + */ +public class TestAMRMClientOutstandingSchedulingRequest extends BaseAMRMClientTest { + + @Test(timeout=60000) + public void testAMRMClientOutstandingSchedulingRequest() + throws Exception { + // we have to create a new instance of MiniYARNCluster to avoid SASL qop + // mismatches between client and server + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler"); + createClusterAndStartApplication(conf); + + AMRMClient amClient = AMRMClient.createAMRMClient(); + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + + final List allocatedContainers = new ArrayList<>(); + AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000, + new AMRMClientAsync.AbstractCallbackHandler() { + @Override + public void onContainersAllocated(List containers) { + allocatedContainers.addAll(containers); + } + + @Override + public void onRequestsRejected(List rejReqs) { } + + @Override + public void onContainersCompleted(List statuses) {} + @Override + public void onContainersUpdated(List containers) {} + @Override + public void onShutdownRequest() {} + @Override + public void onNodesUpdated(List updatedNodes) {} + @Override + public void onError(Throwable e) {} + + @Override + public float getProgress() { + return 0.1f; + } + }); + + asyncClient.init(conf); + asyncClient.start(); + asyncClient.registerApplicationMaster("Host", 10000, ""); + asyncClient.addSchedulingRequests( + Arrays.asList( + schedulingRequest(2, 0, 1, 1, 512) + )); + + waitForContainerAllocation(allocatedContainers, 2); + Assert.assertEquals(2, allocatedContainers.size()); + Map, List> outstandingSchedRequests = ((AMRMClientImpl)amClient).getOutstandingSchedRequests(); + // All values of outstandingSchedRequests should be empty + for (List outstanding : outstandingSchedRequests.values()) { + Assert.assertEquals(0, outstanding.size()); + } + + asyncClient.stop(); + } + + private static void waitForContainerAllocation( + List allocatedContainers, + int containerNum) throws Exception { + int maxCount = 10; + while (maxCount >= 0 && allocatedContainers.size() < containerNum) { + maxCount--; + sleep(1000); + } + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem) { + return schedulingRequest(numAllocations, priority, allocReqId, cores, mem, + ExecutionType.GUARANTEED); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, + ExecutionType execType) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(numAllocations, + Resource.newInstance(mem, cores))) + .build(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 34a9b34fc74..fe955723471 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -197,8 +197,6 @@ public static void removeFromOutstandingSchedulingRequests( return; } for (Container container : containers) { - if (container.getAllocationTags() != null - && !container.getAllocationTags().isEmpty()) { List schedReqs = outstandingSchedRequests.get(container.getAllocationTags()); if (schedReqs != null && !schedReqs.isEmpty()) { @@ -219,7 +217,6 @@ public static void removeFromOutstandingSchedulingRequests( } } } - } } } } \ No newline at end of file