diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 323d31d..ca9034b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -34,5 +34,8 @@
RUNNING,
/** Completed container */
- COMPLETE
+ COMPLETE,
+
+ /** Queued at the NM */
+ QUEUED
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
index 5f52f85..69b84bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
@@ -43,6 +43,14 @@ public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) {
utilization.setCPU(cpu);
return utilization;
}
+
+ @Public
+ @Unstable
+ public static ResourceUtilization newInstance(
+ ResourceUtilization resourceUtil) {
+ return newInstance(resourceUtil.getPhysicalMemory(),
+ resourceUtil.getVirtualMemory(), resourceUtil.getCPU());
+ }
/**
* Get used virtual memory.
@@ -147,4 +155,18 @@ public void addTo(int pmem, int vmem, float cpu) {
this.setVirtualMemory(this.getVirtualMemory() + vmem);
this.setCPU(this.getCPU() + cpu);
}
+
+ /**
+ * Subtract utilization from the current one.
+ * @param pmem Physical memory to be subtracted.
+ * @param vmem Virtual memory to be subtracted.
+ * @param cpu CPU utilization to be subtracted.
+ */
+ @Public
+ @Unstable
+ public void subtractFrom(int pmem, int vmem, float cpu) {
+ this.setPhysicalMemory(this.getPhysicalMemory() - pmem);
+ this.setVirtualMemory(this.getVirtualMemory() - vmem);
+ this.setCPU(this.getCPU() - cpu);
+ }
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 4fdd43c..ed1f87e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -215,6 +216,13 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
public static ContainerStatus newContainerStatus(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus,
Resource capability) {
+ return newContainerStatus(containerId, containerState, diagnostics,
+ exitStatus, capability, ExecutionType.GUARANTEED);
+ }
+
+ public static ContainerStatus newContainerStatus(ContainerId containerId,
+ ContainerState containerState, String diagnostics, int exitStatus,
+ Resource capability, ExecutionType executionType) {
ContainerStatus containerStatus = recordFactory
.newRecordInstance(ContainerStatus.class);
containerStatus.setState(containerState);
@@ -222,6 +230,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId,
containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus(exitStatus);
containerStatus.setCapability(capability);
+ containerStatus.setExecutionType(executionType);
return containerStatus;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index e0a4da4..cb4c3e8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -65,6 +66,10 @@
ConcurrentMap
getIncreasedContainers();
+
+ ConcurrentMap getQueuedContainers();
+
+ ConcurrentMap getKilledQueuedContainers();
NMContainerTokenSecretManager getContainerTokenSecretManager();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index ef7b760..574e9e8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -19,8 +19,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
@@ -57,6 +60,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@@ -453,6 +457,15 @@ public void run() {
protected final ConcurrentMap increasedContainers =
new ConcurrentHashMap<>();
+
+ // TODO KONSTANTINOS: Should I initialize queuedContainers and
+ // killedQueuedContainers only if distributed scheduling is enabled
+ // or is it fine like this?
+ protected final ConcurrentMap queuedContainers =
+ new ConcurrentSkipListMap<>();
+
+ protected final ConcurrentMap killedQueuedContainers =
+ new ConcurrentSkipListMap();
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
@@ -516,6 +529,16 @@ public int getHttpPort() {
getIncreasedContainers() {
return this.increasedContainers;
}
+
+ @Override
+ public ConcurrentMap getQueuedContainers() {
+ return this.queuedContainers;
+ }
+
+ @Override
+ public ConcurrentMap getKilledQueuedContainers() {
+ return this.killedQueuedContainers;
+ }
@Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java
new file mode 100644
index 0000000..491c164
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEvent.java
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+
+public class ContainerExecutionEvent
+ extends AbstractEvent {
+
+ private final StartContainerRequest startRequest;
+ private final ContainerTokenIdentifier containerTokenIdentifier;
+ private final ContainerId containerId;
+ private final NMTokenIdentifier nmTokenIdentifier;
+
+ public ContainerExecutionEvent(StartContainerRequest startRequest,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ NMTokenIdentifier nmTokenIdentifier,
+ ContainerExecutionEventType eventType) {
+ super(eventType);
+ this.startRequest = startRequest;
+ this.containerTokenIdentifier = containerTokenIdentifier;
+ this.containerId = containerTokenIdentifier.getContainerID();
+ this.nmTokenIdentifier = nmTokenIdentifier;
+ }
+
+ public ContainerExecutionEvent(ContainerId containerId,
+ ContainerExecutionEventType eventType) {
+ super(eventType);
+ this.startRequest = null;
+ this.containerTokenIdentifier = null;
+ this.containerId = containerId;
+ this.nmTokenIdentifier = null;
+ }
+
+ public StartContainerRequest getStartRequest() {
+ return this.startRequest;
+ }
+
+ public ContainerTokenIdentifier getContainerTokenIdentifier() {
+ return this.containerTokenIdentifier;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+ NMTokenIdentifier getNMTokenIdentifier() {
+ return this.nmTokenIdentifier;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java
new file mode 100644
index 0000000..46b90ad
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerExecutionEventType.java
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+public enum ContainerExecutionEventType {
+ CONTAINER_EXECUTION_START,
+ CONTAINER_EXECUTION_STOP
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 7d51477..c007b76 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -73,6 +73,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -133,6 +134,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerQueuingEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerQueuingEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -257,6 +260,14 @@ public void serviceInit(Configuration conf) throws Exception {
this.amrmProxyService =
new AMRMProxyService(this.context, this.dispatcher);
addService(this.amrmProxyService);
+
+ // Should we add to the dispatcher when amrmProxy is enabled or
+ // explicitly when distributed scheduling is enabled?
+ dispatcher.register(ContainerExecutionEventType.class,
+ new ContainerExecutionEventDispatcher());
+ dispatcher.register(ContainerQueuingEventType.class,
+ ((ContainersMonitorImpl) this.containersMonitor)
+ .createContainerQueuingEventDispatcher());
} else {
LOG.info("AMRMProxyService is disabled");
}
@@ -774,6 +785,7 @@ protected void authorizeStartAndResourceIncreaseRequest(
/**
* Start a list of containers on this NodeManager.
*/
+ @SuppressWarnings("unchecked")
@Override
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
@@ -815,8 +827,21 @@ public StartContainersResponse startContainers(
this.amrmProxyService.processApplicationStartRequest(request);
}
- startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
- request);
+ if (this.context.isDistributedSchedulingEnabled()) {
+ this.context.getQueuedContainers().put(containerId,
+ containerTokenIdentifier);
+
+ this.dispatcher.getEventHandler()
+ .handle(new ContainerQueuingEvent(nmTokenIdentifier,
+ containerTokenIdentifier,
+ containerTokenIdentifier.getExecutionType(), request,
+ ContainerQueuingEventType.CONTAINER_REQUEST_ARRIVED));
+ } else {
+ startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+ request);
+ }
+ // TODO KONSTANTINOS: Should containers be in the succeededContainers already
+ // in case distributed scheduling is enabled?
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -894,6 +919,7 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, true);
// update NMToken
+ // TODO KONSTANTINOS: Should this updateNMToken happen even for opportunistic containers?
updateNMTokenIdentifier(nmTokenIdentifier);
ContainerId containerId = containerTokenIdentifier.getContainerID();
@@ -901,6 +927,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
String user = containerTokenIdentifier.getApplicationSubmitter();
LOG.info("Start request for " + containerIdStr + " by user " + user);
+
+ this.context.getQueuedContainers().remove(containerId);
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
@@ -975,6 +1003,22 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
this.readLock.unlock();
}
}
+
+ @SuppressWarnings("unchecked")
+ private void containerFailedToStart(ContainerId containerId,
+ ContainerTokenIdentifier containerTokenId) {
+ this.context.getQueuedContainers().remove(containerId);
+
+ this.dispatcher.getEventHandler()
+ .handle(new ContainerQueuingEvent(null, containerTokenId,
+ containerTokenId.getExecutionType(), null,
+ ContainerQueuingEventType.CONTAINER_FAILED_TO_START));
+
+ // TODO KONSTANTINOS: Fix NodeStatusUpdater to take into account killed
+ // queued containers.
+ this.context.getKilledQueuedContainers().put(containerTokenId,
+ "Container removed from queue as it failed to start.");
+ }
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
org.apache.hadoop.yarn.api.records.Token token,
@@ -1058,6 +1102,8 @@ private void changeContainerResourceInternal(
Container container = context.getContainers().get(containerId);
// Check container existence
if (container == null) {
+ // TODO KONSTANTINOS: If you can changeContainerResourceInternal for
+ // a container that is still queued, we need to account for this case.
if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " was recently stopped on node manager.");
@@ -1178,8 +1224,24 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
authorizeGetAndStopContainerRequest(containerID, container, true,
nmTokenIdentifier);
+ // If container is null and distributed scheduling is enabled, container
+ // might be queued. Otherwise, container might not be handled by this NM.
if (container == null) {
- if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ if (this.context.isDistributedSchedulingEnabled()
+ && this.context.getQueuedContainers().containsKey(containerID)) {
+ ContainerTokenIdentifier containerTokenId = this.context
+ .getQueuedContainers().remove(containerID);
+
+ this.dispatcher.getEventHandler()
+ .handle(new ContainerQueuingEvent(null, containerTokenId,
+ containerTokenId.getExecutionType(), null,
+ ContainerQueuingEventType.QUEUED_CONTAINER_REMOVED));
+
+ this.context.getKilledQueuedContainers().put(containerTokenId,
+ "Queued container request removed by ApplicationMaster.");
+
+ nodeStatusUpdater.sendOutofBandHeartBeat();
+ } else if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " is not handled by this NodeManager");
}
@@ -1199,6 +1261,18 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
nodeStatusUpdater.sendOutofBandHeartBeat();
}
}
+
+ private void stopContainerInternalIfNotQueued(ContainerId containerID)
+ throws YarnException, IOException {
+ if (this.context.getContainers().containsKey(containerID)) {
+ UserGroupInformation remoteUgi = getRemoteUgi();
+ NMTokenIdentifier identifier = selectNMTokenIdentifier(remoteUgi);
+ if (identifier == null) {
+ throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
+ }
+ stopContainerInternal(identifier, containerID);
+ }
+ }
/**
* Get a list of container statuses running on this NodeManager
@@ -1237,7 +1311,16 @@ private ContainerStatus getContainerStatusInternal(ContainerId containerID,
nmTokenIdentifier);
if (container == null) {
- if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
+ if (this.context.getQueuedContainers().containsKey(containerID)) {
+ // TODO KONSTANTINOS: Is the Resource.newInstance(0, 0) proper here or
+ // we need the actual value?
+ ExecutionType executionType = this.context.getQueuedContainers()
+ .get(containerID).getExecutionType();
+ return BuilderUtils.newContainerStatus(containerID,
+ org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
+ ContainerExitStatus.INVALID, Resource.newInstance(0, 0),
+ executionType);
+ } else if (nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
throw RPCUtil.getRemoteException("Container " + containerIDStr
+ " was recently stopped on node manager.");
} else {
@@ -1315,6 +1398,42 @@ public void handle(ApplicationEvent event) {
}
}
}
+
+ class ContainerExecutionEventDispatcher
+ implements EventHandler {
+ @Override
+ public void handle(ContainerExecutionEvent containerExecutionEvent) {
+ StartContainerRequest containerReq = containerExecutionEvent
+ .getStartRequest();
+ ContainerId containerId = containerExecutionEvent
+ .getContainerTokenIdentifier().getContainerID();
+
+ switch (containerExecutionEvent.getType()) {
+ case CONTAINER_EXECUTION_START:
+ try {
+ startContainerInternal(containerExecutionEvent.getNMTokenIdentifier(),
+ containerExecutionEvent.getContainerTokenIdentifier(),
+ containerReq);
+ } catch (YarnException | IOException e) {
+ containerFailedToStart(containerId,
+ containerExecutionEvent.getContainerTokenIdentifier());
+ LOG.error("Container failed to start.", e);
+ }
+ break;
+ case CONTAINER_EXECUTION_STOP:
+ try {
+ stopContainerInternalIfNotQueued(containerId);
+ } catch (YarnException | IOException e) {
+ LOG.error("Container did not get removed successfully.", e);
+ }
+ break;
+ default:
+ throw new YarnRuntimeException(
+ "Got an unknown ContainerExecutionEvent type: "
+ + containerExecutionEvent.getType());
+ }
+ }
+ }
@SuppressWarnings("unchecked")
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index fb1728a..51dd3b5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -432,7 +432,8 @@ public ContainerStatus cloneAndGetContainerStatus() {
this.readLock.lock();
try {
return BuilderUtils.newContainerStatus(this.containerId,
- getCurrentState(), diagnostics.toString(), exitCode, getResource());
+ getCurrentState(), diagnostics.toString(), exitCode, getResource(),
+ this.containerTokenIdentifier.getExecutionType());
} finally {
this.readLock.unlock();
}
@@ -486,7 +487,8 @@ private void sendFinishedEvents() {
EventHandler eventHandler = dispatcher.getEventHandler();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
// Remove the container from the resource-monitor
- eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
+ eventHandler.handle(new ContainerStopMonitoringEvent(containerId,
+ containerTokenIdentifier.getExecutionType()));
// Tell the logService too
eventHandler.handle(new LogHandlerContainerFinishedEvent(
containerId, exitCode));
@@ -520,10 +522,10 @@ private void sendContainerMonitorStartEvent() {
int cpuVcores = getResource().getVirtualCores();
long localizationDuration = containerLaunchStartTime -
containerLocalizationStartTime;
- dispatcher.getEventHandler().handle(
- new ContainerStartMonitoringEvent(containerId,
- vmemBytes, pmemBytes, cpuVcores, launchDuration,
- localizationDuration));
+ dispatcher.getEventHandler()
+ .handle(new ContainerStartMonitoringEvent(containerId,
+ containerTokenIdentifier.getExecutionType(), vmemBytes, pmemBytes,
+ cpuVcores, launchDuration, localizationDuration));
}
private void addDiagnostics(String... diags) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java
new file mode 100644
index 0000000..ed2138b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEvent.java
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+
+public class ContainerQueuingEvent
+ extends AbstractEvent {
+
+ private final NMTokenIdentifier nmTokenIdentifier;
+ private final ContainerTokenIdentifier containerTokenIdentifier;
+ private final ContainerId containerId;
+ private final ExecutionType executionType;
+ private final StartContainerRequest startRequest;
+
+ public ContainerQueuingEvent(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ ExecutionType executionType, StartContainerRequest startRequest,
+ ContainerQueuingEventType eventType) {
+ super(eventType);
+ this.nmTokenIdentifier = nmTokenIdentifier;
+ this.containerTokenIdentifier = containerTokenIdentifier;
+ this.containerId = containerTokenIdentifier.getContainerID();
+ this.executionType = executionType;
+ this.startRequest = startRequest;
+ }
+
+ NMTokenIdentifier getNMTokenIdentifier() {
+ return nmTokenIdentifier;
+ }
+
+ public ContainerTokenIdentifier getContainerTokenIdentifier() {
+ return containerTokenIdentifier;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public ExecutionType getExecutionType() {
+ return executionType;
+ }
+
+ public StartContainerRequest getStartRequest() {
+ return startRequest;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java
new file mode 100644
index 0000000..ca26cc6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerQueuingEventType.java
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+public enum ContainerQueuingEventType {
+ CONTAINER_REQUEST_ARRIVED,
+ CONTAINER_FAILED_TO_START,
+ QUEUED_CONTAINER_REMOVED
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java
index c09bebf..27de052 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
@@ -28,10 +29,10 @@
private final long launchDuration;
private final long localizationDuration;
- public ContainerStartMonitoringEvent(ContainerId containerId,
+ public ContainerStartMonitoringEvent(ContainerId containerId, ExecutionType executionType,
long vmemLimit, long pmemLimit, int cpuVcores, long launchDuration,
long localizationDuration) {
- super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
+ super(containerId, executionType, ContainersMonitorEventType.START_MONITORING_CONTAINER);
this.vmemLimit = vmemLimit;
this.pmemLimit = pmemLimit;
this.cpuVcores = cpuVcores;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
index 240c5c0..d883281 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
@@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
- public ContainerStopMonitoringEvent(ContainerId containerId) {
- super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+ public ContainerStopMonitoringEvent(ContainerId containerId,
+ ExecutionType executionType) {
+ super(containerId, executionType,
+ ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
index 56e578b..f3aa721 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorEvent.java
@@ -19,21 +19,33 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class ContainersMonitorEvent extends
AbstractEvent {
private final ContainerId containerId;
+ private final ExecutionType executionType;
public ContainersMonitorEvent(ContainerId containerId,
ContainersMonitorEventType eventType) {
+ this(containerId, ExecutionType.GUARANTEED, eventType);
+ }
+
+ public ContainersMonitorEvent(ContainerId containerId,
+ ExecutionType executionType, ContainersMonitorEventType eventType) {
super(eventType);
this.containerId = containerId;
+ this.executionType = executionType;
}
public ContainerId getContainerId() {
return this.containerId;
}
+
+ public ExecutionType getExecutionType() {
+ return this.executionType;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index e6c3642..c0de0ac 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,20 +35,25 @@
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerExecutionEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerExecutionEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
public class ContainersMonitorImpl extends AbstractService implements
@@ -61,6 +71,14 @@
@VisibleForTesting
final Map trackingContainers =
new ConcurrentHashMap<>();
+
+ private Map logicalGuarContainers;
+ private Map logicalOpportContainers;
+ private ResourceUtilization logicalContainersUtilization;
+
+ private Queue queuedGuarRequests;
+ private Queue queuedOpportRequests;
+ private Set opportContainersToKill;
private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
@@ -99,6 +117,17 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
this.queuedContainersStatus = QueuedContainersStatus.newInstance();
+
+ if (this.context.isDistributedSchedulingEnabled()) {
+ this.logicalGuarContainers = new ConcurrentHashMap<>();
+ this.logicalOpportContainers = new ConcurrentHashMap<>();
+ this.logicalContainersUtilization = ResourceUtilization.newInstance(0, 0,
+ 0.0f);
+ this.queuedGuarRequests = new LinkedList<>();
+ this.queuedOpportRequests = new LinkedList<>();
+ this.opportContainersToKill = Collections
+ .synchronizedSet(new HashSet());
+ }
}
@Override
@@ -313,6 +342,293 @@ public synchronized void setResourceLimit(
this.cpuVcores = cpuVcores;
}
}
+
+ private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
+ Resource resource) {
+ long pmemBytes = resource.getMemory() * 1024 * 1024L;
+ float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+ int cpuVcores = resource.getVirtualCores();
+
+ return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
+ cpuVcores);
+ }
+
+ /**
+ * @return true if there are available logical resources for the given
+ * container to start.
+ */
+ private boolean hasLogicalResourcesAvailable(ProcessTreeInfo pti) {
+ // TODO: Would it be better to copy the logicalContainersUtiliation locally,
+ // instead of holding the lock for the whole check?
+ synchronized (this.logicalContainersUtilization) {
+ // Check physical memory.
+ if (this.logicalContainersUtilization.getPhysicalMemory() + (int) (pti
+ .getPmemLimit() >> 20) > (int) (getPmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+ // Check virtual memory.
+ if (this.logicalContainersUtilization.getVirtualMemory() + (int) (pti
+ .getVmemLimit() >> 20) > (int) (getVmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+ // Check CPU.
+ if (this.logicalContainersUtilization.getCPU()
+ + logicalCpuUsage(pti) > 1.0f) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void increaseResourceUtil(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti) {
+ resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
+ (int) (pti.getVmemLimit() >> 20), logicalCpuUsage(pti));
+ }
+
+ private void decreaseResourceUtil(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti) {
+ resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
+ (int) (pti.getVmemLimit() >> 20), logicalCpuUsage(pti));
+ }
+
+ private float logicalCpuUsage(ProcessTreeInfo pti) {
+ float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
+ float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
+ / resourceCalculatorPlugin.getNumProcessors();
+ return (cpuUsageTotalCoresPercentage * 1000 * maxVCoresAllottedForContainers
+ / nodeCpuPercentageForYARN) / 1000.0f;
+ }
+
+ private class LogicalContainerInfo {
+ private ContainerQueuingEvent contQueuingEvent;
+ private ProcessTreeInfo pti;
+
+ LogicalContainerInfo(ContainerQueuingEvent contQueuingEvent,
+ ProcessTreeInfo pti) {
+ this.contQueuingEvent = contQueuingEvent;
+ this.pti = pti;
+ }
+
+ ContainerQueuingEvent getContQueuingEvent() {
+ return this.contQueuingEvent;
+ }
+
+ ProcessTreeInfo getPti() {
+ return this.pti;
+ }
+ }
+
+ /**
+ * Start the execution of the given container. Add it to the logical
+ * containers, update logical resource utilization and send event to
+ * ContainerManager to start the actual execution of the container.
+ */
+ @SuppressWarnings("unchecked")
+ private void startLogicalContainer(LogicalContainerInfo logicalContInfo) {
+ ContainerQueuingEvent contQueuingEvent = logicalContInfo.getContQueuingEvent();
+ ProcessTreeInfo pti = logicalContInfo.getPti();
+
+ if (contQueuingEvent.getExecutionType() == ExecutionType.GUARANTEED) {
+ logicalGuarContainers.put(pti.getContainerId(), pti);
+ } else {
+ logicalOpportContainers.put(pti.getContainerId(), pti);
+ }
+
+ increaseResourceUtil(this.logicalContainersUtilization, pti);
+
+ // Send event to ContainerManager to start execution of container.
+ eventDispatcher.getEventHandler()
+ .handle(new ContainerExecutionEvent(contQueuingEvent.getStartRequest(),
+ contQueuingEvent.getContainerTokenIdentifier(),
+ contQueuingEvent.getNMTokenIdentifier(),
+ ContainerExecutionEventType.CONTAINER_EXECUTION_START));
+ }
+
+ /**
+ * If there are available resources, try to start as many pending containers
+ * as possible.
+ */
+ private void startPendingContainers() {
+ boolean resourcesAvailable = true;
+ // Start pending guaranteed containers, if resources available.
+ synchronized (queuedGuarRequests) {
+ Iterator guarIter = queuedGuarRequests.iterator();
+ while (guarIter.hasNext() && resourcesAvailable) {
+ LogicalContainerInfo logicalContInfo = guarIter.next();
+
+ if (hasLogicalResourcesAvailable(logicalContInfo.getPti())) {
+ startLogicalContainer(logicalContInfo);
+ guarIter.remove();
+ } else {
+ resourcesAvailable = false;
+ }
+ }
+ }
+ if (!resourcesAvailable) {
+ return;
+ }
+ // Start opportunistic container, if resources available.
+ synchronized (queuedOpportRequests) {
+ Iterator opportIter = queuedOpportRequests.iterator();
+ while (opportIter.hasNext() && resourcesAvailable) {
+ LogicalContainerInfo logicalContInfo = opportIter.next();
+
+ if (hasLogicalResourcesAvailable(logicalContInfo.getPti())) {
+ startLogicalContainer(logicalContInfo);
+ opportIter.remove();
+ } else {
+ resourcesAvailable = false;
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove the given container from the logical containers, and update logical
+ * container utilization accordingly.
+ */
+ private void removeLogicalContainer(ContainerId containerId,
+ ExecutionType executionType) {
+ ProcessTreeInfo ptiToRemove = null;
+ if (executionType == ExecutionType.GUARANTEED) {
+ ptiToRemove = logicalGuarContainers.remove(containerId);
+ } else {
+ ptiToRemove = logicalOpportContainers.remove(containerId);
+ }
+ // If container was indeed running, update logical resource utilization.
+ if (ptiToRemove != null) {
+ decreaseResourceUtil(this.logicalContainersUtilization, ptiToRemove);
+ }
+ }
+
+ /**
+ * Remove the given container from the container queues.
+ *
+ * @return true if the container was found in one of the queues.
+ */
+ private boolean removeContainerFromQueues(ContainerId containerId,
+ ExecutionType executionType) {
+ boolean foundInQueue = false;
+ if (executionType == ExecutionType.GUARANTEED) {
+ synchronized (queuedGuarRequests) {
+ Iterator guarIter = queuedGuarRequests.iterator();
+ while (guarIter.hasNext() && !foundInQueue) {
+ if (guarIter.next().getPti().getContainerId().equals(containerId)) {
+ guarIter.remove();
+ foundInQueue = true;
+ }
+ }
+ }
+ } else {
+ synchronized (queuedOpportRequests) {
+ Iterator opportIter = queuedOpportRequests
+ .iterator();
+ while (opportIter.hasNext() && !foundInQueue) {
+ if (opportIter.next().getPti().getContainerId().equals(containerId)) {
+ opportIter.remove();
+ foundInQueue = true;
+ }
+ }
+ }
+ }
+
+ return foundInQueue;
+ }
+
+ /**
+ * Kill opportunistic containers to free up resources for running the given
+ * container.
+ *
+ * @param logicalContInfo
+ * the container whose execution needs to start by freeing up
+ * resources occupied by opportunistic containers.
+ */
+ @SuppressWarnings("unchecked")
+ private void killOpportContainers(LogicalContainerInfo logicalContInfo) {
+ ContainerId containerToStartId = logicalContInfo.getContQueuingEvent()
+ .getContainerId();
+ // Track resources that need to be freed.
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+ containerToStartId);
+
+ // Go over the running opportunistic containers. Avoid containers that have
+ // already been marked for killing.
+ for (Entry runningOpportCont : logicalOpportContainers
+ .entrySet()) {
+ ContainerId runningOpportContId = runningOpportCont.getKey();
+
+ // If there are sufficient resources to execute the given container, do
+ // not kill more opportunistic containers.
+ if (resourcesToFreeUp.getPhysicalMemory() <= 0
+ && resourcesToFreeUp.getVirtualMemory() <= 0
+ && resourcesToFreeUp.getCPU() <= 0.f) {
+ break;
+ }
+
+ if (!opportContainersToKill.contains(runningOpportContId)) {
+ opportContainersToKill.add(runningOpportContId);
+ decreaseResourceUtil(resourcesToFreeUp, runningOpportCont.getValue());
+ // Send event to ContainerManager for actually killing the container.
+ eventDispatcher.getEventHandler()
+ .handle(new ContainerExecutionEvent(runningOpportContId,
+ ContainerExecutionEventType.CONTAINER_EXECUTION_STOP));
+ }
+ }
+ }
+
+ /**
+ * Calculates the amount of resources that need to be free up (by killing
+ * opportunistic containers) in order for the given guaranteed container to
+ * start its execution.
+ * Resource utilization to be freed up = logicalContainersUtilization -
+ * utilization of opportContainersToKill + utilization of pending guaranteed
+ * containers that will start before the given container + utilization of
+ * given container - total resources of node.
+ *
+ * @param containerToStartId
+ * the ContainerId of the guaranteed container for which we need to
+ * free resources, so that its execution can start.
+ * @return the resources that need to be freed up for the given guaranteed
+ * container to start.
+ */
+ private ResourceUtilization resourcesToFreeUp(
+ ContainerId containerToStartId) {
+ // Get current utilization of logical containers.
+ ResourceUtilization resourceUtilToFreeUp = ResourceUtilization
+ .newInstance(logicalContainersUtilization);
+
+ // Subtract from the utilization the utilization of the opportunistic
+ // containers that are marked for killing.
+ synchronized (opportContainersToKill) {
+ for (ContainerId opportContId : opportContainersToKill) {
+ if (this.logicalOpportContainers.containsKey(opportContId)) {
+ decreaseResourceUtil(resourceUtilToFreeUp,
+ this.logicalOpportContainers.get(opportContId));
+ }
+ }
+ }
+ // Add to the utilization the utilization of the pending guaranteed
+ // containers that
+ // will start before the current container will be started.
+ synchronized (queuedGuarRequests) {
+ for (LogicalContainerInfo guarContInfo : queuedGuarRequests) {
+ increaseResourceUtil(resourceUtilToFreeUp, guarContInfo.getPti());
+ if (guarContInfo.getContQueuingEvent().getContainerId()
+ .equals(containerToStartId)) {
+ break;
+ }
+ }
+ }
+ // Subtract the overall node resources.
+ resourceUtilToFreeUp.subtractFrom(
+ (int) (getPmemAllocatedForContainers() >> 20),
+ (int) (getVmemAllocatedForContainers() >> 20), 1.0f);
+
+ return resourceUtilToFreeUp;
+ }
/**
* Check whether a container's process tree's current memory usage is over
@@ -408,6 +724,7 @@ public void run() {
long pmemByAllContainers = 0;
long cpuUsagePercentPerCoreByAllContainers = 0;
long cpuUsageTotalCoresByAllContainers = 0;
+
for (Entry entry : trackingContainers
.entrySet()) {
ContainerId containerId = entry.getKey();
@@ -739,11 +1056,17 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
LOG.info("Stopping resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.remove(containerId);
+ // Remove finished container from the logical containers.
+ // Attempt to start new containers, if resources available.
+ removeLogicalContainer(containerId, monitoringEvent.getExecutionType());
+ opportContainersToKill.remove(containerId);
+ startPendingContainers();
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
+ // TODO KONSTANTINOS: Update available resources.
if (processTreeInfo == null) {
LOG.warn("Failed to track container "
+ containerId.toString()
@@ -762,4 +1085,71 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
// TODO: Wrong event.
}
}
+
+ public ContainerQueuingEventDispatcher createContainerQueuingEventDispatcher() {
+ return new ContainerQueuingEventDispatcher();
+ }
+
+ public class ContainerQueuingEventDispatcher implements
+ EventHandler {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handle(ContainerQueuingEvent containerQueuingEvent) {
+ if (!isEnabled()) {
+ return;
+ }
+
+ switch (containerQueuingEvent.getType()) {
+ case CONTAINER_REQUEST_ARRIVED:
+ ProcessTreeInfo pti = createProcessTreeInfo(
+ containerQueuingEvent.getContainerId(),
+ containerQueuingEvent.getContainerTokenIdentifier().getResource());
+
+ LogicalContainerInfo logicalContInfo = new LogicalContainerInfo(
+ containerQueuingEvent, pti);
+
+ // If there are already free resources to start the container.
+ if (hasLogicalResourcesAvailable(pti)) {
+ startLogicalContainer(logicalContInfo);
+ } else {
+ if (containerQueuingEvent
+ .getExecutionType() == ExecutionType.GUARANTEED) {
+ synchronized (queuedGuarRequests) {
+ queuedGuarRequests.add(logicalContInfo);
+ }
+ // Kill running opportunistic containers to make space for
+ // guaranteed container.
+ killOpportContainers(logicalContInfo);
+ } else {
+ synchronized (queuedOpportRequests) {
+ queuedOpportRequests.add(logicalContInfo);
+ }
+ }
+ }
+ break;
+ case QUEUED_CONTAINER_REMOVED:
+ boolean foundInQueue = removeContainerFromQueues(
+ containerQueuingEvent.getContainerId(),
+ containerQueuingEvent.getExecutionType());
+
+ // The container started in the meanwhile, so the ContainerManager needs
+ // to be notified and stop it.
+ if (!foundInQueue) {
+ eventDispatcher.getEventHandler()
+ .handle(new ContainerExecutionEvent(
+ containerQueuingEvent.getContainerTokenIdentifier()
+ .getContainerID(),
+ ContainerExecutionEventType.CONTAINER_EXECUTION_STOP));
+ }
+ break;
+ case CONTAINER_FAILED_TO_START:
+ removeLogicalContainer(containerQueuingEvent.getContainerId(),
+ containerQueuingEvent.getExecutionType());
+ break;
+ default:
+ // TODO: Wrong event
+ }
+ }
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index e1ffd88..f0be1c0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -619,6 +620,16 @@ public int getHttpPort() {
public ConcurrentMap getContainers() {
return null;
}
+
+ @Override
+ public ConcurrentMap getQueuedContainers() {
+ return null;
+ }
+
+ @Override
+ public ConcurrentMap getKilledQueuedContainers() {
+ return null;
+ }
@Override
public ConcurrentMap getIncreasedContainers() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index d7f89fc..c4247ac 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -153,7 +154,7 @@ public void testContainersResourceChange() throws Exception {
containersMonitor.start();
// create container 1
containersMonitor.handle(new ContainerStartMonitoringEvent(
- getContainerId(1), 2100L, 1000L, 1, 0, 0));
+ getContainerId(1), ExecutionType.GUARANTEED, 2100L, 1000L, 1, 0, 0));
// verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(1)));
assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
@@ -173,8 +174,9 @@ public void testContainersResourceChange() throws Exception {
assertTrue(containerEventHandler
.isContainerKilled(getContainerId(1)));
// create container 2
- containersMonitor.handle(new ContainerStartMonitoringEvent(
- getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
+ containersMonitor
+ .handle(new ContainerStartMonitoringEvent(getContainerId(2),
+ ExecutionType.GUARANTEED, 2202009L, 1048576L, 1, 0, 0));
// verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(2)));
assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
@@ -215,8 +217,9 @@ public void testContainersResourceChangeIsTriggeredImmediately()
// now waiting for the next monitor cycle
Thread.sleep(1000);
// create a container with id 3
- containersMonitor.handle(new ContainerStartMonitoringEvent(
- getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
+ containersMonitor
+ .handle(new ContainerStartMonitoringEvent(getContainerId(3),
+ ExecutionType.GUARANTEED, 2202009L, 1048576L, 1, 0, 0));
// Verify that this container has been tracked
assertNotNull(getProcessTreeInfo(getContainerId(3)));
// trigger a change resource event, check limit after change