diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java
index fed453791f5..8aeafb73140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java
@@ -27,7 +27,7 @@
@InterfaceStability.Unstable
public enum ContainerSubState {
/*
- * NEW, LOCALIZING, SCHEDULED,
+ * NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, SCHEDULED,
* REINITIALIZING_AWAITING_KILL, RELAUNCHING,
*/
SCHEDULED,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 134b69871ce..1d4a6cad670 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1145,6 +1145,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
0;
+ /**
+ * Number of queued OPPORTUNISTIC containers at or above which further
+ * OPPORTUNISTIC containers are queued for localization instead of being
+ * localized immediately. A negative value disables this throttling.
+ */
+ public static final String NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT =
+ NM_PREFIX + "opportunistic-containers-localization-throttle-limit";
+ public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT =
+ -1;
+
/** Setting that controls whether distributed scheduling is enabled or not. */
public static final String DIST_SCHEDULING_ENABLED =
NM_PREFIX + "distributed-scheduling.enabled";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4573859d384..a1c56d8351a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -115,7 +115,7 @@ enum ContainerStateProto {
enum ContainerSubStateProto {
/**
- * NEW, LOCALIZING, SCHEDULED,
+ * NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, SCHEDULED,
* REINITIALIZING_AWAITING_KILL, RELAUNCHING,
*/
CSS_SCHEDULED = 1;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4b93d1e18aa..87598ab9005 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1121,6 +1121,13 @@
0
+
+ Number of queued OPPORTUNISTIC containers at or above which further
+ OPPORTUNISTIC containers are queued for localization instead of being
+ localized immediately. A negative value disables this throttling.
+ yarn.nodemanager.opportunistic-containers-localization-throttle-limit
+ -1
+
+
Number of seconds after an application finishes before the nodemanager's
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 75e32e482cd..75f08d3ac30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -47,6 +47,6 @@
// Producer: ContainerScheduler
CONTAINER_TOKEN_UPDATED,
-
+ LOCALIZE_CONTAINER,
RECOVER_PAUSED_CONTAINER
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index b79c305a0e6..f36439c7421 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -313,19 +313,37 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
new StateMachineFactory(ContainerState.NEW)
// From NEW State
.addTransition(ContainerState.NEW,
- EnumSet.of(ContainerState.LOCALIZING,
+ EnumSet.of(ContainerState.QUEUED_FOR_LOCALIZATION,
ContainerState.SCHEDULED,
- ContainerState.LOCALIZATION_FAILED,
ContainerState.DONE),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
- ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
+ ContainerEventType.KILL_CONTAINER,
+ new KillBeforeLocalizationTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
+ // From QUEUED_FOR_LOCALIZATION State
+ .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION,
+ EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED,
+ ContainerState.LOCALIZATION_FAILED),
+ ContainerEventType.LOCALIZE_CONTAINER, new LocalizationTransition())
+ .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION,
+ ContainerState.QUEUED_FOR_LOCALIZATION,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION,
+ ContainerState.DONE,
+ ContainerEventType.KILL_CONTAINER,
+ new KillBeforeLocalizationTransition())
+ .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION,
+ ContainerState.QUEUED_FOR_LOCALIZATION,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
+
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED),
@@ -763,6 +781,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
private ContainerSubState getContainerSubState() {
switch (stateMachine.getCurrentState()) {
case NEW:
+ case QUEUED_FOR_LOCALIZATION:
case LOCALIZING:
case SCHEDULED:
case REINITIALIZING_AWAITING_KILL:
@@ -1019,7 +1038,6 @@ public void sendLaunchEvent() {
dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent));
}
-
}
@SuppressWarnings("unchecked") // dispatcher not typed
@@ -1035,6 +1053,11 @@ private void sendScheduleEvent() {
}
}
+  /**
+   * Dispatches a {@link ContainerSchedulerEvent} of type
+   * QUEUE_CONTAINER_FOR_LOCALIZAITON for this container. The constant is
+   * spelled exactly as it is declared in ContainerSchedulerEventType.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendQueueContainerForLocalizationEvent() {
+    dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
+        ContainerSchedulerEventType.QUEUE_CONTAINER_FOR_LOCALIZAITON));
+  }
+
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendKillEvent(int exitStatus, String description) {
@@ -1174,16 +1197,13 @@ public ContainerState transition(ContainerImpl container,
return ContainerState.DONE;
} else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
return ContainerState.SCHEDULED;
- } else if (container.recoveredAsKilled &&
- container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+ } else if (container.recoveredAsKilled && container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
// container was killed but never launched
container.metrics.killedContainer();
- NMAuditLogger.logSuccess(container.user,
- AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
- container.containerId.getApplicationAttemptId().getApplicationId(),
+ NMAuditLogger.logSuccess(container.user, AuditConstants.FINISH_KILLED_CONTAINER,
+ "ContainerImpl", container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
- container.metrics.releaseContainer(
- container.containerTokenIdentifier.getResource());
+ container.metrics.releaseContainer(container.containerTokenIdentifier.getResource());
container.sendFinishedEvents();
return ContainerState.DONE;
}
@@ -1191,25 +1211,37 @@ public ContainerState transition(ContainerImpl container,
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
- container.dispatcher.getEventHandler().handle(new AuxServicesEvent
- (AuxServicesEventType.CONTAINER_INIT, container));
+ container.dispatcher.getEventHandler().handle(
+ new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT, container));
// Inform the AuxServices about the opaque serviceData
- Map csd = ctxt.getServiceData();
+ Map csd = ctxt.getServiceData();
if (csd != null) {
// This can happen more than once per Application as each container may
// have distinct service data
- for (Map.Entry service : csd.entrySet()) {
+ for (Map.Entry service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
- container.user, container.containerId
- .getApplicationAttemptId().getApplicationId(),
+ container.user,
+ container.containerId.getApplicationAttemptId().getApplicationId(),
service.getKey().toString(), service.getValue()));
}
}
+ container.dispatcher.getEventHandler().handle(
+ new ContainerSchedulerEvent(container,
+ ContainerSchedulerEventType.QUEUE_CONTAINER_FOR_LOCALIZAITON));
+ return ContainerState.QUEUED_FOR_LOCALIZATION;
+ }
+ }
+ static class LocalizationTransition implements
+ MultipleArcTransition {
+ @SuppressWarnings("unchecked")
+ @Override
+ public ContainerState transition(ContainerImpl container,
+ ContainerEvent event) {
container.containerLocalizationStartTime = clock.getTime();
-
+ final ContainerLaunchContext ctxt = container.launchContext;
// Send requests for public, private resources
Map cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
@@ -1924,10 +1956,12 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
- * Handle the following transition:
+ * Handle the following transitions:
* - NEW -> DONE upon KILL_CONTAINER
+ * - QUEUED_FOR_LOCALIZATION -> DONE upon KILL_CONTAINER
*/
- static class KillOnNewTransition extends ContainerDoneTransition {
+ static class KillBeforeLocalizationTransition extends
+ ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java
new file mode 100644
index 00000000000..be40bbeeb86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * A {@link ContainerEvent} whose type is always
+ * {@link ContainerEventType#LOCALIZE_CONTAINER}.
+ */
+public class ContainerLocalizeEvent extends ContainerEvent {
+
+  /**
+   * Creates a LOCALIZE_CONTAINER event.
+   *
+   * @param containerId container id handed to the {@link ContainerEvent}
+   *                    constructor
+   */
+  public ContainerLocalizeEvent(ContainerId containerId) {
+    super(containerId, ContainerEventType.LOCALIZE_CONTAINER);
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index a0db64619de..6a731eee3d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -28,8 +28,8 @@
// 1. ContainerImpl::getContainerSubState().
// 2. the doc in the ContainerSubState class.
// 3. the doc in the yarn_protos.proto file.
- NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
- REINITIALIZING, REINITIALIZING_AWAITING_KILL,
+ NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED,
+ RUNNING, RELAUNCHING, REINITIALIZING, REINITIALIZING_AWAITING_KILL,
EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
PAUSING, PAUSED, RESUMING
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 3a6d29635b0..b8e6bf8c815 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerLocalizeEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
@@ -75,6 +77,11 @@
// Capacity of the queue for opportunistic Containers.
private final int maxOppQueueLength;
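+ // Number of queued opportunistic containers at or above which further
+ // opportunistic containers wait in a queue before being localized.
+ // A negative value means containers are never held back.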
+ private final int maxOQueueLengthForLocalization;
+
+
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap
queuedGuaranteedContainers = new LinkedHashMap<>();
@@ -82,6 +89,10 @@
private final LinkedHashMap
queuedOpportunisticContainers = new LinkedHashMap<>();
+ // Queue of containers waiting for localization
+ private final LinkedHashMap
+ queuedContainerForLocalization = new LinkedHashMap<>();
+
// Used to keep track of containers that have been marked to be killed
// or paused to make room for a guaranteed container.
private final Map oppContainersToKill =
@@ -123,7 +134,13 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
this(context, dispatcher, metrics, context.getConf().getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.
- DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
+ DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH),
+ context.getConf().getInt(
+ YarnConfiguration.
+ NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT,
+ YarnConfiguration.
+ DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT)
+ );
}
@@ -148,12 +165,13 @@ public void serviceInit(Configuration conf) throws Exception {
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
- NodeManagerMetrics metrics, int qLength) {
+ NodeManagerMetrics metrics, int qLength, int throttleLimit) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
+ this.maxOQueueLengthForLocalization = throttleLimit;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
this.opportunisticContainersStatus =
@@ -167,6 +185,9 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
@Override
public void handle(ContainerSchedulerEvent event) {
switch (event.getType()) {
+ case QUEUE_CONTAINER_FOR_LOCALIZAITON:
+ queueContainerForLocalization(event.getContainer());
+ break;
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
@@ -180,7 +201,7 @@ public void handle(ContainerSchedulerEvent event) {
if (event instanceof UpdateContainerSchedulerEvent) {
onUpdateContainer((UpdateContainerSchedulerEvent) event);
} else {
- LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
+ LOG.error("Unknown event type on UpdateContainer: " + event.getType());
}
break;
case SHED_QUEUED_CONTAINERS:
@@ -188,6 +209,7 @@ public void handle(ContainerSchedulerEvent event) {
break;
case RECOVERY_COMPLETED:
startPendingContainers(maxOppQueueLength <= 0);
+ startLocalizationOfContainers();
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
break;
@@ -228,6 +250,11 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
// promotion request
reclaimOpportunisticContainerResources(updateEvent.getContainer());
}
+ if (queuedContainerForLocalization.containsKey(containerId)) {
+ // start localization of this container as it has been promoted.
+ localizeContainer(queuedContainerForLocalization.get(containerId));
+ queuedContainerForLocalization.remove(containerId);
+ }
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
@@ -244,11 +271,33 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
"continer update of %s", containerId), ex);
}
startPendingContainers(maxOppQueueLength <= 0);
+ startLocalizationOfContainers();
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
}
+  /**
+   * Hands waiting containers over for localization, in the order in which
+   * they were queued. At most
+   * {@code maxOQueueLengthForLocalization - queuedOpportunisticContainers.size()}
+   * containers are started per call; nothing happens when that budget is not
+   * positive. Every container that is started is removed from
+   * {@code queuedContainerForLocalization}.
+   */
+  private void startLocalizationOfContainers() {
+    int toBeLocalized = maxOQueueLengthForLocalization -
+        queuedOpportunisticContainers.size();
+    Iterator<Container> containerIter =
+        queuedContainerForLocalization.values().iterator();
+    // Stop as soon as the budget is used up rather than walking the rest of
+    // the waiting queue without doing anything.
+    while (toBeLocalized > 0 && containerIter.hasNext()) {
+      Container container = containerIter.next();
+      localizeContainer(container);
+      // Remove through the iterator so the map is not modified behind it.
+      containerIter.remove();
+      LOG.info(
+          "Opportunistic container {} will be localized",
+          container.getContainerId());
+      toBeLocalized--;
+    }
+  }
+
/**
* Populates auxiliary data structures used by the ContainerScheduler on
* recovery.
@@ -374,6 +423,7 @@ private void onResourcesReclaimed(Container container) {
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
+ startLocalizationOfContainers();
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
@@ -506,10 +556,28 @@ protected void scheduleContainer(Container container) {
startPendingContainers(false);
}
}
+ startLocalizationOfContainers();
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
+  /**
+   * Either starts localization of the container right away or parks it in
+   * {@code queuedContainerForLocalization}.
+   *
+   * A container is parked only when it is not GUARANTEED, the throttle limit
+   * is non-negative, and the number of queued OPPORTUNISTIC containers has
+   * reached that limit. In every other case it is localized immediately.
+   *
+   * @param container container that asked to be localized
+   */
+  private void queueContainerForLocalization(Container container) {
+    ExecutionType execType =
+        container.getContainerTokenIdentifier().getExecutionType();
+    boolean mustWait = execType != ExecutionType.GUARANTEED
+        && maxOQueueLengthForLocalization >= 0
+        && queuedOpportunisticContainers.size()
+            >= maxOQueueLengthForLocalization;
+    if (mustWait) {
+      queuedContainerForLocalization.put(container.getContainerId(), container);
+      return;
+    }
+    localizeContainer(container);
+  }
+
+  /**
+   * Dispatches a {@link ContainerLocalizeEvent} built from the id of the
+   * given container.
+   *
+   * @param container container whose id is put into the event
+   */
+  private void localizeContainer(Container container) {
+    ContainerLocalizeEvent localizeEvent =
+        new ContainerLocalizeEvent(container.getContainerId());
+    dispatcher.getEventHandler().handle(localizeEvent);
+  }
+
@SuppressWarnings("unchecked")
private void reclaimOpportunisticContainerResources(Container container) {
List extraOppContainersToReclaim =
@@ -662,6 +730,7 @@ private void shedQueuedOpportunisticContainers() {
numAllowed--;
}
}
+ startLocalizationOfContainers();
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 294eddf88e4..0514b345fa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -22,6 +22,7 @@
* Event types associated with {@link ContainerSchedulerEvent}.
*/
public enum ContainerSchedulerEventType {
+ QUEUE_CONTAINER_FOR_LOCALIZAITON,
SCHEDULE_CONTAINER,
CONTAINER_COMPLETED,
UPDATE_CONTAINER,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index ea3acca35e1..4a0a6740cfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -347,16 +347,19 @@ public void testCleanupOnSuccess() throws Exception {
assertEquals(running, metrics.getRunningContainers());
ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW);
- ContainerState s2 = wc.eventToFinalState.get(e1);
- ContainerEventType e2 = wc.initStateToEvent.get(s2);
- ContainerState s3 = wc.eventToFinalState.get(e2);
- ContainerEventType e3 = wc.initStateToEvent.get(s3);
- ContainerState s4 = wc.eventToFinalState.get(e3);
- ContainerEventType e4 = wc.initStateToEvent.get(s4);
- ContainerState s5 = wc.eventToFinalState.get(e4);
- ContainerEventType e5 = wc.initStateToEvent.get(s5);
- ContainerState s6 = wc.eventToFinalState.get(e5);
-
+ ContainerState s1 = wc.eventToFinalState.get(e1);
+ ContainerEventType e2 = wc.initStateToEvent.get(s1);
+ ContainerState s2 = wc.eventToFinalState.get(e2);
+ ContainerEventType e3 = wc.initStateToEvent.get(s2);
+ ContainerState s3 = wc.eventToFinalState.get(e3);
+ ContainerEventType e4 = wc.initStateToEvent.get(s3);
+ ContainerState s4 = wc.eventToFinalState.get(e4);
+ ContainerEventType e5 = wc.initStateToEvent.get(s4);
+ ContainerState s5 = wc.eventToFinalState.get(e5);
+ ContainerEventType e6 = wc.initStateToEvent.get(s5);
+ ContainerState s6 = wc.eventToFinalState.get(e6);
+
+ Assert.assertEquals(ContainerState.QUEUED_FOR_LOCALIZATION, s1);
Assert.assertEquals(ContainerState.LOCALIZING, s2);
Assert.assertEquals(ContainerState.SCHEDULED, s3);
Assert.assertEquals(ContainerState.RUNNING, s4);
@@ -364,10 +367,11 @@ public void testCleanupOnSuccess() throws Exception {
Assert.assertEquals(ContainerState.DONE, s6);
Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1);
- Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2);
- Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3);
- Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4);
- Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5);
+ Assert.assertEquals(ContainerEventType.LOCALIZE_CONTAINER, e2);
+ Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e3);
+ Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e4);
+ Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e5);
+ Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e6);
}
finally {
if (wc != null) {
@@ -1346,7 +1350,7 @@ public boolean matches(LocalizationEvent e) {
appBus = mock(EventHandler.class);
LogBus = mock(EventHandler.class);
delService = mock(DeletionService.class);
- schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) {
+ schedBus = new ContainerScheduler(context, dispatcher, metrics, 0, -1) {
@Override
protected void scheduleContainer(Container container) {
container.sendLaunchEvent();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
index b09a1f260e0..95642c5e374 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -96,7 +96,7 @@ private void setupContainerMonitor() {
ContainerManager cm = mock(ContainerManager.class);
when(cm.getContainersMonitor()).thenReturn(containersMonitor);
when(context.getContainerManager()).thenReturn(cm);
- spy = new ContainerScheduler(context, dispatcher, metrics, 0);
+ spy = new ContainerScheduler(context, dispatcher, metrics, 0, -1);
}
@Before public void setUp() throws Exception {