diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c8ac82a6ea7..69519e1f31d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2507,6 +2507,11 @@ public static boolean isAclEnabled(Configuration conf) { YARN_MC_PREFIX + YarnConfiguration.NM_PMEM_MB; public static final int DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB = 4 * 1024; + /** Allow changing the vcore for the NodeManager in the MiniYARNCluster */ + public static final String YARN_MINICLUSTER_NM_VCORES = + YARN_MC_PREFIX + YarnConfiguration.NM_VCORES; + public static final int DEFAULT_YARN_MINICLUSTER_NM_VCORES = 8; + /** The log directory for the containers */ public static final String YARN_APP_CONTAINER_LOG_DIR = YARN_PREFIX + "app.container.log.dir"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index c05f7acfd28..e6d2170ac32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -38,8 +38,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import 
org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -96,6 +99,8 @@ private int lastResponseId = 0; + protected ApplicationAttemptId appAttemptId; + protected String appHostName; protected int appHostPort; protected String appTrackingUrl; @@ -167,6 +172,11 @@ public int compare(Resource res0, Resource res1) { private List schedulingRequests = new ArrayList<>(); private Map, List> outstandingSchedRequests = new HashMap<>(); + // containersFromPreviousAttempts holds containers from previous attempts. + // AMRMClient may receive containers from previous attempts once RM restarts, + // so it's necessary to distinguish newly reported containers from known ones. 
+ protected final Set containersFromPreviousAttempts = + new TreeSet<>(); public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); @@ -191,6 +201,7 @@ protected void serviceStart() throws Exception { if (rmClient == null) { rmClient = ClientRMProxy.createRMProxy( conf, ApplicationMasterProtocol.class); + appAttemptId = getApplicationAttempt(); } } catch (IOException e) { throw new YarnRuntimeException(e); @@ -252,8 +263,9 @@ private RegisterApplicationMasterResponse registerApplicationMaster() this.resourceProfilesMap = response.getResourceProfiles(); List prevContainers = response.getContainersFromPreviousAttempts(); - AMRMClientUtils.removeFromOutstandingSchedulingRequests(prevContainers, - this.outstandingSchedRequests); + AMRMClientUtils.removePreviousContainersFromOutstandingSchedulingRequests( + prevContainers, appAttemptId, containersFromPreviousAttempts, + outstandingSchedRequests); } return response; } @@ -381,12 +393,20 @@ public AllocateResponse allocate(float progressIndicator) removePendingChangeRequests(changed); } } + if (!allocateResponse.getCompletedContainersStatuses().isEmpty() + && !containersFromPreviousAttempts.isEmpty()) { + AMRMClientUtils.refreshContainersFromPreviousAttempts( + allocateResponse.getCompletedContainersStatuses(), + appAttemptId, containersFromPreviousAttempts); + } AMRMClientUtils.removeFromOutstandingSchedulingRequests( allocateResponse.getAllocatedContainers(), this.outstandingSchedRequests); - AMRMClientUtils.removeFromOutstandingSchedulingRequests( - allocateResponse.getContainersFromPreviousAttempts(), - this.outstandingSchedRequests); + AMRMClientUtils + .removePreviousContainersFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts(), + appAttemptId, containersFromPreviousAttempts, + outstandingSchedRequests); } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -1046,4 +1066,27 @@ private void updateAMRMToken(Token token) throws IOException { 
RemoteRequestsTable table) { return remoteRequests.put(Long.valueOf(allocationRequestId), table); } + + private static ApplicationAttemptId getApplicationAttempt() { + try { + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + Iterator> iter = + credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + org.apache.hadoop.security.token.Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + AMRMTokenIdentifier amrmTokenIdentifier = + ((org.apache.hadoop.security.token.Token) + token).decodeIdentifier(); + return amrmTokenIdentifier.getApplicationAttemptId(); + } + } + } catch (Exception e) { + LOG.warn("Failed to get current application attempt ID, ", e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java index 3465274a3a0..c4c2d2691b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java @@ -85,7 +85,12 @@ @Before public void setup() throws Exception { - conf = new YarnConfiguration(); + setup(new YarnConfiguration(), 3); + } + + public void setup(Configuration conf, int nodeCount) throws Exception { + this.conf = conf; + this.nodeCount = nodeCount; createClusterAndStartApplication(conf); } @@ -157,6 +162,7 @@ protected void createClusterAndStartApplication(Configuration conf) new HashMap()); appContext.setAMContainerSpec(amContainer); appContext.setResource(Resource.newInstance(1024, 1)); + 
appContext.setKeepContainersAcrossApplicationAttempts(true); // Create the request to send to the applications manager SubmitApplicationRequest appRequest = Records .newRecord(SubmitApplicationRequest.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java new file mode 100644 index 00000000000..2268f131c50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientUsingSchedulingRequestOnRMRestart.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.util.Records; 
+import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestAMRMClientUsingSchedulingRequestOnRMRestart + extends BaseAMRMClientTest { + private static final Logger LOG = LoggerFactory.getLogger( + TestAMRMClientUsingSchedulingRequestOnRMRestart.class.getName()); + + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + private String workingDirPathURI; + + private NMTokenCache nmTokenCache; + private AMRMClientImpl amClient; + private AMRMClientAsyncImpl amAsyncClient; + private NMClientImpl nmClient; + + @Override + @Before + public void setup() throws Exception { + } + + @Test + public void testAMRMClientWithNegativePendingRequestsOnRMRestart() + throws Exception { + testAMRMClientOnPendingRequestsOnRMRestart(true); + } + + @Test + public void testAMRMClientOnUnexpectedlyDecreasedPendingRequestsOnRMRestart() + throws Exception { + testAMRMClientOnPendingRequestsOnRMRestart(false); + } + + private void testAMRMClientOnPendingRequestsOnRMRestart( + boolean testNegative) throws Exception { + workingDirPathURI = "file://" + tempDir.newFolder(); + + // 0. 
prepare configurations + Configuration conf = new YarnConfiguration(); + + // RM HA related configurations + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); + conf.set(YarnConfiguration.RM_STORE, FileSystemRMStateStore.class.getName()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, + workingDirPathURI.toString()); + conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8); + conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, 100L); + + conf.setInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 3000); + + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); + + // NM resource configurations + // 1024MB (for AM container) + 2048MB * 2 (for Non-AM containers) + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 5120); + // 1 vcore per container + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_VCORES, 3); + conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "1"); + + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + + super.setup(conf, 1); + + nmTokenCache = new NMTokenCache(); + + amClient = new AMRMClientImpl(); + amClient.setNMTokenCache(nmTokenCache); + + List allocatedContainers = new ArrayList<>(); + List rejectedSchedulingRequests = + new ArrayList<>(); + + amAsyncClient = new AMRMClientAsyncImpl<>(amClient, 3000, + new AMRMClientAsync.AbstractCallbackHandler() { + @Override + public void onContainersAllocated(List containers) { + allocatedContainers.addAll(containers); + } + + @Override + public void onRequestsRejected( + List rejReqs) { + rejectedSchedulingRequests.addAll(rejReqs); + } + + @Override + public void onContainersCompleted(List statuses) {} + @Override + 
public void onContainersUpdated(List containers) {} + @Override + public void onShutdownRequest() {} + @Override + public void onNodesUpdated(List updatedNodes) {} + @Override + public void onError(Throwable e) {} + + @Override + public float getProgress() { + return 0.1f; + } + }); + + Configuration confForAmAsyncClient = new Configuration(conf); + // Once AM-RM client finds RM disconnects, wait for a relatively long time + // in order to wait for NM reporting to RM after RM restarts. Thus AM-RM + // client could receive previously allocated containers again. + confForAmAsyncClient.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, + 5000); + + amAsyncClient.init(confForAmAsyncClient); + amAsyncClient.start(); + assertEquals(Service.STATE.STARTED, amClient.getServiceState()); + + // start am nm client + nmClient = (NMClientImpl) NMClient.createNMClient(); + + //propagating the AMRMClient NMTokenCache instance + nmClient.setNMTokenCache(amClient.getNMTokenCache()); + nmClient.init(conf); + nmClient.start(); + assertNotNull(nmClient); + assertEquals(Service.STATE.STARTED, nmClient.getServiceState()); + + amAsyncClient.registerApplicationMaster(MiniYARNCluster.getHostname(), 0, ""); + amAsyncClient.addSchedulingRequests( + Arrays.asList( + schedulingRequest(10, 1, 1, 1, 2048, "foo"))); + + assertTrue(waitForContainerAllocation(allocatedContainers, + rejectedSchedulingRequests, 2, 0)); + + { + Map, List> outstandingRequestMap = + amClient.getOutstandingSchedRequests(); + assertEquals(1, outstandingRequestMap.size()); + List outstandingRequests = outstandingRequestMap + .values().iterator().next(); + assertEquals(1, outstandingRequests.size()); + assertEquals(8, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } + + startContainers(allocatedContainers); + + if (testNegative) { + // decrease the numAllocations of current request id to zero + amAsyncClient.addSchedulingRequests( + Arrays.asList( + schedulingRequest(0, 1, 
1, 1, 2048, "foo"))); + } + + // TODO: need to make sure AM-RM client has updated its outstandingRequests. + sleep(3000); + + LOG.info("Restart ResourceManager."); + yarnCluster.stopResourceManager(yarnCluster.getActiveRMIndex()); + Thread.sleep(5000); + yarnCluster.restartResourceManager(yarnCluster.getActiveRMIndex()); + + while (yarnCluster.getResourceManager(yarnCluster.getActiveRMIndex()) + .getRMContext().getRMNodes().size() < 1) { + LOG.info("Wait for nm1 registering rm1, sleep 1s."); + sleep(1000); + } + + int containerCnt = -1; + while ((containerCnt = ((RMNodeImpl) yarnCluster.getResourceManager() + .getRMContext().getRMNodes().values().iterator().next()) + .getLaunchedContainers().size()) < 3) { + LOG.info("Wait for nm1 launched containers, current container size: " + + containerCnt); + } + LOG.info("ResourceManager recovered."); + + Thread.sleep(5000); + + { + Map, List> outstandingRequestMap = + amClient.getOutstandingSchedRequests(); + assertEquals(1, outstandingRequestMap.size()); + List outstandingRequests = + outstandingRequestMap.values().iterator().next(); + assertEquals(1, outstandingRequests.size()); + + if (testNegative) { + // NumAllocations won't turn negative. + assertEquals(0, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } else { + // NumAllocations won't get decreased. 
+ assertEquals(8, outstandingRequests.get(0).getResourceSizing().getNumAllocations()); + } + } + } + + private static boolean waitForContainerAllocation( + List allocatedContainers, + List rejectedRequests, + int containerNum, int rejNum) throws Exception { + + int maxCount = 10; + while (maxCount >= 0 && + (allocatedContainers.size() < containerNum || + rejectedRequests.size() < rejNum)) { + maxCount--; + sleep(1000); + } + if (allocatedContainers.size() < containerNum || + rejectedRequests.size() < rejNum) { + LOG.error("Failed to wait for expected allocation status, " + + "allocatedContainerSize: " + allocatedContainers.size() + + ", rejectedRequestSize: " + rejectedRequests.size()); + return false; + } else { + return true; + } + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, String... tags) { + return schedulingRequest(numAllocations, priority, allocReqId, cores, mem, + ExecutionType.GUARANTEED, tags); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, + ExecutionType execType, String... 
tags) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .allocationTags(new HashSet<>(Arrays.asList(tags))) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(numAllocations, + Resource.newInstance(mem, cores))) + .build(); + } + + private void startContainers(List allocatedContainers) + throws Exception { + if (allocatedContainers.size() < 1) { + return; + } + + LOG.info("Start containers, count: " + allocatedContainers.size()); + for (Container container : allocatedContainers) { + Credentials ts = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class); + if (Shell.WINDOWS) { + clc.setCommands( + Arrays.asList("ping", "-n", "10000000", "127.0.0.1", ">nul")); + } else { + clc.setCommands(Arrays.asList("sleep", "1000000")); + } + clc.setTokens(securityTokens); + + nmClient.startContainer(container, clc); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 1e363cbf24c..bed5712cf92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -38,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -185,7 +188,7 @@ public static boolean isMatchingSchedulingRequests( SchedulingRequest schedReq1, SchedulingRequest schedReq2) { return schedReq1.getPriority().equals(schedReq2.getPriority()) && schedReq1.getExecutionType().getExecutionType().equals( - schedReq1.getExecutionType().getExecutionType()) && + schedReq2.getExecutionType().getExecutionType()) && schedReq1.getAllocationRequestId() == schedReq2.getAllocationRequestId(); } @@ -210,7 +213,7 @@ public static void removeFromOutstandingSchedulingRequests( int numAllocations = schedReq.getResourceSizing().getNumAllocations(); numAllocations--; - if (numAllocations == 0) { + if (numAllocations <= 0) { iter.remove(); } else { schedReq.getResourceSizing().setNumAllocations(numAllocations); @@ -221,4 +224,46 @@ public static void removeFromOutstandingSchedulingRequests( } } } + + /** + * Handles containers reported from previous application attempts. + * Does nothing when prevContainers is null or empty. A container counts as + * new when its attempt id differs from appAttemptId and its id is not yet in + * containersFromPreviousAttempts; each new container has its id recorded in + * that set and is passed to removeFromOutstandingSchedulingRequests. + */ + public static void removePreviousContainersFromOutstandingSchedulingRequests( + List prevContainers, + ApplicationAttemptId appAttemptId, + Set containersFromPreviousAttempts, + Map, List> outstandingSchedRequests) { + if (prevContainers == null || prevContainers.isEmpty()) { + return; + } + List incrPrevContainers = new ArrayList<>(prevContainers.size()); + prevContainers.forEach(container -> { + // Look up by id: ids (not containers) are what gets added to the set. + if (!container.getId().getApplicationAttemptId().equals(appAttemptId) + && !containersFromPreviousAttempts.contains(container.getId())) { + LOG.info("Received container " + container.getId() + + " from previous attempt."); + incrPrevContainers.add(container); + containersFromPreviousAttempts.add(container.getId()); + } + }); + 
removeFromOutstandingSchedulingRequests(incrPrevContainers, + outstandingSchedRequests); + } + + public static void refreshContainersFromPreviousAttempts( + List completedContainerStatuses, + ApplicationAttemptId appAttemptId, + Set containersFromPreviousAttempts) { + completedContainerStatuses.forEach(containerStatus -> { + if (!containerStatus.getContainerId().getApplicationAttemptId().equals( + appAttemptId)) { + containersFromPreviousAttempts.remove(containerStatus.getContainerId()); + } + }); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index fa69f186d97..a7d039415af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -572,6 +572,10 @@ protected synchronized void serviceInit(Configuration conf) YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_PMEM_MB)); + config.setInt(YarnConfiguration.NM_VCORES, config.getInt( + YarnConfiguration.YARN_MINICLUSTER_NM_VCORES, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_NM_VCORES)); + config.set(YarnConfiguration.NM_ADDRESS, MiniYARNCluster.getHostname() + ":0"); config.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,