diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index a648fef..9c8d408 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -61,7 +61,7 @@ public static ContainerLaunchContext newInstance( Map localResources, Map environment, List commands, Map serviceData, ByteBuffer tokens, - Map acls) { + Map acls, String containerExecutor) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -70,9 +70,21 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerExecutor(containerExecutor); return container; } + @Public + @Stable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + /** * Get all the tokens needed by this container. It may include file-system * tokens, ApplicationMaster related tokens if this container is an @@ -196,4 +208,20 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainType of the container. + * @return container type of the container + */ + @Public + @Stable + public abstract String getContainerExecutor(); + + /** + * Set the ContainType of the container. + * @param containerExecutor container type for the container. + */ + @Public + @Stable + public abstract void setContainerExecutor(String containerExecutor); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3273b47..62efbe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -606,10 +606,21 @@ private static void addDeprecatedKeys() { public static final String NM_BIND_HOST = NM_PREFIX + "bind-host"; - /** who will execute(launch) the containers.*/ + /** + * A comma separated list of container executor class who can execute + * (launch) the containers. + */ public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; + /** + * The default container executor to execute(launch) the containers when + * submit containers without specify which container executor to execute + * the containers. + */ + public static final String NM_DEFAULT_CONTAINER_EXECUTOR = + NM_PREFIX + "default.container-executor.class"; + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c4e756d..1f94415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -397,6 +397,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional string container_executor = 7; } message ContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..db2c5b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -18,13 +18,8 @@ package org.apache.hadoop.yarn.api.records.impl.pb; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - +import com.google.protobuf.ByteString; +import com.google.protobuf.TextFormat; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -38,8 +33,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; -import com.google.protobuf.ByteString; -import com.google.protobuf.TextFormat; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; @Private @Unstable @@ -56,7 +55,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; - + public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -456,6 +455,26 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + @Override + public String getContainerExecutor() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerExecutor()) { + return null; + } + return p.getContainerExecutor(); + } + + @Override + public void setContainerExecutor(String containerExecutor) { + maybeInitBuilder(); + if (containerExecutor == null) { + builder.clearContainerExecutor(); + return; + } + builder.setContainerExecutor(containerExecutor); + } + + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +482,4 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7dea2c3..71b81bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -688,13 +688,23 @@ - who will execute(launch) the containers. + A comma separated list of container executor class who can + execute (launch) the containers. yarn.nodemanager.container-executor.class org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor + The default container executor to execute(launch) the + containers when submit containers without specify container + executor. The default value is taken from + ${yarn.nodemanager.container-executor.class} if it contains + only one class. + yarn.nodemanager.default.container-executor.class + + + Number of threads container manager uses. yarn.nodemanager.container-manager.thread-count 20 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java new file mode 100644 index 0000000..79c069e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +public class CompositeContainerExecutor extends ContainerExecutor { + private static final Log LOG = LogFactory + .getLog(CompositeContainerExecutor.class); + + @VisibleForTesting + Map executorMap; + @VisibleForTesting + ContainerExecutor defaultExec; + @VisibleForTesting + String validContainerExecutor; + private Context context; + + public CompositeContainerExecutor(Map executorMap, + ContainerExecutor defaultExec, Context context) { + Preconditions.checkNotNull(executorMap); + Preconditions.checkNotNull(defaultExec); + this.executorMap = executorMap; + this.validContainerExecutor = Joiner.on(",").join(executorMap.keySet()); + this.defaultExec = defaultExec; + this.context = context; + } + + public CompositeContainerExecutor(ContainerExecutor exec) { + this(ImmutableMap.of(exec.getClass().getName(), exec), exec, null); + } + + @Override + public void init() throws IOException { + for (ContainerExecutor exec : executorMap.values()) { + exec.init(); + } + } + + @Override + public void startLocalizer(Path nmPrivateContainerTokens, + InetSocketAddress nmAddr, String user, String appId, String locId, + LocalDirsHandlerService dirsHandler) throws IOException, + InterruptedException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String userName, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public boolean signalContainer(String user, String pid, Signal signal) throws + IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public boolean isContainerProcessAlive(String user, String pid) throws + IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public void deleteAsUser(String user, Path subDir, Path... baseDirs) throws + IOException, InterruptedException { + for (ContainerExecutor exec : executorMap.values()) { + exec.deleteAsUser(user, subDir, baseDirs); + } + } + + public ContainerExecutor getContainerExecutor(ContainerId containerId) { + ContainerExecutor exec = null; + if (context != null) { + Container container = context.getContainers().get(containerId); + if (container == null) { + LOG.warn("Can't find container " + containerId + ", " + + "use " + defaultExec.getClass() + " instead."); + exec = defaultExec; + } else { + ContainerLaunchContext launchContext = container.getLaunchContext(); + if (launchContext == null || launchContext + .getContainerExecutor() == null || launchContext + .getContainerExecutor().trim().isEmpty()) { + exec = defaultExec; + } else { + exec = executorMap.get(launchContext.getContainerExecutor()); + if (exec == null) { + throw new YarnRuntimeException(this.getInValidExecLog + (containerId, launchContext.getContainerExecutor())); + } + } + } + } + exec = exec == null? defaultExec : exec; + LOG.info("Launch container " + containerId.toString() + " using " + exec + .getClass().getName()); + return exec; + } + + public boolean isValidContainerExecutor(String containerExecutor) { + return containerExecutor == null || executorMap + .containsKey(containerExecutor); + } + + public String getInValidExecLog(ContainerId containerId, String execName) { + return "Container " + containerId + " with invalid container executor " + + execName + ", please use " + this.validContainerExecutor + " instead."; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 6e7e2ec..6ac5d59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -77,4 +77,6 @@ boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); + + CompositeContainerExecutor getCompositeContainerExecutor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 53cbb11..3c887d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Splitter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -104,7 +105,7 @@ protected NodeResourceMonitor createNodeResourceMonitor() { } protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, @@ -202,14 +203,18 @@ protected void serviceInit(Configuration conf) throws Exception { this.aclsManager = new ApplicationACLsManager(conf); - ContainerExecutor exec = ReflectionUtils.newInstance( - conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, - DefaultContainerExecutor.class, ContainerExecutor.class), conf); + this.context = createNMContext(containerTokenSecretManager, + nmTokenSecretManager, nmStore); + + CompositeContainerExecutor exec = createContainerExecutor(conf, context); try { exec.init(); } catch (IOException e) { - throw new YarnRuntimeException("Failed to initialize container executor", e); - } + throw new YarnRuntimeException("Failed to initialize container " + + "executor", e); + } + ((NMContext) context).setCompositeContainerExecutor(exec); + DeletionService del = createDeletionService(exec); addService(del); @@ -220,9 +225,6 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore); - nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); @@ -333,6 +335,7 @@ public void run() { .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); private final NMStateStoreService stateStore; private boolean isDecommissioned = false; + private CompositeContainerExecutor compositeContainerExecutor; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -351,6 +354,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, /** * Usable only after ContainerManager is started. */ + @Override public NodeId getNodeId() { return this.nodeId; @@ -429,6 +433,16 @@ public void setDecommissioned(boolean isDecommissioned) { } @Override + public CompositeContainerExecutor getCompositeContainerExecutor() { + return compositeContainerExecutor; + } + + public void setCompositeContainerExecutor( + CompositeContainerExecutor compositeContainerExecutor) { + this.compositeContainerExecutor = compositeContainerExecutor; + } + + @Override public Map getSystemCredentialsForApps() { return systemCredentials; } @@ -514,4 +528,50 @@ public static void main(String[] args) { public NodeStatusUpdater getNodeStatusUpdater() { return nodeStatusUpdater; } + + @SuppressWarnings("unchecked") + private CompositeContainerExecutor createContainerExecutor(Configuration + conf, Context context) { + String nmContainerExecutor = conf.get(YarnConfiguration + .NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class.getName()); + String defaultContainerExecutor; + if (!nmContainerExecutor.contains(",")) { + defaultContainerExecutor = nmContainerExecutor; + } else { + defaultContainerExecutor = conf.get(YarnConfiguration + .NM_DEFAULT_CONTAINER_EXECUTOR); + if (defaultContainerExecutor == null) { + throw new YarnRuntimeException("Need to make a configuration for " + + YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR + " since " + + YarnConfiguration.NM_CONTAINER_EXECUTOR + "contains multiple " + + "values."); + } + } + Map execMap = new HashMap(); + Class execClass; + ContainerExecutor defaultExec = null; + for (String containerExecutor : Splitter.on(',').omitEmptyStrings() + .trimResults().split(nmContainerExecutor)) { + try { + execClass = (Class) conf + .getClassByName(containerExecutor); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Failed to find container executor " + + "class " + containerExecutor); + } catch (ClassCastException e) { + throw new YarnRuntimeException("Container executor class " + + containerExecutor + " should extend " + ContainerExecutor.class + .getName()); + } + if (!execMap.containsKey(containerExecutor)) { + ContainerExecutor exec = ReflectionUtils.newInstance(execClass, conf); + execMap.put(containerExecutor, exec); + if (containerExecutor.equals(defaultContainerExecutor)) { + defaultExec = exec; + } + } + } + return new CompositeContainerExecutor(execMap, defaultExec, context); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index bb277d9..5f3dc06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -174,7 +175,7 @@ private long waitForContainersOnShutdownMillis; - public ContainerManagerImpl(Context context, ContainerExecutor exec, + public ContainerManagerImpl(Context context, CompositeContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { @@ -371,7 +372,8 @@ public ContainersMonitor getContainersMonitor() { } protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService deletionContext, + Context context) { return new ResourceLocalizationService(this.dispatcher, exec, deletionContext, dirsHandler, context); } @@ -381,7 +383,7 @@ protected SharedCacheUploadService createSharedCacheUploaderService() { } protected ContainersLauncher createContainersLauncher(Context context, - ContainerExecutor exec) { + CompositeContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); } @@ -816,6 +818,13 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, LOG.info("Start request for " + containerIdStr + " by user " + user); ContainerLaunchContext launchContext = request.getContainerLaunchContext(); + if (!context.getCompositeContainerExecutor().isValidContainerExecutor( + (launchContext.getContainerExecutor()))) { + String errorMsg = context.getCompositeContainerExecutor() + .getInValidExecLog(containerId, launchContext.getContainerExecutor()); + LOG.error(errorMsg); + throw new YarnException(errorMsg); + } Map serviceData = getAuxServiceMetaData(); if (launchContext.getServiceData()!=null && diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 6950aa9..08af379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -58,7 +59,7 @@ private static final Log LOG = LogFactory.getLog(ContainersLauncher.class); private final Context context; - private final ContainerExecutor exec; + private final CompositeContainerExecutor exec; private final Dispatcher dispatcher; private final ContainerManagerImpl containerManager; @@ -74,7 +75,7 @@ Collections.synchronizedMap(new HashMap()); public ContainersLauncher(Context context, Dispatcher dispatcher, - ContainerExecutor exec, LocalDirsHandlerService dirsHandler, + CompositeContainerExecutor exec, LocalDirsHandlerService dirsHandler, ContainerManagerImpl containerManager) { super("containers-launcher"); this.exec = exec; @@ -113,8 +114,9 @@ public void handle(ContainersLauncherEvent event) { containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - new ContainerLaunch(context, getConfig(), dispatcher, exec, app, - event.getContainer(), dirsHandler, containerManager); + new ContainerLaunch(context, getConfig(), dispatcher, exec + .getContainerExecutor(containerId), app, event.getContainer() + , dirsHandler, containerManager); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -122,7 +124,8 @@ public void handle(ContainersLauncherEvent event) { app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, - exec, app, event.getContainer(), dirsHandler, containerManager); + exec.getContainerExecutor(containerId), app, event.getContainer(), + dirsHandler, containerManager); containerLauncher.submit(launch); running.put(containerId, launch); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 5440980..1edac51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -146,7 +147,7 @@ private long cacheTargetSize; private long cacheCleanupPeriod; - private final ContainerExecutor exec; + private final CompositeContainerExecutor exec; protected final Dispatcher dispatcher; private final DeletionService delService; private LocalizerTracker localizerTracker; @@ -177,7 +178,7 @@ FileContext lfs; public ResourceLocalizationService(Dispatcher dispatcher, - ContainerExecutor exec, DeletionService delService, + CompositeContainerExecutor exec, DeletionService delService, LocalDirsHandlerService dirsHandler, Context context) { super(ResourceLocalizationService.class.getName()); @@ -1091,13 +1092,12 @@ public void run() { List localDirs = getInitializedLocalDirs(); List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { - exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, - context.getUser(), - ConverterUtils.toString( - context.getContainerId(). - getApplicationAttemptId().getApplicationId()), - localizerId, - dirsHandler); + exec.getContainerExecutor(context.getContainerId()) + .startLocalizer(nmPrivateCTokensPath, localizationServerAddress, + context.getUser(), + ConverterUtils.toString(context.getContainerId(). + getApplicationAttemptId().getApplicationId()), + localizerId, dirsHandler); } else { throw new IOException("All disks failed. " + dirsHandler.getDisksHealthReport(false)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 02a63ac..68ea53e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -57,7 +57,7 @@ Map trackingContainers = new HashMap(); - final ContainerExecutor containerExecutor; + final CompositeContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; private final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; @@ -74,7 +74,7 @@ private static final long UNKNOWN_MEMORY_LIMIT = -1L; - public ContainersMonitorImpl(ContainerExecutor exec, + public ContainersMonitorImpl(CompositeContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { super("containers-monitor"); @@ -374,7 +374,8 @@ public void run() { // Initialize any uninitialized processTrees if (pId == null) { // get pid from ContainerId - pId = containerExecutor.getProcessId(ptInfo.getContainerId()); + pId = containerExecutor.getContainerExecutor(ptInfo.getContainerId()) + .getProcessId(ptInfo.getContainerId()); if (pId != null) { // pId will be null, either if the container is not spawned yet // or if the container's pid is removed from ContainerExecutor diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index f872a55..abc9fcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -66,14 +66,16 @@ public DummyContainerManager(Context context, ContainerExecutor exec, NodeManagerMetrics metrics, ApplicationACLsManager applicationACLsManager, LocalDirsHandlerService dirsHandler) { - super(context, exec, deletionContext, nodeStatusUpdater, metrics, + super(context, new CompositeContainerExecutor(exec), deletionContext, + nodeStatusUpdater, metrics, applicationACLsManager, dirsHandler); } @Override @SuppressWarnings("unchecked") protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService deletionContext, + Context context) { return new ResourceLocalizationService(super.dispatcher, exec, deletionContext, super.dirsHandler, context) { @Override @@ -143,7 +145,7 @@ protected UserGroupInformation getRemoteUgi() throws YarnException { @Override @SuppressWarnings("unchecked") protected ContainersLauncher createContainersLauncher(Context context, - ContainerExecutor exec) { + CompositeContainerExecutor exec) { return new ContainersLauncher(context, super.dispatcher, exec, super.dirsHandler, this) { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java new file mode 100644 index 0000000..c360967 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCompositeContainerExecutor extends BaseContainerManagerTest { + private ValidContainerExecutor validExec; + + public TestCompositeContainerExecutor() throws + UnsupportedFileSystemException { + super(); + } + + @Override + @Before + public void setup() throws IOException { + super.setup(); + ((NodeManager.NMContext) context).setCompositeContainerExecutor(exec); + } + + @Override + protected CompositeContainerExecutor createContainerExecutor() { + Map executorMap = new HashMap(); + validExec = new ValidContainerExecutor(); + DefaultContainerExecutor defaultExec = new DefaultContainerExecutor(); + validExec.setConf(conf); + defaultExec.setConf(conf); + executorMap.put(DefaultContainerExecutor.class.getName(), defaultExec); + executorMap.put(ValidContainerExecutor.class.getName(), validExec); + return new CompositeContainerExecutor(executorMap, validExec, context); + } + + public static final class ValidContainerExecutor extends DefaultContainerExecutor { + int launchCount = 0; + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String user, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + launchCount++; + return super.launchContainer(container, nmPrivateContainerScriptPath, + nmPrivateTokensPath, user, appId, containerWorkDir, localDirs, + logDirs); + } + } + + /** + * To make sure a single container_executor would work + */ + @Test + public void testSingleContainerExecutor() { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, + ValidContainerExecutor.class, ContainerExecutor.class); + try { + nm.init(conf); + CompositeContainerExecutor compositeExec = nm.getNMContext() + .getCompositeContainerExecutor(); + Assert.assertEquals(ValidContainerExecutor.class, + compositeExec.defaultExec.getClass()); + Assert.assertEquals(1, compositeExec.executorMap.size()); + Assert.assertTrue(compositeExec.executorMap + .containsKey(ValidContainerExecutor.class.getName())); + Assert.assertEquals(ValidContainerExecutor.class.getName(), + compositeExec.validContainerExecutor); + + boolean expectFail = false; + try { + compositeExec.startLocalizer(null, null, null, null, null, null); + } catch (YarnRuntimeException e) { + Assert.assertEquals("Should never call this", e.getMessage()); + expectFail = true; + } + Assert.assertTrue(expectFail); + + expectFail = false; + try { + compositeExec + .launchContainer(null, null, null, null, null, null, null, null); + } catch (YarnRuntimeException e) { + expectFail = true; + } + Assert.assertTrue(expectFail); + + expectFail = false; + try { + compositeExec.signalContainer(null, null, null); + } catch (YarnRuntimeException e) { + expectFail = true; + } + Assert.assertTrue(expectFail); + + expectFail = false; + try { + compositeExec.isContainerProcessAlive(null, null); + } catch (YarnRuntimeException e) { + expectFail = true; + } + Assert.assertTrue(expectFail); + + } catch (Exception e) { + fail("Should not fail"); + } finally { + nm.stop(); + } + } + + /** + * Test multiple container_executor configuration + */ + @Test + public void testMultipleContainerExecutor() { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_CONTAINER_EXECUTOR, + ValidContainerExecutor.class.getName() + "," + + DefaultContainerExecutor.class.getName()); + boolean expectFail = false; + try { + new NodeManager().init(conf); + } catch (Exception e) { + Assert.assertEquals(YarnRuntimeException.class, e.getClass()); + Assert.assertEquals("Need to make a configuration for " + + YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR + " since " + + YarnConfiguration.NM_CONTAINER_EXECUTOR + "contains multiple " + + "values.", e.getMessage()); + expectFail = true; + } + Assert.assertTrue(expectFail); + conf.set(YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR, + ValidContainerExecutor.class.getName()); + try { + nm.init(conf); + CompositeContainerExecutor compositeExec = nm.getNMContext() + .getCompositeContainerExecutor(); + Assert.assertEquals(ValidContainerExecutor.class, + compositeExec.defaultExec.getClass()); + Assert.assertEquals(2, compositeExec.executorMap.size()); + Assert.assertTrue(compositeExec.executorMap + .containsKey(ValidContainerExecutor.class.getName())); + Assert.assertTrue(compositeExec.executorMap + .containsKey(DefaultContainerExecutor.class.getName())); + Assert.assertEquals(Joiner.on(",") + .join(ValidContainerExecutor.class.getName(), + DefaultContainerExecutor.class.getName()), + compositeExec.validContainerExecutor); + + ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class); + ContainerId containerId = createContainerId(0); + Container container = mock(Container.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getLaunchContext()).thenReturn(launchContext); + nm.getNMContext().getContainers().put(containerId, container); + + when(launchContext.getContainerExecutor()) + .thenReturn(DefaultContainerExecutor.class.getName()); + Assert.assertEquals(DefaultContainerExecutor.class, + compositeExec.getContainerExecutor(containerId).getClass()); + + when(launchContext.getContainerExecutor()) + .thenReturn(ValidContainerExecutor.class.getName()); + Assert.assertEquals(ValidContainerExecutor.class, + compositeExec.getContainerExecutor(containerId).getClass()); + + // Do not have LinuxContainerExecutor configured, should fail + when(launchContext.getContainerExecutor()) + .thenReturn(LinuxContainerExecutor.class.getName()); + expectFail = false; + try { + compositeExec.getContainerExecutor(containerId); + } catch (Exception e) { + Assert.assertEquals(YarnRuntimeException.class, e.getClass()); + Assert.assertEquals(compositeExec.getInValidExecLog(containerId, + LinuxContainerExecutor.class.getName()), e.getMessage()); + expectFail = true; + } + Assert.assertTrue(expectFail); + + ContainerId notExistContainer = createContainerId(1); + Assert.assertEquals(ValidContainerExecutor.class, + compositeExec.getContainerExecutor(notExistContainer).getClass()); + } catch (Exception e) { + fail("should not fail"); + } finally { + nm.stop(); + } + } + + /** + * To make sure a single container_executor would work + */ + @Test + public void testContainerLaunch() throws IOException, YarnException, + InterruptedException { + containerManager.start(); + ContainerId cId = createContainerId(0); + StartContainerRequest scRequest = createStartContainerRequest(cId, + LinuxContainerExecutor.class); + Assert.assertEquals(0, validExec.launchCount); + + StartContainersResponse scResponse = containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + Assert.assertEquals(1, scResponse.getFailedRequests().size()); + Map failedRequests = scResponse + .getFailedRequests(); + Assert.assertTrue(failedRequests.containsKey(cId)); + Throwable throwable = failedRequests.get(cId).deSerialize(); + Assert.assertEquals(YarnException.class, throwable.getClass()); + Assert.assertEquals(exec.getInValidExecLog(cId, LinuxContainerExecutor + .class.getName()), throwable.getMessage()); + + //container with ValidContainerExecutor should be launched with ValidContainerExecutor + Assert.assertEquals(0, validExec.launchCount); + cId = createContainerId(1); + scRequest = createStartContainerRequest(cId, ValidContainerExecutor.class); + containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.COMPLETE); + Assert.assertEquals(1, validExec.launchCount); + + //container with DefaultContainerExecutor should be launched with DefaultContainerExecutor + cId = createContainerId(1); + scRequest = createStartContainerRequest(cId, DefaultContainerExecutor.class); + containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.COMPLETE); + Assert.assertEquals(1, validExec.launchCount); + } + + private StartContainerRequest createStartContainerRequest(ContainerId cId, + Class clazz) throws IOException { + ContainerLaunchContext containerLaunchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setContainerExecutor(clazz.getName()); + return StartContainerRequest.newInstance(containerLaunchContext, + TestContainerManager + .createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + } + + private ContainerId createContainerId(int id) { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); + return ContainerId.newContainerId(appAttemptId, id); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..fe06d25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -195,12 +195,12 @@ private boolean shouldRunTest() { } @Override - protected ContainerExecutor createContainerExecutor() { + protected CompositeContainerExecutor createContainerExecutor() { super.conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH)); LinuxContainerExecutor linuxContainerExecutor = new LinuxContainerExecutor(); linuxContainerExecutor.setConf(super.conf); - return linuxContainerExecutor; + return new CompositeContainerExecutor(linuxContainerExecutor); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index a58294f..0cb43b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -430,7 +430,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 46d7b10..a242e0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1071,7 +1071,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { @@ -1284,7 +1284,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService diskhandler) { @@ -1434,7 +1434,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..a8e965f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -110,7 +111,7 @@ public int getHttpPort() { return HTTP_PORT; }; }; - protected ContainerExecutor exec; + protected CompositeContainerExecutor exec; protected DeletionService delSrvc; protected String user = "nobody"; protected NodeHealthCheckerService nodeHealthChecker; @@ -143,10 +144,10 @@ public long getRMIdentifier() { protected ContainerManagerImpl containerManager = null; - protected ContainerExecutor createContainerExecutor() { + protected CompositeContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); exec.setConf(conf); - return exec; + return new CompositeContainerExecutor(exec); } @Before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index a73d583..799288f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -308,8 +309,8 @@ public void handle(ContainersLauncherEvent event) { }; return new ContainerManagerImpl(context, - mock(ContainerExecutor.class), mock(DeletionService.class), - mock(NodeStatusUpdater.class), metrics, + new CompositeContainerExecutor(mock(ContainerExecutor.class)), + mock(DeletionService.class), mock(NodeStatusUpdater.class), metrics, context.getApplicationACLsManager(), null) { @Override protected LogHandler createLogHandler(Configuration conf, @@ -319,13 +320,14 @@ protected LogHandler createLogHandler(Configuration conf, @Override protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService + deletionContext, Context context) { return rsrcSrv; } @Override protected ContainersLauncher createContainersLauncher( - Context context, ContainerExecutor exec) { + Context context, CompositeContainerExecutor exec) { return launcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2834e30..a7dbb55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -754,7 +755,8 @@ public boolean matches(Object o) { when(context.getNMStateStore()).thenReturn(stateStore); ContainerExecutor executor = mock(ContainerExecutor.class); launcher = - new ContainersLauncher(context, dispatcher, executor, null, null); + new ContainersLauncher(context, dispatcher, new + CompositeContainerExecutor(executor), null, null); // create a mock ExecutorService, which will not really launch // ContainerLaunch at all. launcher.containerLauncher = mock(ExecutorService.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index f968bb9..3f47a2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -97,7 +97,9 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -207,7 +209,7 @@ public void testLocalizationInit() throws Exception { diskhandler.init(conf); ResourceLocalizationService locService = - spy(new ResourceLocalizationService(dispatcher, exec, delService, + spy(createResourceLocalizationService(dispatcher, exec, delService, diskhandler, nmContext)); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); @@ -268,7 +270,7 @@ public void testDirectoryCleanupOnNewlyCreatedStateStore() when(nmStateStoreService.isNewlyCreated()).thenReturn(true); ResourceLocalizationService locService = - spy(new ResourceLocalizationService(dispatcher, exec, delService, + spy(createResourceLocalizationService(dispatcher, exec, delService, diskhandler,nmContext)); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); @@ -339,7 +341,7 @@ public void testResourceRelease() throws Exception { delService.start(); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -749,7 +751,7 @@ public void testLocalizationHeartbeat() throws Exception { delService.start(); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -971,7 +973,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, spyContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1065,7 +1067,7 @@ public void testFailedPublicResource() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1174,7 +1176,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandlerSpy, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1287,7 +1289,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception { dispatcher1.start(); ResourceLocalizationService rls = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1440,7 +1442,7 @@ public void testLocalResourcePath() throws Exception { dispatcher1.start(); ResourceLocalizationService rls = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1606,7 +1608,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception { // Creating and initializing ResourceLocalizationService but not starting // it as otherwise it will remove requests from pending queue. ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); dispatcher1.register(LocalizationEventType.class, spyService); @@ -1900,7 +1902,7 @@ private ResourceLocalizationService createSpyService( new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf), stateStore); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1964,7 +1966,7 @@ public void testFailedDirsResourceRelease() throws Exception { // setup mocks ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, mockDirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -2186,4 +2188,11 @@ public boolean matches(Object o) { } } + public ResourceLocalizationService createResourceLocalizationService + (Dispatcher dispatcher, ContainerExecutor exec, DeletionService + delService, LocalDirsHandlerService dirsHandler, Context context) { + return new ResourceLocalizationService(dispatcher, + new CompositeContainerExecutor(exec), delService, dirsHandler, context); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java index 1f2d067..14d42e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -299,7 +300,8 @@ public void testContainerMonitorMemFlags() { long expPmem = 8192 * 1024 * 1024l; long expVmem = (long) (expPmem * 2.1f); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(false, false, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -307,7 +309,8 @@ public void testContainerMonitorMemFlags() { assertEquals(false, cm.isPmemCheckEnabled()); assertEquals(false, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(true, false, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -315,7 +318,8 @@ public void testContainerMonitorMemFlags() { assertEquals(true, cm.isPmemCheckEnabled()); assertEquals(false, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(true, true, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -323,7 +327,8 @@ public void testContainerMonitorMemFlags() { assertEquals(true, cm.isPmemCheckEnabled()); assertEquals(true, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(false, true, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers());