diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java index fed453791f5..8aeafb73140 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerSubState.java @@ -27,7 +27,7 @@ @InterfaceStability.Unstable public enum ContainerSubState { /* - * NEW, LOCALIZING, SCHEDULED, + * NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, SCHEDULED, * REINITIALIZING_AWAITING_KILL, RELAUNCHING, */ SCHEDULED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 134b69871ce..1d4a6cad670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1145,6 +1145,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH = 0; + /** + * Max number of queued OPPORTUNISTIC containers after which + * OPPORTUNISTIC containers would be queued for localization. + */ + public static final String NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT = + NM_PREFIX + "opportunistic-containers-localization-throttle-limit"; + public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT = + -1; + /** Setting that controls whether distributed scheduling is enabled or not. 
*/ public static final String DIST_SCHEDULING_ENABLED = NM_PREFIX + "distributed-scheduling.enabled"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 4573859d384..a1c56d8351a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -115,7 +115,7 @@ enum ContainerStateProto { enum ContainerSubStateProto { /** - * NEW, LOCALIZING, SCHEDULED, + * NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, SCHEDULED, * REINITIALIZING_AWAITING_KILL, RELAUNCHING, */ CSS_SCHEDULED = 1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 4b93d1e18aa..87598ab9005 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1121,6 +1121,13 @@ 0 + + Max number of queued OPPORTUNISTIC containers after which + OPPORTUNISTIC containers would be queued for localization. 
+ yarn.nodemanager.opportunistic-containers-localization-throttle-limit + -1 + + Number of seconds after an application finishes before the nodemanager's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 75e32e482cd..75f08d3ac30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -47,6 +47,6 @@ // Producer: ContainerScheduler CONTAINER_TOKEN_UPDATED, - + LOCALIZE_CONTAINER, RECOVER_PAUSED_CONTAINER } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index b79c305a0e6..f36439c7421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -313,19 +313,37 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, new 
StateMachineFactory(ContainerState.NEW) // From NEW State .addTransition(ContainerState.NEW, - EnumSet.of(ContainerState.LOCALIZING, + EnumSet.of(ContainerState.QUEUED_FOR_LOCALIZATION, ContainerState.SCHEDULED, - ContainerState.LOCALIZATION_FAILED, ContainerState.DONE), ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, - ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) + ContainerEventType.KILL_CONTAINER, + new KillBeforeLocalizationTransition()) .addTransition(ContainerState.NEW, ContainerState.NEW, ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition()) + // From QUEUED_FOR_LOCALIZATION State + .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION, + EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED, + ContainerState.LOCALIZATION_FAILED), + ContainerEventType.LOCALIZE_CONTAINER, new LocalizationTransition()) + .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION, + ContainerState.QUEUED_FOR_LOCALIZATION, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION, + ContainerState.DONE, + ContainerEventType.KILL_CONTAINER, + new KillBeforeLocalizationTransition()) + .addTransition(ContainerState.QUEUED_FOR_LOCALIZATION, + ContainerState.QUEUED_FOR_LOCALIZATION, + ContainerEventType.UPDATE_CONTAINER_TOKEN, + new NotifyContainerSchedulerOfUpdateTransition()) + // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED), @@ -763,6 +781,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, private ContainerSubState getContainerSubState() { switch (stateMachine.getCurrentState()) { case NEW: + case QUEUED_FOR_LOCALIZATION: case LOCALIZING: case SCHEDULED: case 
REINITIALIZING_AWAITING_KILL: @@ -1019,7 +1038,6 @@ public void sendLaunchEvent() { dispatcher.getEventHandler().handle( new ContainersLauncherEvent(this, launcherEvent)); } - } @SuppressWarnings("unchecked") // dispatcher not typed @@ -1035,6 +1053,15 @@ private void sendScheduleEvent() { } } + /** + * Asks the ContainerScheduler to queue this container for localization. + * NOTE(review): no caller is visible in this patch; the INIT transition dispatches the same event inline. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendQueueContainerForLocalizationEvent() { + dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this, + ContainerSchedulerEventType.QUEUE_CONTAINER_FOR_LOCALIZAITON)); + } + @SuppressWarnings("unchecked") // dispatcher not typed @Override public void sendKillEvent(int exitStatus, String description) { @@ -1174,16 +1201,13 @@ public ContainerState transition(ContainerImpl container, return ContainerState.DONE; } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) { return ContainerState.SCHEDULED; - } else if (container.recoveredAsKilled && - container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { + } else if (container.recoveredAsKilled && container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { // container was killed but never launched container.metrics.killedContainer(); - NMAuditLogger.logSuccess(container.user, - AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl", - container.containerId.getApplicationAttemptId().getApplicationId(), + NMAuditLogger.logSuccess(container.user, AuditConstants.FINISH_KILLED_CONTAINER, + "ContainerImpl", container.containerId.getApplicationAttemptId().getApplicationId(), container.containerId); - container.metrics.releaseContainer( - container.containerTokenIdentifier.getResource()); + container.metrics.releaseContainer(container.containerTokenIdentifier.getResource()); container.sendFinishedEvents(); return ContainerState.DONE; } @@ -1191,25 +1215,37 @@ public ContainerState transition(ContainerImpl container, final ContainerLaunchContext ctxt = container.launchContext; container.metrics.initingContainer(); - container.dispatcher.getEventHandler().handle(new 
AuxServicesEvent - (AuxServicesEventType.CONTAINER_INIT, container)); + container.dispatcher.getEventHandler().handle( + new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT, container)); // Inform the AuxServices about the opaque serviceData - Map csd = ctxt.getServiceData(); + Map csd = ctxt.getServiceData(); if (csd != null) { // This can happen more than once per Application as each container may // have distinct service data - for (Map.Entry service : csd.entrySet()) { + for (Map.Entry service : csd.entrySet()) { container.dispatcher.getEventHandler().handle( new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT, - container.user, container.containerId - .getApplicationAttemptId().getApplicationId(), + container.user, + container.containerId.getApplicationAttemptId().getApplicationId(), service.getKey().toString(), service.getValue())); } } + container.dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(container, + ContainerSchedulerEventType.QUEUE_CONTAINER_FOR_LOCALIZAITON)); + return ContainerState.QUEUED_FOR_LOCALIZATION; + } + } + static class LocalizationTransition implements + MultipleArcTransition { + @SuppressWarnings("unchecked") + @Override + public ContainerState transition(ContainerImpl container, + ContainerEvent event) { container.containerLocalizationStartTime = clock.getTime(); - + final ContainerLaunchContext ctxt = container.launchContext; // Send requests for public, private resources Map cntrRsrc = ctxt.getLocalResources(); if (!cntrRsrc.isEmpty()) { @@ -1924,10 +1956,12 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** - * Handle the following transition: + * Handle the following transitions: * - NEW -> DONE upon KILL_CONTAINER + * - QUEUED_FOR_LOCALIZATION -> DONE upon KILL_CONTAINER */ - static class KillOnNewTransition extends ContainerDoneTransition { + static class KillBeforeLocalizationTransition extends + ContainerDoneTransition { @Override public void transition(ContainerImpl 
container, ContainerEvent event) { if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java new file mode 100644 index 00000000000..be40bbeeb86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerLocalizeEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +/** Container event of type LOCALIZE_CONTAINER, addressed by container id. */ +public class ContainerLocalizeEvent extends ContainerEvent { + /** @param c id of the container to localize. */ + public ContainerLocalizeEvent(ContainerId c) { + super(c, ContainerEventType.LOCALIZE_CONTAINER); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index a0db64619de..6a731eee3d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -28,8 +28,8 @@ // 1. ContainerImpl::getContainerSubState(). // 2. the doc in the ContainerSubState class. // 3. the doc in the yarn_protos.proto file. 
- NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, - REINITIALIZING, REINITIALIZING_AWAITING_KILL, + NEW, QUEUED_FOR_LOCALIZATION, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, + RUNNING, RELAUNCHING, REINITIALIZING, REINITIALIZING_AWAITING_KILL, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE, PAUSING, PAUSED, RESUMING diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 3a6d29635b0..b8e6bf8c815 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerLocalizeEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor @@ -75,6 +77,11 @@ // Capacity of the queue for opportunistic Containers. 
private final int maxOppQueueLength; + // max queued opportunistic containers after which no O container would be + // localized + private final int maxOQueueLengthForLocalization; + + // Queue of Guaranteed Containers waiting for resources to run private final LinkedHashMap queuedGuaranteedContainers = new LinkedHashMap<>(); @@ -82,6 +89,10 @@ private final LinkedHashMap queuedOpportunisticContainers = new LinkedHashMap<>(); + // Queue of containers waiting for localization + private final LinkedHashMap + queuedContainerForLocalization = new LinkedHashMap<>(); + // Used to keep track of containers that have been marked to be killed // or paused to make room for a guaranteed container. private final Map oppContainersToKill = @@ -123,7 +134,13 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, this(context, dispatcher, metrics, context.getConf().getInt( YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, YarnConfiguration. - DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); + DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH), + context.getConf().getInt( + YarnConfiguration. + NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT, + YarnConfiguration. + DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_LOCALIZATION_THROTTLE_LIMIT) + ); } @@ -148,12 +165,13 @@ public void serviceInit(Configuration conf) throws Exception { @VisibleForTesting public ContainerScheduler(Context context, AsyncDispatcher dispatcher, - NodeManagerMetrics metrics, int qLength) { + NodeManagerMetrics metrics, int qLength, int throttleLimit) { super(ContainerScheduler.class.getName()); this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; this.maxOppQueueLength = (qLength <= 0) ? 
0 : qLength; + this.maxOQueueLengthForLocalization = throttleLimit; this.utilizationTracker = new AllocationBasedResourceUtilizationTracker(this); this.opportunisticContainersStatus = @@ -167,6 +185,9 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, @Override public void handle(ContainerSchedulerEvent event) { switch (event.getType()) { + case QUEUE_CONTAINER_FOR_LOCALIZAITON: + queueContainerForLocalization(event.getContainer()); + break; case SCHEDULE_CONTAINER: scheduleContainer(event.getContainer()); break; @@ -180,7 +201,7 @@ public void handle(ContainerSchedulerEvent event) { if (event instanceof UpdateContainerSchedulerEvent) { onUpdateContainer((UpdateContainerSchedulerEvent) event); } else { - LOG.error("Unknown event type on UpdateCOntainer: " + event.getType()); + LOG.error("Unknown event type on UpdateContainer: " + event.getType()); } break; case SHED_QUEUED_CONTAINERS: @@ -188,6 +209,7 @@ public void handle(ContainerSchedulerEvent event) { break; case RECOVERY_COMPLETED: startPendingContainers(maxOppQueueLength <= 0); + startLocalizationOfContainers(); metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); break; @@ -228,6 +250,11 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { // promotion request reclaimOpportunisticContainerResources(updateEvent.getContainer()); } + if (queuedContainerForLocalization.containsKey(containerId)) { + // start localization of this container as it has been promoted. + localizeContainer(queuedContainerForLocalization.get(containerId)); + queuedContainerForLocalization.remove(containerId); + } } else { // Demotion of queued container.. 
Should not happen too often // since you should not find too many queued guaranteed @@ -244,11 +271,33 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { "continer update of %s", containerId), ex); } startPendingContainers(maxOppQueueLength <= 0); + startLocalizationOfContainers(); metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); } } + /** + * Releases up to (throttle limit - queued OPPORTUNISTIC containers) waiting + * containers for localization, in the order they were queued. + */ + private void startLocalizationOfContainers() { + int toBeLocalized = maxOQueueLengthForLocalization - + queuedOpportunisticContainers.size(); + Iterator<Container> containerIter = + queuedContainerForLocalization.values().iterator(); + // Stop scanning once the budget is spent; the rest stay queued. + while (toBeLocalized > 0 && containerIter.hasNext()) { + Container container = containerIter.next(); + localizeContainer(container); + containerIter.remove(); + LOG.info( + "Opportunistic container {} will be localized", + container.getContainerId()); + toBeLocalized--; + } + } + /** * Populates auxiliary data structures used by the ContainerScheduler on * recovery. @@ -374,6 +423,7 @@ private void onResourcesReclaimed(Container container) { boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); startPendingContainers(forceStartGuaranteedContainers); } + startLocalizationOfContainers(); this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); } @@ -506,10 +556,28 @@ protected void scheduleContainer(Container container) { startPendingContainers(false); } } + startLocalizationOfContainers(); metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); } + private void queueContainerForLocalization(Container container) { + boolean isGuaranteedContainer = container.getContainerTokenIdentifier(). 
+ getExecutionType() == ExecutionType.GUARANTEED; + if (isGuaranteedContainer || + maxOQueueLengthForLocalization < 0 || + queuedOpportunisticContainers.size() < maxOQueueLengthForLocalization) { + localizeContainer(container); + } else { + queuedContainerForLocalization.put(container.getContainerId(), container); + } + } + /** Dispatches a ContainerLocalizeEvent for the given container. */ + private void localizeContainer(Container container) { + dispatcher.getEventHandler().handle( + new ContainerLocalizeEvent(container.getContainerId())); + } + @SuppressWarnings("unchecked") private void reclaimOpportunisticContainerResources(Container container) { List extraOppContainersToReclaim = @@ -662,6 +730,7 @@ private void shedQueuedOpportunisticContainers() { numAllowed--; } } + startLocalizationOfContainers(); this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 294eddf88e4..0514b345fa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -22,6 +22,7 @@ * Event types associated with {@link ContainerSchedulerEvent}. 
*/ public enum ContainerSchedulerEventType { + QUEUE_CONTAINER_FOR_LOCALIZAITON, SCHEDULE_CONTAINER, CONTAINER_COMPLETED, UPDATE_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index ea3acca35e1..4a0a6740cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -347,16 +347,19 @@ public void testCleanupOnSuccess() throws Exception { assertEquals(running, metrics.getRunningContainers()); ContainerEventType e1 = wc.initStateToEvent.get(ContainerState.NEW); - ContainerState s2 = wc.eventToFinalState.get(e1); - ContainerEventType e2 = wc.initStateToEvent.get(s2); - ContainerState s3 = wc.eventToFinalState.get(e2); - ContainerEventType e3 = wc.initStateToEvent.get(s3); - ContainerState s4 = wc.eventToFinalState.get(e3); - ContainerEventType e4 = wc.initStateToEvent.get(s4); - ContainerState s5 = wc.eventToFinalState.get(e4); - ContainerEventType e5 = wc.initStateToEvent.get(s5); - ContainerState s6 = wc.eventToFinalState.get(e5); - + ContainerState s1 = wc.eventToFinalState.get(e1); + ContainerEventType e2 = wc.initStateToEvent.get(s1); + ContainerState s2 = wc.eventToFinalState.get(e2); + ContainerEventType e3 = wc.initStateToEvent.get(s2); + ContainerState s3 = wc.eventToFinalState.get(e3); + ContainerEventType e4 = wc.initStateToEvent.get(s3); + ContainerState s4 = wc.eventToFinalState.get(e4); + 
ContainerEventType e5 = wc.initStateToEvent.get(s4); + ContainerState s5 = wc.eventToFinalState.get(e5); + ContainerEventType e6 = wc.initStateToEvent.get(s5); + ContainerState s6 = wc.eventToFinalState.get(e6); + + Assert.assertEquals(ContainerState.QUEUED_FOR_LOCALIZATION, s1); Assert.assertEquals(ContainerState.LOCALIZING, s2); Assert.assertEquals(ContainerState.SCHEDULED, s3); Assert.assertEquals(ContainerState.RUNNING, s4); @@ -364,10 +367,11 @@ public void testCleanupOnSuccess() throws Exception { Assert.assertEquals(ContainerState.DONE, s6); Assert.assertEquals(ContainerEventType.INIT_CONTAINER, e1); - Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e2); - Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e3); - Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e4); - Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e5); + Assert.assertEquals(ContainerEventType.LOCALIZE_CONTAINER, e2); + Assert.assertEquals(ContainerEventType.RESOURCE_LOCALIZED, e3); + Assert.assertEquals(ContainerEventType.CONTAINER_LAUNCHED, e4); + Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, e5); + Assert.assertEquals(ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, e6); } finally { if (wc != null) { @@ -1346,7 +1350,7 @@ public boolean matches(LocalizationEvent e) { appBus = mock(EventHandler.class); LogBus = mock(EventHandler.class); delService = mock(DeletionService.class); - schedBus = new ContainerScheduler(context, dispatcher, metrics, 0) { + schedBus = new ContainerScheduler(context, dispatcher, metrics, 0, -1) { @Override protected void scheduleContainer(Container container) { container.sendLaunchEvent(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java index b09a1f260e0..95642c5e374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java @@ -96,7 +96,7 @@ private void setupContainerMonitor() { ContainerManager cm = mock(ContainerManager.class); when(cm.getContainersMonitor()).thenReturn(containersMonitor); when(context.getContainerManager()).thenReturn(cm); - spy = new ContainerScheduler(context, dispatcher, metrics, 0); + spy = new ContainerScheduler(context, dispatcher, metrics, 0, -1); } @Before public void setUp() throws Exception {