diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index ac3af4f..725ab91 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1517,7 +1517,8 @@ public void transition(final TaskAttemptImpl taskAttempt, taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent( taskAttempt.attemptId, taskAttempt.containerID, taskAttempt.containerMgrAddress, taskAttempt.containerToken, - launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask)); + launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask, + cEvent.getContainer().getClusterTimeStamp())); // send event to speculator that our container needs are satisfied taskAttempt.eventHandler.handle diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 86281f6..99ec211 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -153,7 +153,8 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) { org.apache.hadoop.yarn.api.records.Container container = BuilderUtils.newContainer(containerID, null, null, - event.getResource(), null, containerToken); + event.getResource(), null, containerToken, + event.getClusterTimeStamp()); // Now launch the actual container StartContainerRequest startRequest = Records .newRecord(StartContainerRequest.class); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java index eb95f3b..08a4864 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java @@ -30,6 +30,7 @@ private final ContainerLaunchContext container; private final Task task; private final Resource resource; + private long clusterTimeStamp; public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID, ContainerId containerID, String containerMgrAddress, @@ -42,6 +43,18 @@ public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID, this.task = remoteTask; this.resource = resource; } + public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID, + ContainerId containerID, String containerMgrAddress, + ContainerToken containerToken, + ContainerLaunchContext containerLaunchContext, Resource resource, + Task remoteTask, long clusterTimeStamp) { + super(taskAttemptID, containerID, containerMgrAddress, containerToken, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH); + this.container = containerLaunchContext; + this.task = remoteTask; + this.resource = resource; + this.clusterTimeStamp = clusterTimeStamp; + } public ContainerLaunchContext getContainer() { return this.container; @@ -55,6 +68,10 @@ public Task getRemoteTask() { return this.task; } + public long getClusterTimeStamp() { + return this.clusterTimeStamp; + } + @Override public int hashCode() { return super.hashCode(); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 4ef4d8d..c187bc8 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -519,7 +519,7 @@ public void handle(ContainerAllocatorEvent event) { cId.setId(containerCount++); NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT); Container container = BuilderUtils.newContainer(cId, nodeId, - NM_HOST + ":" + NM_HTTP_PORT, null, null, null); + NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0); JobID id = TypeConverter.fromYarn(applicationId); JobId jobId = TypeConverter.toYarn(id); getContext().getEventHandler().handle(new JobHistoryEvent(jobId, diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index efb8b7a..380df64 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -243,7 +243,7 @@ public AllocateResponse allocate(AllocateRequest request) .newContainer(containerId, BuilderUtils.newNodeId("host" + containerId.getId(), 2345), "host" + containerId.getId() + ":5678", req - .getCapability(), req.getPriority(), null)); + .getCapability(), req.getPriority(), null, 0)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 9478d34..9b67b77 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -135,4 +135,16 @@ @Private @Unstable void setContainerToken(ContainerToken containerToken); + + /** + * Get the clusterTimeStamp of RM in which containers are allocated + * @return clusterTimeStamp + */ + @Private + @Unstable + long getClusterTimeStamp(); + + @Private + @Unstable + void setClusterTimeStamp(long clusterTimeStamp); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index dd6941f..a39262d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -230,6 +230,18 @@ public void setContainerToken(ContainerToken containerToken) { this.containerToken = containerToken; } + @Override + public long getClusterTimeStamp() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return p.getClusterTimeStamp(); + } + + @Override + public void setClusterTimeStamp(long clusterTimeStamp) { + maybeInitBuilder(); + builder.setClusterTimeStamp((clusterTimeStamp)); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index aec162c..1031cc6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -68,6 +68,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional int64 cluster_time_stamp = 7; } enum YarnApplicationStateProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index bbb8ad1..d95ce64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -56,7 +56,7 @@ public void testAMRMClientAsync() throws Exception { BuilderUtils.newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); List allocated1 = Arrays.asList( - BuilderUtils.newContainer(null, null, null, null, null, null)); + BuilderUtils.newContainer(null, null, null, null, null, null, 0)); final AllocateResponse response1 = createAllocateResponse( new ArrayList(), allocated1); final AllocateResponse response2 = createAllocateResponse(completed1, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index c7502c1..dbb44a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -237,9 +237,9 @@ public static ContainerStatus newContainerStatus(ContainerId containerId, return containerStatus; } - public static Container newContainer(ContainerId containerId, - NodeId nodeId, String nodeHttpAddress, - Resource resource, Priority priority, ContainerToken containerToken) { + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + ContainerToken containerToken, long clusterTimeStamp) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -247,6 +247,7 @@ public static Container newContainer(ContainerId containerId, container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setClusterTimeStamp(clusterTimeStamp); return container; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 295a38c..7454955 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -105,7 +105,7 @@ private void testRPCTimeout(String rpcClass) throws Exception { containerId.setId(100); Container container = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); StartContainerRequest scRequest = recordFactory .newRecordInstance(StartContainerRequest.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 7d941e9..92bbb8d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -128,7 +128,7 @@ private void test(String rpcClass) throws Exception { containerId.setId(100); Container mockContainer = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); // containerLaunchContext.env = new HashMap(); // containerLaunchContext.command = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 1c1a9dd..1470d35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -30,4 +30,7 @@ void setNodeAction(NodeAction nodeAction); + long getClusterTimeStamp(); + + void setClusterTimeStamp(long clusterTimestamp); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index c28d4c9..1d90eea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -121,6 +121,18 @@ public void setNodeAction(NodeAction nodeAction) { rebuild = true; } + @Override + public long getClusterTimeStamp() { + RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.getClusterTimeStamp()); + } + + @Override + public void setClusterTimeStamp(long clusterTimestamp) { + maybeInitBuilder(); + builder.setClusterTimeStamp(clusterTimestamp); + } + private NodeAction convertFromProtoFormat(NodeActionProto p) { return NodeAction.valueOf(p.name()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 931dccd..66e32ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -33,6 +33,7 @@ message RegisterNodeManagerRequestProto { message RegisterNodeManagerResponseProto { optional MasterKeyProto master_key = 1; optional NodeActionProto nodeAction = 2; + optional int64 cluster_time_stamp = 3; } message NodeHeartbeatRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a5d16c5..390f92d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -226,8 +226,13 @@ protected void cleanupContainersOnResync() { new Thread() { @Override public void run() { + containerManager.setBlockNewContainerRequest(true); + LOG.info("Notify container manager to " + + "block new container requests while resyncing"); cleanupContainers(NodeManagerEventType.RESYNC); ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + LOG.info("Notify container manager to reset blockNewContainerRequest"); + containerManager.setBlockNewContainerRequest(false); } }.start(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 41949e7..cbd7302 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -25,4 +25,5 @@ void sendOutofBandHeartBeat(); NodeStatus getNodeStatusAndUpdateContainersInContext(); + long getClusterTimeStamp(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index e9583c2..50b0384 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -95,6 +95,7 @@ private Runnable statusUpdaterRunnable; private Thread statusUpdater; + private long clusterTimeStamp = 0; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -267,6 +268,7 @@ protected void registerWithRM() throws YarnRemoteException { this.resourceTracker = getRMClient(); regNMResponse = this.resourceTracker.registerNodeManager(request); + this.clusterTimeStamp = regNMResponse.getClusterTimeStamp(); break; } catch(Throwable e) { LOG.warn("Trying to connect to ResourceManager, " + @@ -334,6 +336,7 @@ protected void registerWithRM() throws YarnRemoteException { return appList; } + @Override public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); @@ -407,6 +410,11 @@ public void sendOutofBandHeartBeat() { } } + @Override + public long getClusterTimeStamp() { + return this.clusterTimeStamp; + } + protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8fc8a3e..d449f9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -125,6 +126,7 @@ private final ApplicationACLsManager aclsManager; private final DeletionService deletionService; + private AtomicBoolean blockNewContainerRequest = new AtomicBoolean(false); public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -393,6 +395,13 @@ private void authorizeRequest(String containerIDStr, @Override public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException { + + if (blockNewContainerRequest.get()) { + throw RPCUtil + .getRemoteException("Rejecting new containers as" + + " NodeManager is resyncing with the ResourceManager"); + } + ContainerLaunchContext launchContext = request.getContainerLaunchContext(); org.apache.hadoop.yarn.api.records.Container lauchContainer = request.getContainer(); @@ -402,6 +411,15 @@ public StartContainerResponse startContainer(StartContainerRequest request) UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr); authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi); + // Is the container coming from previous RM + if (lauchContainer.getClusterTimeStamp() != nodeStatusUpdater + .getClusterTimeStamp()) { + String msg = "\n Invalid container request " + containerIDStr + + " from previous RM"; + LOG.error(msg); + throw RPCUtil.getRemoteException(msg); + } + LOG.info("Start request for " + containerIDStr + " by user " + launchContext.getUser()); @@ -615,6 +633,10 @@ public void handle(ContainerManagerEvent event) { } } + public void setBlockNewContainerRequest(boolean blockNewContainerRequest) { + this.blockNewContainerRequest.set(blockNewContainerRequest); + } + @Override public void stateChanged(Service service) { // TODO Auto-generated method stub diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 44328db..a23f125 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -142,6 +142,17 @@ public void testLocalFilesCleanup() throws InterruptedException, super.testLocalFilesCleanup(); } + @Override + public void testContainerLaunchFromPreviousRM() throws InterruptedException, + IOException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerLaunchFromPreviousRM"); + super.testContainerLaunchFromPreviousRM(); + } private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java new file mode 100644 index 0000000..ee87499 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -0,0 +1,371 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; + +import junit.framework.Assert; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNodeManagerResync { + static final File basedir = + new File("target", TestNodeManagerResync.class.getName()); + static final File tmpDir = new File(basedir, "tmpDir"); + static final File logsDir = new File(basedir, "logs"); + static final File remoteLogsDir = new File(basedir, "remotelogs"); + static final File nmLocalDir = new File(basedir, "nm0"); + static final File processStartFile = new File(tmpDir, "start_file.txt") + .getAbsoluteFile(); + + static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + static final String user = "nobody"; + private FileContext localFS; + private CyclicBarrier syncBarrier = new CyclicBarrier(2); + private boolean skipLaunchContainersThread = false; + + @Before + public void setup() throws UnsupportedFileSystemException { + localFS = FileContext.getLocalFSFileContext(); + tmpDir.mkdirs(); + logsDir.mkdirs(); + remoteLogsDir.mkdirs(); + nmLocalDir.mkdirs(); + } + + @After + public void tearDown() throws IOException, InterruptedException { + localFS.delete(new Path(basedir.getPath()), true); + syncBarrier.reset(); + } + + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + startContainers(nm); + + assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; + nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; + nm.stop(); + } + + @SuppressWarnings("unchecked") + @Test + public void testBlockNewContainerRequestOnResync() throws IOException, + InterruptedException { + NodeManager nm = new TestNodeManager2(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + // Start containers in running state + startContainers(nm); + + nm.getNMDispatcher().getEventHandler() + .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + //In normal stop, skip launching test thread + skipLaunchContainersThread = true; + nm.stop(); + } + + private void startContainers(NodeManager nm) throws IOException { + ContainerManagerImpl containerManager = nm.getContainerManager(); + File scriptFile = createUnhaltingScriptFile(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + Container mockContainer = mock(Container.class); + // Construct the Container-id + ContainerId cId = createContainerId(); + when(mockContainer.getId()).thenReturn(cId); + + containerLaunchContext.setUser(user); + + URL localResourceUri = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource localResource = + recordFactory.newRecordInstance(LocalResource.class); + localResource.setResource(localResourceUri); + localResource.setSize(-1); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setType(LocalResourceType.FILE); + localResource.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, localResource); + containerLaunchContext.setLocalResources(localResources); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + List commands = new ArrayList(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + Resource resource = BuilderUtils.newResource(1024, 1); + when(mockContainer.getResource()).thenReturn(resource); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + startRequest.setContainer(mockContainer); + containerManager.startContainer(startRequest); + + GetContainerStatusRequest request = + recordFactory.newRecordInstance(GetContainerStatusRequest.class); + request.setContainerId(cId); + ContainerStatus containerStatus = + containerManager.getContainerStatus(request).getStatus(); + Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); + } + + private ContainerId createContainerId() { + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(0); + appId.setId(0); + ApplicationAttemptId appAttemptId = + recordFactory.newRecordInstance(ApplicationAttemptId.class); + appAttemptId.setApplicationId(appId); + appAttemptId.setAttemptId(1); + ContainerId containerId = + recordFactory.newRecordInstance(ContainerId.class); + containerId.setApplicationAttemptId(appAttemptId); + return containerId; + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB + conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); + conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); + return conf; + } + + /** + * Creates a script to run a container that will run forever unless + * stopped by external means. + */ + private File createUnhaltingScriptFile() throws IOException { + File scriptFile = new File(tmpDir, "scriptFile.sh"); + BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile)); + fileWriter.write("#!/bin/bash\n\n"); + fileWriter.write("echo \"Running testscript for delayed kill\"\n"); + fileWriter.write("hello=\"Got SIGTERM\"\n"); + fileWriter.write("umask 0\n"); + fileWriter.write("trap \"echo $hello >> " + processStartFile + "\" SIGTERM\n"); + fileWriter.write("echo \"Writing pid to start file\"\n"); + fileWriter.write("echo $$ >> " + processStartFile + "\n"); + fileWriter.write("while true; do\ndate >> /dev/null;\n done\n"); + + fileWriter.close(); + return scriptFile; + } + + class TestNodeManager extends NodeManager { + + private int registrationCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl(context, dispatcher, + healthChecker, metrics); + } + + public int getNMRegistrationCount() { + return registrationCount; + } + + class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + registrationCount++; + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + try { + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + } + + class TestNodeManager2 extends NodeManager { + + Thread launchContainersThread = null; + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl(context, dispatcher, + healthChecker, metrics); + } + + @Override + public void cleanupContainers(NodeManagerEventType eventType) { + + // Create a thread keeping launching containers while resyncing + launchContainersThread = new Thread() { + @Override + public void run() { + int numContainers = 0; + int numContainersRejected = 0; + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + while (numContainers++ < 10) { + ContainerId cId = createContainerId(); + Container container = + BuilderUtils.newContainer(cId, null, null, null, null, null, 0); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + startRequest.setContainer(container); + System.out.println("no. of containers to be launched: " + + numContainers); + try { + getContainerManager().startContainer(startRequest); + } catch (YarnRemoteException e) { + numContainersRejected++; + Assert.assertTrue(e.getMessage().contains( + "Rejecting new containers as" + + " NodeManager is resyncing with the ResourceManager")); + } + } + // no. of containers to be launched should equal to no. of + // containers rejected + Assert.assertEquals(numContainers - 1, numContainersRejected); + } + }; + if (!skipLaunchContainersThread) { + launchContainersThread.start(); + } + try { + super.cleanupContainers(eventType); + } catch (Exception e) { + } + } + + class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + + // After this point new containers are free to be launched, except + // containers from previous RM + try { + launchContainersThread.join(); + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + + @Override + public NodeStatus getNodeStatusAndUpdateContainersInContext() { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + return super.getNodeStatusAndUpdateContainersInContext(); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index 1efe80d..428e9f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -31,9 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -56,11 +53,9 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; @@ -81,7 +76,6 @@ .getRecordFactory(null); static final String user = "nobody"; private FileContext localFS; - private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { @@ -135,25 +129,6 @@ public void testKillContainersOnShutdown() throws IOException { Assert.assertTrue("Did not find sigterm message", foundSigTermMessage); reader.close(); } - - @SuppressWarnings("unchecked") - @Test - public void testKillContainersOnResync() throws IOException, InterruptedException { - NodeManager nm = new TestNodeManager(); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - startContainers(nm); - - assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; - nm.getNMDispatcher().getEventHandler(). - handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); - try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { - } - assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; - } private void startContainers(NodeManager nm) throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); @@ -260,48 +235,4 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } - - class TestNodeManager extends NodeManager { - - private int registrationCount = 0; - - @Override - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new TestNodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics); - } - - public int getNMRegistrationCount() { - return registrationCount; - } - - class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { - - public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); - } - - @Override - protected void registerWithRM() throws YarnRemoteException { - super.registerWithRM(); - registrationCount++; - } - - @Override - protected void rebootNodeStatusUpdater() { - ConcurrentMap containers = - getNMContext().getContainers(); - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); - try { - syncBarrier.await(); - } catch (InterruptedException e) { - } catch (BrokenBarrierException e) { - } - } - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index d405a7c..abd3262 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -48,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; @@ -61,7 +65,6 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; -import static org.mockito.Mockito.*; public class TestContainerManager extends BaseContainerManagerTest { @@ -500,4 +503,58 @@ public void testLocalFilesCleanup() throws InterruptedException, Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); } + + @Test + public void testContainerLaunchFromPreviousRM() throws IOException, + InterruptedException { + containerManager.start(); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + ContainerId cId1 = createContainerId(); + ContainerId cId2 = createContainerId(); + containerLaunchContext.setUser(user); + containerLaunchContext + .setLocalResources(new HashMap()); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + Resource mockResource = mock(Resource.class); + + Container mockContainer1 = mock(Container.class); + when(mockContainer1.getId()).thenReturn(cId1); + // Construct the Container with timestamp from previous RM,its 0 by default + when(mockContainer1.getClusterTimeStamp()).thenReturn((long) -1); + StartContainerRequest startRequest1 = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest1.setContainerLaunchContext(containerLaunchContext); + startRequest1.setContainer(mockContainer1); + boolean catchException = false; + try { + containerManager.startContainer(startRequest1); + } catch (YarnRemoteException e) { + catchException = true; + Assert.assertTrue(e.getMessage().contains( + "Invalid container request " + cId1 + " from previous RM")); + } + + // Verify that startContainer fail because of invalid container request + Assert.assertTrue(catchException); + + // Construct the Container with a timestamp within current RM + Container mockContainer2 = mock(Container.class); + when(mockContainer2.getId()).thenReturn(cId2); + when(mockContainer2.getClusterTimeStamp()).thenReturn((long) 0); + when(mockContainer2.getResource()).thenReturn(mockResource); + StartContainerRequest startRequest2 = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest2.setContainerLaunchContext(containerLaunchContext); + startRequest2.setContainer(mockContainer2); + boolean noException = true; + try { + containerManager.startContainer(startRequest2); + } catch (YarnRemoteException e) { + noException = false; + } + // Verify that startContainer get no YarnRemoteException + Assert.assertTrue(noException); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 258c7dc..f0d4c53 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -196,6 +196,7 @@ public RegisterNodeManagerResponse registerNodeManager( + capability + ", assigned nodeId " + nodeId); response.setNodeAction(NodeAction.NORMAL); + response.setClusterTimeStamp(ResourceManager.clusterTimeStamp); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 58dcb73..64f7114 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -1243,7 +1244,7 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - null); + null, ResourceManager.clusterTimeStamp); return container; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 32328be..7bee02a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -169,7 +170,7 @@ public Container createContainer( // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); return container; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 2dbb498..535cf55 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; @@ -565,7 +566,7 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); // Allocate! diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index dba5acd..46683dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -188,6 +188,7 @@ synchronized public StartContainerResponse startContainer( this.nodeId, nodeHttpAddress, requestContainer.getResource(), null, null // DKDC - Doesn't matter + , 0 ); ContainerStatus containerStatus = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 6e2b834..abceecf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -69,7 +69,7 @@ public void testReleaseWhileRunning() { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); @@ -139,7 +139,7 @@ public void testExpireWhileRunning() { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 69e197a..42633fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -360,7 +360,7 @@ public Void run() { Container container = BuilderUtils.newContainer(newTokenId.getContainerID(), null, null, BuilderUtils.newResource(newTokenId.getResource().getMemory(), - newTokenId.getResource().getVirtualCores()), null, null); + newTokenId.getResource().getVirtualCores()), null, null, 0); StartContainerRequest request = Records.newRecord(StartContainerRequest.class); request.setContainerLaunchContext(context); request.setContainer(container); @@ -547,7 +547,7 @@ void callWithIllegalResource(ContainerManager client, createContainerLaunchContextForTest(tokenId); Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, - BuilderUtils.newResource(2048, 1), null, null); + BuilderUtils.newResource(2048, 1), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try { @@ -575,7 +575,7 @@ void callWithIllegalUserName(ContainerManager client, Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId - .getResource().getVirtualCores()), null, null); + .getResource().getVirtualCores()), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try {