diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 323d31d..ca9034b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -34,5 +34,8 @@ RUNNING, /** Completed container */ - COMPLETE + COMPLETE, + + /** Queued at the NM */ + QUEUED } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 5f52f85..69b84bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -43,6 +43,14 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { utilization.setCPU(cpu); return utilization; } + + @Public + @Unstable + public static ResourceUtilization newInstance( + ResourceUtilization resourceUtil) { + return newInstance(resourceUtil.getPhysicalMemory(), + resourceUtil.getVirtualMemory(), resourceUtil.getCPU()); + } /** * Get used virtual memory. @@ -147,4 +155,18 @@ public void addTo(int pmem, int vmem, float cpu) { this.setVirtualMemory(this.getVirtualMemory() + vmem); this.setCPU(this.getCPU() + cpu); } + + /** + * Subtract utilization from the current one. + * @param pmem Physical memory to be subtracted. + * @param vmem Virtual memory to be subtracted. + * @param cpu CPU utilization to be subtracted. 
+ */ + @Public + @Unstable + public void subtractFrom(int pmem, int vmem, float cpu) { + this.setPhysicalMemory(this.getPhysicalMemory() - pmem); + this.setVirtualMemory(this.getVirtualMemory() - vmem); + this.setCPU(this.getCPU() - cpu); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 4fdd43c..ed1f87e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -215,6 +216,13 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, public static ContainerStatus newContainerStatus(ContainerId containerId, ContainerState containerState, String diagnostics, int exitStatus, Resource capability) { + return newContainerStatus(containerId, containerState, diagnostics, + exitStatus, capability, ExecutionType.GUARANTEED); + } + + public static ContainerStatus newContainerStatus(ContainerId containerId, + ContainerState containerState, String diagnostics, int exitStatus, + Resource capability, ExecutionType executionType) { ContainerStatus containerStatus = recordFactory 
.newRecordInstance(ContainerStatus.class); containerStatus.setState(containerState); @@ -222,6 +230,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId, containerStatus.setDiagnostics(diagnostics); containerStatus.setExitStatus(exitStatus); containerStatus.setCapability(capability); + containerStatus.setExecutionType(executionType); return containerStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index e0a4da4..cb4c3e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -65,6 +66,10 @@ ConcurrentMap getIncreasedContainers(); + + ConcurrentMap getQueuedContainers(); + + ConcurrentMap getKilledQueuedContainers(); NMContainerTokenSecretManager getContainerTokenSecretManager(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index ef7b760..574e9e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,8 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; @@ -453,6 +457,15 @@ public void run() { protected final ConcurrentMap increasedContainers = new ConcurrentHashMap<>(); + + // TODO KONSTANTINOS: Should I initialize queuedContainers and + // killedQueuedContainers only if distributed scheduling is enabled + // or is it fine like this? 
+ protected final ConcurrentMap queuedContainers = + new ConcurrentSkipListMap<>(); + // Hash map: ContainerTokenIdentifier keys have no natural ordering. + protected final ConcurrentMap killedQueuedContainers = + new ConcurrentHashMap<>(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -516,6 +529,16 @@ public int getHttpPort() { getIncreasedContainers() { return this.increasedContainers; } + + @Override + public ConcurrentMap getQueuedContainers() { + return this.queuedContainers; + } + + @Override + public ConcurrentMap getKilledQueuedContainers() { + return this.killedQueuedContainers; + } @Override public NMContainerTokenSecretManager getContainerTokenSecretManager() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java new file mode 100644 index 0000000..491c164 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java @@ -0,0 +1,70 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; + +public class ContainerExecutionEvent + extends AbstractEvent { + + private final StartContainerRequest startRequest; + private final ContainerTokenIdentifier containerTokenIdentifier; + private final ContainerId containerId; + private final NMTokenIdentifier nmTokenIdentifier; + + public ContainerExecutionEvent(StartContainerRequest startRequest, + ContainerTokenIdentifier containerTokenIdentifier, + NMTokenIdentifier nmTokenIdentifier, + ContainerExecutionEventType eventType) { + super(eventType); + this.startRequest = startRequest; + this.containerTokenIdentifier = containerTokenIdentifier; + this.containerId = containerTokenIdentifier.getContainerID(); + this.nmTokenIdentifier = nmTokenIdentifier; + } + + public ContainerExecutionEvent(ContainerId containerId, + ContainerExecutionEventType eventType) { + super(eventType); + this.startRequest = null; + this.containerTokenIdentifier = null; + this.containerId = containerId; + this.nmTokenIdentifier = null; + } + + public StartContainerRequest getStartRequest() { + return this.startRequest; + } + + public ContainerTokenIdentifier getContainerTokenIdentifier() { + return this.containerTokenIdentifier; + } + + public ContainerId 
getContainerId() { + return this.containerId; + } + + NMTokenIdentifier getNMTokenIdentifier() { + return this.nmTokenIdentifier; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java new file mode 100644 index 0000000..46b90ad --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java @@ -0,0 +1,24 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +public enum ContainerExecutionEventType { + CONTAINER_EXECUTION_START, + CONTAINER_EXECUTION_STOP +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 7d51477..c007b76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -133,6 +134,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerQueuingEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerQueuingEventType; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -257,6 +260,14 @@ public void serviceInit(Configuration conf) throws Exception { this.amrmProxyService = new AMRMProxyService(this.context, this.dispatcher); addService(this.amrmProxyService); + + // Should we add to the dispatcher when amrmProxy is enabled or + // explicitly when distributed scheduling is enabled? + dispatcher.register(ContainerExecutionEventType.class, + new ContainerExecutionEventDispatcher()); + dispatcher.register(ContainerQueuingEventType.class, + ((ContainersMonitorImpl) this.containersMonitor) + .createContainerQueuingEventDispatcher()); } else { LOG.info("AMRMProxyService is disabled"); } @@ -774,6 +785,7 @@ protected void authorizeStartAndResourceIncreaseRequest( /** * Start a list of containers on this NodeManager. 
*/ + @SuppressWarnings("unchecked") @Override public StartContainersResponse startContainers( StartContainersRequest requests) throws YarnException, IOException { @@ -815,8 +827,21 @@ public StartContainersResponse startContainers( this.amrmProxyService.processApplicationStartRequest(request); } - startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, - request); + if (this.context.isDistributedSchedulingEnabled()) { + this.context.getQueuedContainers().put(containerId, + containerTokenIdentifier); + + this.dispatcher.getEventHandler() + .handle(new ContainerQueuingEvent(nmTokenIdentifier, + containerTokenIdentifier, + containerTokenIdentifier.getExecutionType(), request, + ContainerQueuingEventType.CONTAINER_REQUEST_ARRIVED)); + } else { + startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, + request); + } + // TODO KONSTANTINOS: Should containers be in the succeededContainers already + // in case distributed scheduling is enabled? succeededContainers.add(containerId); } catch (YarnException e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -894,6 +919,7 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, authorizeStartAndResourceIncreaseRequest( nmTokenIdentifier, containerTokenIdentifier, true); // update NMToken + // TODO KONSTANTINOS: Should this updateNMToken happen even for opportunistic containers? 
updateNMTokenIdentifier(nmTokenIdentifier); ContainerId containerId = containerTokenIdentifier.getContainerID(); @@ -901,6 +927,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, String user = containerTokenIdentifier.getApplicationSubmitter(); LOG.info("Start request for " + containerIdStr + " by user " + user); + + this.context.getQueuedContainers().remove(containerId); ContainerLaunchContext launchContext = request.getContainerLaunchContext(); @@ -975,6 +1003,22 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, this.readLock.unlock(); } } + + @SuppressWarnings("unchecked") + private void containerFailedToStart(ContainerId containerId, + ContainerTokenIdentifier containerTokenId) { + this.context.getQueuedContainers().remove(containerId); + + this.dispatcher.getEventHandler() + .handle(new ContainerQueuingEvent(null, containerTokenId, + containerTokenId.getExecutionType(), null, + ContainerQueuingEventType.CONTAINER_FAILED_TO_START)); + + // TODO KONSTANTINOS: Fix NodeStatusUpdater to take into account killed + // queued containers. + this.context.getKilledQueuedContainers().put(containerTokenId, + "Container removed from queue as it failed to start."); + } protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( org.apache.hadoop.yarn.api.records.Token token, @@ -1058,6 +1102,8 @@ private void changeContainerResourceInternal( Container container = context.getContainers().get(containerId); // Check container existence if (container == null) { + // TODO KONSTANTINOS: If you can changeContainerResourceInternal for + // a container that is still queued, we need to account for this case. 
if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerId.toString() + " was recently stopped on node manager."); @@ -1178,8 +1224,24 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, authorizeGetAndStopContainerRequest(containerID, container, true, nmTokenIdentifier); + // If container is null and distributed scheduling is enabled, container + // might be queued. Otherwise, container might not be handled by this NM. if (container == null) { - if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + ContainerTokenIdentifier containerTokenId = + this.context.isDistributedSchedulingEnabled() + ? this.context.getQueuedContainers().remove(containerID) : null; + if (containerTokenId != null) { + // remove() is the single atomic check-and-claim of the queued entry. + this.dispatcher.getEventHandler() + .handle(new ContainerQueuingEvent(null, containerTokenId, + containerTokenId.getExecutionType(), null, + ContainerQueuingEventType.QUEUED_CONTAINER_REMOVED)); + + this.context.getKilledQueuedContainers().put(containerTokenId, + "Queued container request removed by ApplicationMaster."); + + nodeStatusUpdater.sendOutofBandHeartBeat(); + } else if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerIDStr + " is not handled by this NodeManager"); } @@ -1199,6 +1261,18 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, nodeStatusUpdater.sendOutofBandHeartBeat(); } } + + private void stopContainerInternalIfNotQueued(ContainerId containerID) + throws YarnException, IOException { + if (this.context.getContainers().containsKey(containerID)) { + UserGroupInformation remoteUgi = getRemoteUgi(); + NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi); + if (identifier == null) { + throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG); + } + stopContainerInternal(identifier, containerID); + } + } /** * Get a list of 
container statuses running on this NodeManager @@ -1237,7 +1311,19 @@ private ContainerStatus getContainerStatusInternal(ContainerId containerID, nmTokenIdentifier); if (container == null) { - if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { + // Read the queue entry once: a separate containsKey()/get() pair can + // race with the container being dequeued and dereference null. + ContainerTokenIdentifier queuedTokenId = + this.context.getQueuedContainers().get(containerID); + if (queuedTokenId != null) { + // TODO KONSTANTINOS: Is the Resource.newInstance(0, 0) proper here or + // we need the actual value? + ExecutionType executionType = queuedTokenId.getExecutionType(); + return BuilderUtils.newContainerStatus(containerID, + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "", + ContainerExitStatus.INVALID, Resource.newInstance(0, 0), + executionType); + } else if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { throw RPCUtil.getRemoteException("Container " + containerIDStr + " was recently stopped on node manager."); } else { @@ -1315,6 +1401,43 @@ public void handle(ApplicationEvent event) { } } } + + class ContainerExecutionEventDispatcher + implements EventHandler { + @Override + public void handle(ContainerExecutionEvent containerExecutionEvent) { + StartContainerRequest containerReq = containerExecutionEvent + .getStartRequest(); + // Use the id stored on the event itself: events built from a bare + // ContainerId carry a null container token identifier. + ContainerId containerId = containerExecutionEvent.getContainerId(); + + switch (containerExecutionEvent.getType()) { + case CONTAINER_EXECUTION_START: + try { + startContainerInternal(containerExecutionEvent.getNMTokenIdentifier(), + containerExecutionEvent.getContainerTokenIdentifier(), + containerReq); + } catch (YarnException | IOException e) { + containerFailedToStart(containerId, + containerExecutionEvent.getContainerTokenIdentifier()); + LOG.error("Container failed to start.", e); + } + break; + case CONTAINER_EXECUTION_STOP: + try { + stopContainerInternalIfNotQueued(containerId); + } catch (YarnException | IOException e) { + LOG.error("Container did not get removed successfully.", e); + } + break; + default: 
+ throw new YarnRuntimeException( + "Got an unknown ContainerExecutionEvent type: " + + containerExecutionEvent.getType()); + } + } + } @SuppressWarnings("unchecked") @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index fb1728a..51dd3b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -432,7 +432,8 @@ public ContainerStatus cloneAndGetContainerStatus() { this.readLock.lock(); try { return BuilderUtils.newContainerStatus(this.containerId, - getCurrentState(), diagnostics.toString(), exitCode, getResource()); + getCurrentState(), diagnostics.toString(), exitCode, getResource(), + this.containerTokenIdentifier.getExecutionType()); } finally { this.readLock.unlock(); } @@ -486,7 +487,8 @@ private void sendFinishedEvents() { EventHandler eventHandler = dispatcher.getEventHandler(); eventHandler.handle(new ApplicationContainerFinishedEvent(containerId)); // Remove the container from the resource-monitor - eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); + eventHandler.handle(new ContainerStopMonitoringEvent(containerId, + containerTokenIdentifier.getExecutionType())); // Tell the logService too eventHandler.handle(new LogHandlerContainerFinishedEvent( containerId, exitCode)); @@ -520,10 +522,10 @@ private void sendContainerMonitorStartEvent() { int cpuVcores = 
getResource().getVirtualCores(); long localizationDuration = containerLaunchStartTime - containerLocalizationStartTime; - dispatcher.getEventHandler().handle( - new ContainerStartMonitoringEvent(containerId, - vmemBytes, pmemBytes, cpuVcores, launchDuration, - localizationDuration)); + dispatcher.getEventHandler() + .handle(new ContainerStartMonitoringEvent(containerId, + containerTokenIdentifier.getExecutionType(), vmemBytes, pmemBytes, + cpuVcores, launchDuration, localizationDuration)); } private void addDiagnostics(String... diags) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java new file mode 100644 index 0000000..ed2138b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java @@ -0,0 +1,69 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; + +public class ContainerQueuingEvent + extends AbstractEvent { + + private final NMTokenIdentifier nmTokenIdentifier; + private final ContainerTokenIdentifier containerTokenIdentifier; + private final ContainerId containerId; + private final ExecutionType executionType; + private final StartContainerRequest startRequest; + + public ContainerQueuingEvent(NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + ExecutionType executionType, StartContainerRequest startRequest, + ContainerQueuingEventType eventType) { + super(eventType); + this.nmTokenIdentifier = nmTokenIdentifier; + this.containerTokenIdentifier = containerTokenIdentifier; + this.containerId = containerTokenIdentifier.getContainerID(); + this.executionType = executionType; + this.startRequest = startRequest; + } + + NMTokenIdentifier getNMTokenIdentifier() { + return nmTokenIdentifier; + } + + public ContainerTokenIdentifier getContainerTokenIdentifier() { + return containerTokenIdentifier; + } + + public ContainerId getContainerId() { + return containerId; + } + + public ExecutionType getExecutionType() { + return executionType; + } + + public StartContainerRequest getStartRequest() { + return startRequest; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java new file mode 100644 index 0000000..ca26cc6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java @@ -0,0 +1,25 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; + +public enum ContainerQueuingEventType { + CONTAINER_REQUEST_ARRIVED, + CONTAINER_FAILED_TO_START, + QUEUED_CONTAINER_REMOVED +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java index c09bebf..27de052 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; public class ContainerStartMonitoringEvent extends ContainersMonitorEvent { @@ -28,10 +29,10 @@ private final long launchDuration; private final long localizationDuration; - public ContainerStartMonitoringEvent(ContainerId containerId, + public ContainerStartMonitoringEvent(ContainerId containerId, ExecutionType executionType, long vmemLimit, long pmemLimit, int cpuVcores, long launchDuration, long localizationDuration) { - super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER); + super(containerId, executionType, ContainersMonitorEventType.START_MONITORING_CONTAINER); this.vmemLimit = vmemLimit; this.pmemLimit = pmemLimit; this.cpuVcores = cpuVcores; diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java index 240c5c0..d883281 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; public class ContainerStopMonitoringEvent extends ContainersMonitorEvent { - public ContainerStopMonitoringEvent(ContainerId containerId) { - super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER); + public ContainerStopMonitoringEvent(ContainerId containerId, + ExecutionType executionType) { + super(containerId, executionType, + ContainersMonitorEventType.STOP_MONITORING_CONTAINER); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java index 56e578b..f3aa721 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java @@ -19,21 +19,33 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.event.AbstractEvent; public class ContainersMonitorEvent extends AbstractEvent { private final ContainerId containerId; + private final ExecutionType executionType; public ContainersMonitorEvent(ContainerId containerId, ContainersMonitorEventType eventType) { + this(containerId, ExecutionType.GUARANTEED, eventType); + } + + public ContainersMonitorEvent(ContainerId containerId, + ExecutionType executionType, ContainersMonitorEventType eventType) { super(eventType); this.containerId = containerId; + this.executionType = executionType; } public ContainerId getContainerId() { return this.containerId; } + + public ExecutionType getExecutionType() { + return this.executionType; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index e6c3642..c0de0ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -18,11 +18,16 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,20 +35,25 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerExecutionEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerExecutionEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import 
org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; -import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; public class ContainersMonitorImpl extends AbstractService implements @@ -61,6 +71,14 @@ @VisibleForTesting final Map trackingContainers = new ConcurrentHashMap<>(); + + private Map logicalGuarContainers; + private Map logicalOpportContainers; + private ResourceUtilization logicalContainersUtilization; + + private Queue queuedGuarRequests; + private Queue queuedOpportRequests; + private Set opportContainersToKill; private final ContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; @@ -99,6 +117,17 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); this.queuedContainersStatus = QueuedContainersStatus.newInstance(); + + // Created unconditionally: handle(STOP_MONITORING_CONTAINER) touches these + // regardless of isDistributedSchedulingEnabled(), so they must never be null. + this.logicalGuarContainers = new ConcurrentHashMap<>(); + this.logicalOpportContainers = new ConcurrentHashMap<>(); + this.logicalContainersUtilization = ResourceUtilization.newInstance(0, 0, + 0.0f); + this.queuedGuarRequests = new LinkedList<>(); + this.queuedOpportRequests = new LinkedList<>(); + this.opportContainersToKill = Collections + .synchronizedSet(new HashSet()); } @Override @@ -313,6 +342,293 @@ public synchronized void setResourceLimit( this.cpuVcores = cpuVcores; } } + + private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId, + Resource resource) { + long pmemBytes = resource.getMemory() * 1024 * 1024L; + float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + long vmemBytes = (long) (pmemRatio * pmemBytes); + int cpuVcores = 
resource.getVirtualCores(); + + return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes, + cpuVcores); + } + + /** + * @return true if there are available logical resources for the given + * container to start. + */ + private boolean hasLogicalResourcesAvailable(ProcessTreeInfo pti) { + // TODO: Would it be better to copy logicalContainersUtilization locally, + // instead of holding the lock for the whole check? + synchronized (this.logicalContainersUtilization) { + // Check physical memory. + if (this.logicalContainersUtilization.getPhysicalMemory() + (int) (pti + .getPmemLimit() >> 20) > (int) (getPmemAllocatedForContainers() >> 20)) { + return false; + } + // Check virtual memory. + if (this.logicalContainersUtilization.getVirtualMemory() + (int) (pti + .getVmemLimit() >> 20) > (int) (getVmemAllocatedForContainers() >> 20)) { + return false; + } + // Check CPU. + if (this.logicalContainersUtilization.getCPU() + + logicalCpuUsage(pti) > 1.0f) { + return false; + } + } + return true; + } + + private void increaseResourceUtil(ResourceUtilization resourceUtil, + ProcessTreeInfo pti) { + resourceUtil.addTo((int) (pti.getPmemLimit() >> 20), + (int) (pti.getVmemLimit() >> 20), logicalCpuUsage(pti)); + } + + private void decreaseResourceUtil(ResourceUtilization resourceUtil, + ProcessTreeInfo pti) { + resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20), + (int) (pti.getVmemLimit() >> 20), logicalCpuUsage(pti)); + } + + private float logicalCpuUsage(ProcessTreeInfo pti) { + float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f; + float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore + / resourceCalculatorPlugin.getNumProcessors(); + return (cpuUsageTotalCoresPercentage * 1000 * maxVCoresAllottedForContainers + / nodeCpuPercentageForYARN) / 1000.0f; + } + + private class LogicalContainerInfo { + private ContainerQueuingEvent contQueuingEvent; + private ProcessTreeInfo pti; + + LogicalContainerInfo(ContainerQueuingEvent 
contQueuingEvent, + ProcessTreeInfo pti) { + this.contQueuingEvent = contQueuingEvent; + this.pti = pti; + } + + ContainerQueuingEvent getContQueuingEvent() { + return this.contQueuingEvent; + } + + ProcessTreeInfo getPti() { + return this.pti; + } + } + + /** + * Start the execution of the given container. Add it to the logical + * containers, update logical resource utilization and send event to + * ContainerManager to start the actual execution of the container. + */ + @SuppressWarnings("unchecked") + private void startLogicalContainer(LogicalContainerInfo logicalContInfo) { + ContainerQueuingEvent contQueuingEvent = logicalContInfo.getContQueuingEvent(); + ProcessTreeInfo pti = logicalContInfo.getPti(); + + if (contQueuingEvent.getExecutionType() == ExecutionType.GUARANTEED) { + logicalGuarContainers.put(pti.getContainerId(), pti); + } else { + logicalOpportContainers.put(pti.getContainerId(), pti); + } + + increaseResourceUtil(this.logicalContainersUtilization, pti); + + // Send event to ContainerManager to start execution of container. + eventDispatcher.getEventHandler() + .handle(new ContainerExecutionEvent(contQueuingEvent.getStartRequest(), + contQueuingEvent.getContainerTokenIdentifier(), + contQueuingEvent.getNMTokenIdentifier(), + ContainerExecutionEventType.CONTAINER_EXECUTION_START)); + } + + /** + * If there are available resources, try to start as many pending containers + * as possible. + */ + private void startPendingContainers() { + boolean resourcesAvailable = true; + // Start pending guaranteed containers, if resources available. 
+ synchronized (queuedGuarRequests) { + Iterator guarIter = queuedGuarRequests.iterator(); + while (guarIter.hasNext() && resourcesAvailable) { + LogicalContainerInfo logicalContInfo = guarIter.next(); + + if (hasLogicalResourcesAvailable(logicalContInfo.getPti())) { + startLogicalContainer(logicalContInfo); + guarIter.remove(); + } else { + resourcesAvailable = false; + } + } + } + if (!resourcesAvailable) { + return; + } + // Start opportunistic container, if resources available. + synchronized (queuedOpportRequests) { + Iterator opportIter = queuedOpportRequests.iterator(); + while (opportIter.hasNext() && resourcesAvailable) { + LogicalContainerInfo logicalContInfo = opportIter.next(); + + if (hasLogicalResourcesAvailable(logicalContInfo.getPti())) { + startLogicalContainer(logicalContInfo); + opportIter.remove(); + } else { + resourcesAvailable = false; + } + } + } + } + + /** + * Remove the given container from the logical containers, and update logical + * container utilization accordingly. + */ + private void removeLogicalContainer(ContainerId containerId, + ExecutionType executionType) { + ProcessTreeInfo ptiToRemove = null; + if (executionType == ExecutionType.GUARANTEED) { + ptiToRemove = logicalGuarContainers.remove(containerId); + } else { + ptiToRemove = logicalOpportContainers.remove(containerId); + } + // If container was indeed running, update logical resource utilization. + if (ptiToRemove != null) { + decreaseResourceUtil(this.logicalContainersUtilization, ptiToRemove); + } + } + + /** + * Remove the given container from the container queues. + * + * @return true if the container was found in one of the queues. 
+ */ + private boolean removeContainerFromQueues(ContainerId containerId, + ExecutionType executionType) { + boolean foundInQueue = false; + if (executionType == ExecutionType.GUARANTEED) { + synchronized (queuedGuarRequests) { + Iterator guarIter = queuedGuarRequests.iterator(); + while (guarIter.hasNext() && !foundInQueue) { + if (guarIter.next().getPti().getContainerId().equals(containerId)) { + guarIter.remove(); + foundInQueue = true; + } + } + } + } else { + synchronized (queuedOpportRequests) { + Iterator opportIter = queuedOpportRequests + .iterator(); + while (opportIter.hasNext() && !foundInQueue) { + if (opportIter.next().getPti().getContainerId().equals(containerId)) { + opportIter.remove(); + foundInQueue = true; + } + } + } + } + + return foundInQueue; + } + + /** + * Kill opportunistic containers to free up resources for running the given + * container. + * + * @param logicalContInfo + * the container whose execution needs to start by freeing up + * resources occupied by opportunistic containers. + */ + @SuppressWarnings("unchecked") + private void killOpportContainers(LogicalContainerInfo logicalContInfo) { + ContainerId containerToStartId = logicalContInfo.getContQueuingEvent() + .getContainerId(); + // Track resources that need to be freed. + ResourceUtilization resourcesToFreeUp = resourcesToFreeUp( + containerToStartId); + + // Go over the running opportunistic containers. Avoid containers that have + // already been marked for killing. + for (Entry runningOpportCont : logicalOpportContainers + .entrySet()) { + ContainerId runningOpportContId = runningOpportCont.getKey(); + + // If there are sufficient resources to execute the given container, do + // not kill more opportunistic containers. 
+ if (resourcesToFreeUp.getPhysicalMemory() <= 0 + && resourcesToFreeUp.getVirtualMemory() <= 0 + && resourcesToFreeUp.getCPU() <= 0.f) { + break; + } + + if (!opportContainersToKill.contains(runningOpportContId)) { + opportContainersToKill.add(runningOpportContId); + decreaseResourceUtil(resourcesToFreeUp, runningOpportCont.getValue()); + // Send event to ContainerManager for actually killing the container. + eventDispatcher.getEventHandler() + .handle(new ContainerExecutionEvent(runningOpportContId, + ContainerExecutionEventType.CONTAINER_EXECUTION_STOP)); + } + } + } + + /** + * Calculates the amount of resources that need to be freed up (by killing + * opportunistic containers) in order for the given guaranteed container to + * start its execution. + * Resource utilization to be freed up = logicalContainersUtilization - + * utilization of opportContainersToKill + utilization of pending guaranteed + * containers that will start before the given container + utilization of + * given container - total resources of node. + * + * @param containerToStartId + * the ContainerId of the guaranteed container for which we need to + * free resources, so that its execution can start. + * @return the resources that need to be freed up for the given guaranteed + * container to start. + */ + private ResourceUtilization resourcesToFreeUp( + ContainerId containerToStartId) { + // Get current utilization of logical containers. + ResourceUtilization resourceUtilToFreeUp = ResourceUtilization + .newInstance(logicalContainersUtilization); + + // Subtract from the utilization the utilization of the opportunistic + // containers that are marked for killing. 
+ synchronized (opportContainersToKill) { + for (ContainerId opportContId : opportContainersToKill) { + if (this.logicalOpportContainers.containsKey(opportContId)) { + decreaseResourceUtil(resourceUtilToFreeUp, + this.logicalOpportContainers.get(opportContId)); + } + } + } + // Add to the utilization the utilization of the pending guaranteed + // containers that + // will start before the current container will be started. + synchronized (queuedGuarRequests) { + for (LogicalContainerInfo guarContInfo : queuedGuarRequests) { + increaseResourceUtil(resourceUtilToFreeUp, guarContInfo.getPti()); + if (guarContInfo.getContQueuingEvent().getContainerId() + .equals(containerToStartId)) { + break; + } + } + } + // Subtract the overall node resources. + resourceUtilToFreeUp.subtractFrom( + (int) (getPmemAllocatedForContainers() >> 20), + (int) (getVmemAllocatedForContainers() >> 20), 1.0f); + + return resourceUtilToFreeUp; + } /** * Check whether a container's process tree's current memory usage is over @@ -408,6 +724,7 @@ public void run() { long pmemByAllContainers = 0; long cpuUsagePercentPerCoreByAllContainers = 0; long cpuUsageTotalCoresByAllContainers = 0; + for (Entry entry : trackingContainers .entrySet()) { ContainerId containerId = entry.getKey(); @@ -739,11 +1056,17 @@ public void handle(ContainersMonitorEvent monitoringEvent) { LOG.info("Stopping resource-monitoring for " + containerId); updateContainerMetrics(monitoringEvent); trackingContainers.remove(containerId); + // Remove finished container from the logical containers. + // Attempt to start new containers, if resources available. 
+ removeLogicalContainer(containerId, monitoringEvent.getExecutionType()); + opportContainersToKill.remove(containerId); + startPendingContainers(); break; case CHANGE_MONITORING_CONTAINER_RESOURCE: ChangeMonitoringContainerResourceEvent changeEvent = (ChangeMonitoringContainerResourceEvent) monitoringEvent; ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId); + // TODO KONSTANTINOS: Update available resources. if (processTreeInfo == null) { LOG.warn("Failed to track container " + containerId.toString() @@ -762,4 +1085,71 @@ public void handle(ContainersMonitorEvent monitoringEvent) { // TODO: Wrong event. } } + + public ContainerQueuingEventDispatcher createContainerQueuingEventDispatcher() { + return new ContainerQueuingEventDispatcher(); + } + + public class ContainerQueuingEventDispatcher implements + EventHandler { + @SuppressWarnings("unchecked") + @Override + public void handle(ContainerQueuingEvent containerQueuingEvent) { + if (!isEnabled()) { + return; + } + + switch (containerQueuingEvent.getType()) { + case CONTAINER_REQUEST_ARRIVED: + ProcessTreeInfo pti = createProcessTreeInfo( + containerQueuingEvent.getContainerId(), + containerQueuingEvent.getContainerTokenIdentifier().getResource()); + + LogicalContainerInfo logicalContInfo = new LogicalContainerInfo( + containerQueuingEvent, pti); + + // If there are already free resources to start the container. + if (hasLogicalResourcesAvailable(pti)) { + startLogicalContainer(logicalContInfo); + } else { + if (containerQueuingEvent + .getExecutionType() == ExecutionType.GUARANTEED) { + synchronized (queuedGuarRequests) { + queuedGuarRequests.add(logicalContInfo); + } + // Kill running opportunistic containers to make space for + // guaranteed container. 
+ killOpportContainers(logicalContInfo); + } else { + synchronized (queuedOpportRequests) { + queuedOpportRequests.add(logicalContInfo); + } + } + } + break; + case QUEUED_CONTAINER_REMOVED: + boolean foundInQueue = removeContainerFromQueues( + containerQueuingEvent.getContainerId(), + containerQueuingEvent.getExecutionType()); + + // The container started in the meanwhile, so the ContainerManager needs + // to be notified and stop it. + if (!foundInQueue) { + eventDispatcher.getEventHandler() + .handle(new ContainerExecutionEvent( + containerQueuingEvent.getContainerTokenIdentifier() + .getContainerID(), + ContainerExecutionEventType.CONTAINER_EXECUTION_STOP)); + } + break; + case CONTAINER_FAILED_TO_START: + removeLogicalContainer(containerQueuingEvent.getContainerId(), + containerQueuingEvent.getExecutionType()); + break; + default: + // TODO: Wrong event + } + } + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index e1ffd88..f0be1c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import 
org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -619,6 +620,16 @@ public int getHttpPort() { public ConcurrentMap getContainers() { return null; } + + @Override + public ConcurrentMap getQueuedContainers() { + return null; + } + + @Override + public ConcurrentMap getKilledQueuedContainers() { + return null; + } @Override public ConcurrentMap getIncreasedContainers() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index d7f89fc..c4247ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -153,7 +154,7 @@ public void testContainersResourceChange() throws Exception { containersMonitor.start(); // create container 1 containersMonitor.handle(new ContainerStartMonitoringEvent( - getContainerId(1), 2100L, 1000L, 1, 0, 0)); + getContainerId(1), 
ExecutionType.GUARANTEED, 2100L, 1000L, 1, 0, 0)); // verify that this container is properly tracked assertNotNull(getProcessTreeInfo(getContainerId(1))); assertEquals(1000L, getProcessTreeInfo(getContainerId(1)) @@ -173,8 +174,9 @@ public void testContainersResourceChange() throws Exception { assertTrue(containerEventHandler .isContainerKilled(getContainerId(1))); // create container 2 - containersMonitor.handle(new ContainerStartMonitoringEvent( - getContainerId(2), 2202009L, 1048576L, 1, 0, 0)); + containersMonitor + .handle(new ContainerStartMonitoringEvent(getContainerId(2), + ExecutionType.GUARANTEED, 2202009L, 1048576L, 1, 0, 0)); // verify that this container is properly tracked assertNotNull(getProcessTreeInfo(getContainerId(2))); assertEquals(1048576L, getProcessTreeInfo(getContainerId(2)) @@ -215,8 +217,9 @@ public void testContainersResourceChangeIsTriggeredImmediately() // now waiting for the next monitor cycle Thread.sleep(1000); // create a container with id 3 - containersMonitor.handle(new ContainerStartMonitoringEvent( - getContainerId(3), 2202009L, 1048576L, 1, 0, 0)); + containersMonitor + .handle(new ContainerStartMonitoringEvent(getContainerId(3), + ExecutionType.GUARANTEED, 2202009L, 1048576L, 1, 0, 0)); // Verify that this container has been tracked assertNotNull(getProcessTreeInfo(getContainerId(3))); // trigger a change resource event, check limit after change