diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c8ac82a6ea7..69519e1f31d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2507,6 +2507,11 @@ public static boolean isAclEnabled(Configuration conf) { YARN_MC_PREFIX + YarnConfiguration.NM_PMEM_MB; public static final int DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB = 4 * 1024; + /** Allow changing the vcore for the NodeManager in the MiniYARNCluster */ + public static final String YARN_MINICLUSTER_NM_VCORES = + YARN_MC_PREFIX + YarnConfiguration.NM_VCORES; + public static final int DEFAULT_YARN_MINICLUSTER_NM_VCORES = 8; + /** The log directory for the containers */ public static final String YARN_APP_CONTAINER_LOG_DIR = YARN_PREFIX + "app.container.log.dir"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index c05f7acfd28..d6e410b283a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -38,8 +38,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import 
org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -96,6 +99,8 @@ private int lastResponseId = 0; + protected ApplicationAttemptId appAttemptId; + protected String appHostName; protected int appHostPort; protected String appTrackingUrl; @@ -167,6 +172,11 @@ public int compare(Resource res0, Resource res1) { private List schedulingRequests = new ArrayList<>(); private Map, List> outstandingSchedRequests = new HashMap<>(); + // containersFromPreviousAttempts holds containers from previous attempts. + // AMRMClient may receive containers from previous attempts once RM restarts, + // so it's necessary to distinguish newly reported containers from known ones. 
+ protected final Set containersFromPreviousAttempts = + new TreeSet<>(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); @@ -191,6 +201,7 @@ protected void serviceStart() throws Exception { if (rmClient == null) { rmClient = ClientRMProxy.createRMProxy( conf, ApplicationMasterProtocol.class); + initApplicationAttemptId(); } } catch (IOException e) { throw new YarnRuntimeException(e); @@ -252,8 +263,9 @@ private RegisterApplicationMasterResponse registerApplicationMaster() this.resourceProfilesMap = response.getResourceProfiles(); List prevContainers = response.getContainersFromPreviousAttempts(); - AMRMClientUtils.removeFromOutstandingSchedulingRequests(prevContainers, - this.outstandingSchedRequests); + AMRMClientUtils.removePreviousContainersFromOutstandingSchedulingRequests( + prevContainers, appAttemptId, containersFromPreviousAttempts, + outstandingSchedRequests); } return response; } @@ -381,12 +393,20 @@ public AllocateResponse allocate(float progressIndicator) removePendingChangeRequests(changed); } } + if (!allocateResponse.getCompletedContainersStatuses().isEmpty() + && !containersFromPreviousAttempts.isEmpty()) { + AMRMClientUtils.refreshContainersFromPreviousAttempts( + allocateResponse.getCompletedContainersStatuses(), + appAttemptId, containersFromPreviousAttempts); + } AMRMClientUtils.removeFromOutstandingSchedulingRequests( allocateResponse.getAllocatedContainers(), this.outstandingSchedRequests); - AMRMClientUtils.removeFromOutstandingSchedulingRequests( - allocateResponse.getContainersFromPreviousAttempts(), - this.outstandingSchedRequests); + AMRMClientUtils + .removePreviousContainersFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts(), + appAttemptId, containersFromPreviousAttempts, + outstandingSchedRequests); } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -1046,4 +1066,28 @@ private void updateAMRMToken(Token token) throws IOException { 
RemoteRequestsTable table) { return remoteRequests.put(Long.valueOf(allocationRequestId), table); } + + private void initApplicationAttemptId() { + try { + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + Iterator> iter = + credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + org.apache.hadoop.security.token.Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + AMRMTokenIdentifier amrmTokenIdentifier = + ((org.apache.hadoop.security.token.Token) + token).decodeIdentifier(); + synchronized (this) { + appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); + } + } + } + } catch (Exception e) { + LOG.warn("Failed to get current application attempt ID, ", e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java index 3465274a3a0..c4c2d2691b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java @@ -85,7 +85,12 @@ @Before public void setup() throws Exception { - conf = new YarnConfiguration(); + setup(new YarnConfiguration(), 3); + } + + public void setup(Configuration conf, int nodeCount) throws Exception { + this.conf = conf; + this.nodeCount = nodeCount; createClusterAndStartApplication(conf); } @@ -157,6 +162,7 @@ protected void createClusterAndStartApplication(Configuration conf) new HashMap()); appContext.setAMContainerSpec(amContainer); appContext.setResource(Resource.newInstance(1024, 1)); + 
appContext.setKeepContainersAcrossApplicationAttempts(true); // Create the request to send to the applications manager SubmitApplicationRequest appRequest = Records .newRecord(SubmitApplicationRequest.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java new file mode 100644 index 00000000000..2268f131c50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.util.Records; 
+import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestAMRMClientUsingSchedulingRequestOnRMRestart + extends BaseAMRMClientTest { + private static final Logger LOG = LoggerFactory.getLogger( + TestAMRMClientUsingSchedulingRequestOnRMRestart.class.getName()); + + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + private String workingDirPathURI; + + private NMTokenCache nmTokenCache; + private AMRMClientImpl amClient; + private AMRMClientAsyncImpl amAsyncClient; + private NMClientImpl nmClient; + + @Override + @Before + public void setup() throws Exception { + } + + @Test + public void testAMRMClientWithNegativePendingRequestsOnRMRestart() + throws Exception { + testAMRMClientOnPendingRequestsOnRMRestart(true); + } + + @Test + public void testAMRMClientOnUnexpectedlyDecreasedPendingRequestsOnRMRestart() + throws Exception { + testAMRMClientOnPendingRequestsOnRMRestart(false); + } + + private void testAMRMClientOnPendingRequestsOnRMRestart( + boolean testNegative) throws Exception { + workingDirPathURI = "file://" + tempDir.newFolder(); + + // 0. 
prepare configurations + Configuration conf = new YarnConfiguration(); + + // RM HA related configurations + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); + conf.set(YarnConfiguration.RM_STORE, FileSystemRMStateStore.class.getName()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, + workingDirPathURI.toString()); + conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8); + conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, 100L); + + conf.setInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 3000); + + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); + + // NM resource configurations + // 1024MB (for AM container) + 2048MB * 2 (for Non-AM containers) + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 5120); + // 1 vcore per container + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_VCORES, 3); + conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "1"); + + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + + super.setup(conf, 1); + + nmTokenCache = new NMTokenCache(); + + amClient = new AMRMClientImpl(); + amClient.setNMTokenCache(nmTokenCache); + + List allocatedContainers = new ArrayList<>(); + List rejectedSchedulingRequests = + new ArrayList<>(); + + amAsyncClient = new AMRMClientAsyncImpl<>(amClient, 3000, + new AMRMClientAsync.AbstractCallbackHandler() { + @Override + public void onContainersAllocated(List containers) { + allocatedContainers.addAll(containers); + } + + @Override + public void onRequestsRejected( + List rejReqs) { + rejectedSchedulingRequests.addAll(rejReqs); + } + + @Override + public void onContainersCompleted(List statuses) {} + @Override + 
public void onContainersUpdated(List containers) {} + @Override + public void onShutdownRequest() {} + @Override + public void onNodesUpdated(List updatedNodes) {} + @Override + public void onError(Throwable e) {} + + @Override + public float getProgress() { + return 0.1f; + } + }); + + Configuration confForAmAsyncClient = new Configuration(conf); + // Once AM-RM client finds RM disconnects, wait for a relatively long time + // in order to wait for NM reporting to RM after RM restarts. Thus AM-RM + // client could receive previously allocated containers again. + confForAmAsyncClient.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, + 5000); + + amAsyncClient.init(confForAmAsyncClient); + amAsyncClient.start(); + assertEquals(Service.STATE.STARTED, amClient.getServiceState()); + + // start am nm client + nmClient = (NMClientImpl) NMClient.createNMClient(); + + //propagating the AMRMClient NMTokenCache instance + nmClient.setNMTokenCache(amClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertNotNull(nmClient); + assertEquals(Service.STATE.STARTED, nmClient.getServiceState()); + + amAsyncClient.registerApplicationMaster(MiniYARNCluster.getHostname(), 0, ""); + amAsyncClient.addSchedulingRequests( + Arrays.asList( + schedulingRequest(10, 1, 1, 1, 2048, "foo"))); + + assertTrue(waitForContainerAllocation(allocatedContainers, + rejectedSchedulingRequests, 2, 0)); + + { + Map, List> outstandingRequestMap = + amClient.getOutstandingSchedRequests(); + assertEquals(1, outstandingRequestMap.size()); + List outstandingRequests = outstandingRequestMap + .values().iterator().next(); + assertEquals(1, outstandingRequests.size()); + assertEquals(8, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } + + startContainers(allocatedContainers); + + if (testNegative) { + // decrease the numAllocations of current request id to zero + amAsyncClient.addSchedulingRequests( + Arrays.asList( + schedulingRequest(0, 1, 
1, 1, 2048, "foo"))); + } + + // TODO: need to make sure AM-RM client has updated its outstandingRequests. + sleep(3000); + + LOG.info("Restart ResourceManager."); + yarnCluster.stopResourceManager(yarnCluster.getActiveRMIndex()); + Thread.sleep(5000); + yarnCluster.restartResourceManager(yarnCluster.getActiveRMIndex()); + + while (yarnCluster.getResourceManager(yarnCluster.getActiveRMIndex()) + .getRMContext().getRMNodes().size() < 1) { + LOG.info("Wait for nm1 registering rm1, sleep 1s."); + sleep(1000); + } + + int containerCnt = -1; + while ((containerCnt = ((RMNodeImpl) yarnCluster.getResourceManager() + .getRMContext().getRMNodes().values().iterator().next()) + .getLaunchedContainers().size()) < 3) { + LOG.info("Wait for nm1 launched containers, current container size: " + + containerCnt); + } + LOG.info("ResourceManager recovered."); + + Thread.sleep(5000); + + { + Map, List> outstandingRequestMap = + amClient.getOutstandingSchedRequests(); + assertEquals(1, outstandingRequestMap.size()); + List outstandingRequests = + outstandingRequestMap.values().iterator().next(); + assertEquals(1, outstandingRequests.size()); + + if (testNegative) { + // NumAllocations won't turn negative. + assertEquals(0, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } else { + // NumAllocations won't get decreased. 
+ assertEquals(8, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } + } + } + + private static boolean waitForContainerAllocation( + List allocatedContainers, + List rejectedRequests, + int containerNum, int rejNum) throws Exception { + + int maxCount = 10; + while (maxCount >= 0 && + (allocatedContainers.size() < containerNum || + rejectedRequests.size() < rejNum)) { + maxCount--; + sleep(1000); + } + if (allocatedContainers.size() < containerNum || + rejectedRequests.size() < rejNum) { + LOG.error("Failed to wait for expected allocation status, " + + "allocatedContainerSize: " + allocatedContainers.size() + + ", rejectedRequestSize: " + rejectedRequests.size()); + return false; + } else { + return true; + } + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, String... tags) { + return schedulingRequest(numAllocations, priority, allocReqId, cores, mem, + ExecutionType.GUARANTEED, tags); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, + ExecutionType execType, String... 
tags) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .allocationTags(new HashSet<>(Arrays.asList(tags))) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(numAllocations, + Resource.newInstance(mem, cores))) + .build(); + } + + private void startContainers(List allocatedContainers) + throws Exception { + if (allocatedContainers.size() < 1) { + return; + } + + LOG.info("Start containers, count: " + allocatedContainers.size()); + for (Container container : allocatedContainers) { + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + if (Shell.WINDOWS) { + clc.setCommands( + Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")); + } else { + clc.setCommands(Arrays.asList("sleep", "1000000")); + } + clc.setTokens(securityTokens); + + nmClient.startContainer(container, clc); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 1e363cbf24c..bed5712cf92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -38,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -185,7 +188,7 @@ public static boolean isMatchingSchedulingRequests( SchedulingRequest schedReq1, SchedulingRequest schedReq2) { return schedReq1.getPriority().equals(schedReq2.getPriority()) && schedReq1.getExecutionType().getExecutionType().equals( - schedReq1.getExecutionType().getExecutionType()) && + schedReq2.getExecutionType().getExecutionType()) && schedReq1.getAllocationRequestId() == schedReq2.getAllocationRequestId(); } @@ -210,7 +213,7 @@ public static void removeFromOutstandingSchedulingRequests( int numAllocations = schedReq.getResourceSizing().getNumAllocations(); numAllocations--; - if (numAllocations == 0) { + if (numAllocations <= 0) { iter.remove(); } else { schedReq.getResourceSizing().setNumAllocations(numAllocations); @@ -221,4 +224,38 @@ public static void removeFromOutstandingSchedulingRequests( } } } + + public static void removePreviousContainersFromOutstandingSchedulingRequests( + List prevContainers, + ApplicationAttemptId appAttemptId, + Set containersFromPreviousAttempts, + Map, List> outstandingSchedRequests) { + if (prevContainers == null || prevContainers.isEmpty()) { + return; + } + List incrPrevContainers = new ArrayList<>(prevContainers.size()); + prevContainers.forEach(container -> { + if (!container.getId().getApplicationAttemptId().equals(appAttemptId) + && !containersFromPreviousAttempts.contains(container.getId())) { + LOG.info("Received container " + container.getId() + + " from previous attempt."); + incrPrevContainers.add(container); + containersFromPreviousAttempts.add(container.getId()); + } + }); + 
removeFromOutstandingSchedulingRequests(incrPrevContainers, + outstandingSchedRequests); + } + + public static void refreshContainersFromPreviousAttempts( + List completedContainerStatuses, + ApplicationAttemptId appAttemptId, + Set containersFromPreviousAttempts) { + completedContainerStatuses.forEach(containerStatus -> { + if (!containerStatus.getContainerId().getApplicationAttemptId().equals( + appAttemptId)) { + containersFromPreviousAttempts.remove(containerStatus.getContainerId()); + } + }); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index fea635b42bc..af38841deb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2833,6 +2833,15 @@ 4096 + + + As yarn.nodemanager.resource.cpu-vcores property but for the NodeManager + in a MiniYARNCluster. 
+ + yarn.minicluster.yarn.nodemanager.resource.cpu-vcores + 8 + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 70b7498b9e6..bbbd026e3a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,11 +49,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -388,6 +391,9 @@ public AllocateResponse allocate(AllocateRequest request) this.amLivelinessMonitor.receivedPing(appAttemptId); + // Sanity check on incoming AllocateRequest. 
+ validateAllocateRequest(request, appAttemptId); + /* check if its in cache */ AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { @@ -524,4 +530,41 @@ public synchronized void setAllocateResponse(AllocateResponse response) { public Server getServer() { return this.server; } + + /** + * Perform sanity check on incoming AllocateRequest. + * @param request allocation request + * @param appAttemptId application attempt ID of this request + */ + private static void validateAllocateRequest( + AllocateRequest request, ApplicationAttemptId appAttemptId) + throws InvalidResourceRequestException { + try { + if (request.getAskList() != null && !request.getAskList().isEmpty()) { + for (ResourceRequest rr : request.getAskList()) { + if (rr.getNumContainers() < 0) { + throw new InvalidResourceRequestException( + "numContainers of ResourceRequest should not be negative [" + + rr.getNumContainers() + "], request detail: " + + rr.toString()); + } + } + } + if (request.getSchedulingRequests() != null + && !request.getSchedulingRequests().isEmpty()) { + for (SchedulingRequest schedReq : request.getSchedulingRequests()) { + if (schedReq.getResourceSizing().getNumAllocations() < 0) { + throw new InvalidResourceRequestException( + "numAllocations of SchedulingRequest should not be negative [" + + schedReq.getResourceSizing().getNumAllocations() + + "], request detail: " + schedReq.toString()); + } + } + } + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid AllocateRequest from application attempt: " + + appAttemptId + " , error message: " + e.getMessage()); + throw e; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java index 7fc2a53cc19..c4b93bdcf68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java @@ -25,10 +25,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; @@ -579,4 +583,49 @@ public void testRequestCapacityMinMaxAllocationForResourceTypes() rm.close(); } + + @Test(timeout = 300000) + public void testInvalidAllocateRequest() throws Exception { + MockRM rm = new MockRM(conf); + + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(1024); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = 
app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + am1.addRequests(new String[] { "127.0.0.1" }, GB, 1, -1); + try { + AllocateResponse alloc1Response = am1.schedule(); // send the request + Assert.assertTrue("Expect exception occurs", false); + } catch (InvalidResourceRequestException e) { + // Get expected exception. + } + + am1.addSchedulingRequest(Collections.singletonList( + SchedulingRequest.newInstance(1L, Priority.newInstance(0), + ExecutionTypeRequest.newInstance(), Collections.emptySet(), + ResourceSizing.newInstance(-1, Resource.newInstance(100, 1)), + null))); + try { + AllocateResponse alloc1Response = am1.schedule(); // send the request + Assert.assertTrue("Expect exception occurs", false); + } catch (InvalidResourceRequestException e) { + // Get expected exception. + } + } finally { + if (rm != null) { + rm.stop(); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index fa69f186d97..a7d039415af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -572,6 +572,10 @@ protected synchronized void serviceInit(Configuration conf) YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB)); + config.setInt(YarnConfiguration.NM_VCORES, config.getInt( + YarnConfiguration.YARN_MINICLUSTER_NM_VCORES, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_VCORES)); + config.set(YarnConfiguration.NM_ADDRESS, MiniYARNCluster.getHostname() + ":0"); 
config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,