diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index a648fef..9c8d408 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -61,7 +61,7 @@ public static ContainerLaunchContext newInstance( Map localResources, Map environment, List commands, Map serviceData, ByteBuffer tokens, - Map acls) { + Map acls, String containerExecutor) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -70,9 +70,21 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerExecutor(containerExecutor); return container; } + @Public + @Stable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + /** * Get all the tokens needed by this container. It may include file-system * tokens, ApplicationMaster related tokens if this container is an @@ -196,4 +208,20 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainType of the container. + * @return container type of the container + */ + @Public + @Stable + public abstract String getContainerExecutor(); + + /** + * Set the ContainType of the container. + * @param containerExecutor container type for the container. + */ + @Public + @Stable + public abstract void setContainerExecutor(String containerExecutor); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 015baa1..990f6d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -606,10 +606,21 @@ private static void addDeprecatedKeys() { public static final String NM_BIND_HOST = NM_PREFIX + "bind-host"; - /** who will execute(launch) the containers.*/ + /** + * A comma separated list of container executor class who can execute + * (launch) the containers. + */ public static final String NM_CONTAINER_EXECUTOR = NM_PREFIX + "container-executor.class"; + /** + * The default container executor to execute(launch) the containers when + * submit containers without specify which container executor to execute + * the containers. + */ + public static final String NM_DEFAULT_CONTAINER_EXECUTOR = + NM_PREFIX + "default.container-executor.class"; + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c4e756d..1f94415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -397,6 +397,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional string container_executor = 7; } message ContainerStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..48fee23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.api.records.impl.pb; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -56,7 +55,7 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; - + public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -456,6 +455,26 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + @Override + public String getContainerExecutor() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerExecutor()) { + return null; + } + return p.getContainerExecutor(); + } + + @Override + public void setContainerExecutor(String containerExecutor) { + maybeInitBuilder(); + if (containerExecutor == null) { + builder.clearContainerExecutor(); + return; + } + builder.setContainerExecutor(containerExecutor); + } + + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +482,4 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 349e57b..86ba483 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -688,13 +688,23 @@ - who will execute(launch) the containers. + A comma separated list of container executor class who can + execute (launch) the containers. yarn.nodemanager.container-executor.class org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor + The default container executor to execute(launch) the + containers when submit containers without specify container + executor. The default value is taken from + ${yarn.nodemanager.container-executor.class} if it contains + only one class. + yarn.nodemanager.default.container-executor.class + + + Number of threads container manager uses. yarn.nodemanager.container-manager.thread-count 20 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java new file mode 100644 index 0000000..65b2f66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CompositeContainerExecutor.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CompositeContainerExecutor extends ContainerExecutor { + private static final Log LOG = LogFactory + .getLog(CompositeContainerExecutor.class); + + @VisibleForTesting + Map execMap; + @VisibleForTesting + ContainerExecutor defaultExec; + @VisibleForTesting + String validContainerExecutor; + private Context context; + + @SuppressWarnings("unchecked") + public CompositeContainerExecutor(Configuration conf, Context context) { + String execClasses = conf.get(YarnConfiguration + .NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class.getName()); + String defaultExecClass = !execClasses.contains(",")? execClasses : conf + .get(YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR); + if (defaultExecClass == null) { + throw new YarnRuntimeException("Need to make a configuration for " + + YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR + " since " + + YarnConfiguration.NM_CONTAINER_EXECUTOR + "contains multiple " + + "values."); + } + this.execMap = new HashMap(); + Class execClass; + for (String containerExecutor : Splitter.on(',').omitEmptyStrings() + .trimResults().split(execClasses)) { + try { + execClass = (Class) conf + .getClassByName(containerExecutor); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Failed to find container executor " + + "class " + containerExecutor); + } catch (ClassCastException e) { + throw new YarnRuntimeException("Container executor class " + + containerExecutor + " should extend " + ContainerExecutor.class + .getName()); + } + if (!execMap.containsKey(containerExecutor)) { + ContainerExecutor exec = ReflectionUtils.newInstance(execClass, conf); + execMap.put(containerExecutor, exec); + if (containerExecutor.equals(defaultExecClass)) { + this.defaultExec = exec; + } + } + } + this.validContainerExecutor = Joiner.on(",").join(execMap.keySet()); + this.context = context; + } + + public CompositeContainerExecutor(ContainerExecutor exec) { + this.execMap = ImmutableMap.of(exec.getClass().getName(), exec); + this.validContainerExecutor = exec.getClass().getName(); + this.defaultExec = exec; + this.context = null; + } + + @Override + public void init() throws IOException { + for (ContainerExecutor exec : execMap.values()) { + exec.init(); + } + } + + @Override + public void startLocalizer(Path nmPrivateContainerTokens, + InetSocketAddress nmAddr, String user, String appId, String locId, + LocalDirsHandlerService dirsHandler) throws IOException, + InterruptedException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String userName, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public boolean signalContainer(String user, String pid, Signal signal) throws + IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public boolean isContainerProcessAlive(String user, String pid) throws + IOException { + throw new YarnRuntimeException("Should never call this"); + } + + @Override + public void deleteAsUser(String user, Path subDir, Path... baseDirs) throws + IOException, InterruptedException { + for (ContainerExecutor exec : execMap.values()) { + exec.deleteAsUser(user, subDir, baseDirs); + } + } + + public ContainerExecutor getContainerExecutor(ContainerId containerId) { + ContainerExecutor exec = null; + if (context != null) { + Container container = context.getContainers().get(containerId); + if (container == null) { + LOG.warn("Can't find container " + containerId + ", " + + "use " + defaultExec.getClass() + " instead."); + exec = defaultExec; + } else { + ContainerLaunchContext launchContext = container.getLaunchContext(); + if (launchContext == null || launchContext + .getContainerExecutor() == null || launchContext + .getContainerExecutor().trim().isEmpty()) { + exec = defaultExec; + } else { + exec = execMap.get(launchContext.getContainerExecutor()); + if (exec == null) { + throw new YarnRuntimeException(this.getInValidExecLog + (containerId, launchContext.getContainerExecutor())); + } + } + } + } + exec = exec == null? defaultExec : exec; + LOG.info("Launch container " + containerId.toString() + " using " + exec + .getClass().getName()); + return exec; + } + + public boolean isValidContainerExecutor(String execClassName) { + return execClassName == null || execMap.containsKey(execClassName); + } + + public String getInValidExecLog(ContainerId containerId, String execName) { + return "Container " + containerId + " with invalid container executor " + + execName + ", please use " + this.validContainerExecutor + " instead."; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 6e7e2ec..6ac5d59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -77,4 +77,6 @@ boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); + + CompositeContainerExecutor getCompositeContainerExecutor(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 53cbb11..7d8256d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -37,7 +37,6 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; @@ -104,7 +103,7 @@ protected NodeResourceMonitor createNodeResourceMonitor() { } protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, @@ -202,14 +201,18 @@ protected void serviceInit(Configuration conf) throws Exception { this.aclsManager = new ApplicationACLsManager(conf); - ContainerExecutor exec = ReflectionUtils.newInstance( - conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, - DefaultContainerExecutor.class, ContainerExecutor.class), conf); + this.context = createNMContext(containerTokenSecretManager, + nmTokenSecretManager, nmStore); + + CompositeContainerExecutor exec = createContainerExecutor(conf, context); try { exec.init(); } catch (IOException e) { - throw new YarnRuntimeException("Failed to initialize container executor", e); - } + throw new YarnRuntimeException("Failed to initialize container " + + "executor", e); + } + ((NMContext) context).setCompositeContainerExecutor(exec); + DeletionService del = createDeletionService(exec); addService(del); @@ -220,9 +223,6 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore); - nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); @@ -333,6 +333,7 @@ public void run() { .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); private final NMStateStoreService stateStore; private boolean isDecommissioned = false; + private CompositeContainerExecutor compositeContainerExecutor; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -351,6 +352,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, /** * Usable only after ContainerManager is started. */ + @Override public NodeId getNodeId() { return this.nodeId; @@ -429,6 +431,16 @@ public void setDecommissioned(boolean isDecommissioned) { } @Override + public CompositeContainerExecutor getCompositeContainerExecutor() { + return compositeContainerExecutor; + } + + public void setCompositeContainerExecutor( + CompositeContainerExecutor compositeContainerExecutor) { + this.compositeContainerExecutor = compositeContainerExecutor; + } + + @Override public Map getSystemCredentialsForApps() { return systemCredentials; } @@ -514,4 +526,9 @@ public static void main(String[] args) { public NodeStatusUpdater getNodeStatusUpdater() { return nodeStatusUpdater; } + + private CompositeContainerExecutor createContainerExecutor(Configuration + conf, Context context) { + return new CompositeContainerExecutor(conf, context); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index bb277d9..6f80afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -94,6 +94,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -174,7 +175,7 @@ private long waitForContainersOnShutdownMillis; - public ContainerManagerImpl(Context context, ContainerExecutor exec, + public ContainerManagerImpl(Context context, CompositeContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { @@ -371,7 +372,8 @@ public ContainersMonitor getContainersMonitor() { } protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService deletionContext, + Context context) { return new ResourceLocalizationService(this.dispatcher, exec, deletionContext, dirsHandler, context); } @@ -381,7 +383,7 @@ protected SharedCacheUploadService createSharedCacheUploaderService() { } protected ContainersLauncher createContainersLauncher(Context context, - ContainerExecutor exec) { + CompositeContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); } @@ -816,6 +818,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, LOG.info("Start request for " + containerIdStr + " by user " + user); ContainerLaunchContext launchContext = request.getContainerLaunchContext(); + if (context.getCompositeContainerExecutor() != null && !context + .getCompositeContainerExecutor().isValidContainerExecutor( + (launchContext.getContainerExecutor()))) { + String errorMsg = context.getCompositeContainerExecutor() + .getInValidExecLog(containerId, launchContext.getContainerExecutor()); + LOG.error(errorMsg); + throw new YarnException(errorMsg); + } Map serviceData = getAuxServiceMetaData(); if (launchContext.getServiceData()!=null && diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 6950aa9..08af379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; @@ -58,7 +59,7 @@ private static final Log LOG = LogFactory.getLog(ContainersLauncher.class); private final Context context; - private final ContainerExecutor exec; + private final CompositeContainerExecutor exec; private final Dispatcher dispatcher; private final ContainerManagerImpl containerManager; @@ -74,7 +75,7 @@ Collections.synchronizedMap(new HashMap()); public ContainersLauncher(Context context, Dispatcher dispatcher, - ContainerExecutor exec, LocalDirsHandlerService dirsHandler, + CompositeContainerExecutor exec, LocalDirsHandlerService dirsHandler, ContainerManagerImpl containerManager) { super("containers-launcher"); this.exec = exec; @@ -113,8 +114,9 @@ public void handle(ContainersLauncherEvent event) { containerId.getApplicationAttemptId().getApplicationId()); ContainerLaunch launch = - new ContainerLaunch(context, getConfig(), dispatcher, exec, app, - event.getContainer(), dirsHandler, containerManager); + new ContainerLaunch(context, getConfig(), dispatcher, exec + .getContainerExecutor(containerId), app, event.getContainer() + , dirsHandler, containerManager); containerLauncher.submit(launch); running.put(containerId, launch); break; @@ -122,7 +124,8 @@ public void handle(ContainersLauncherEvent event) { app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, - exec, app, event.getContainer(), dirsHandler, containerManager); + exec.getContainerExecutor(containerId), app, event.getContainer(), + dirsHandler, containerManager); containerLauncher.submit(launch); running.put(containerId, launch); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 8c84132..40eb8d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -146,7 +147,7 @@ private long cacheTargetSize; private long cacheCleanupPeriod; - private final ContainerExecutor exec; + private final CompositeContainerExecutor exec; protected final Dispatcher dispatcher; private final DeletionService delService; private LocalizerTracker localizerTracker; @@ -177,7 +178,7 @@ FileContext lfs; public ResourceLocalizationService(Dispatcher dispatcher, - ContainerExecutor exec, DeletionService delService, + CompositeContainerExecutor exec, DeletionService delService, LocalDirsHandlerService dirsHandler, Context context) { super(ResourceLocalizationService.class.getName()); @@ -1078,13 +1079,12 @@ public void run() { List localDirs = getInitializedLocalDirs(); List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { - exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, - context.getUser(), - ConverterUtils.toString( - context.getContainerId(). - getApplicationAttemptId().getApplicationId()), - localizerId, - dirsHandler); + exec.getContainerExecutor(context.getContainerId()) + .startLocalizer(nmPrivateCTokensPath, localizationServerAddress, + context.getUser(), + ConverterUtils.toString(context.getContainerId(). + getApplicationAttemptId().getApplicationId()), + localizerId, dirsHandler); } else { throw new IOException("All disks failed. " + dirsHandler.getDisksHealthReport(false)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 0ae4325..e730f0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; @@ -59,7 +59,7 @@ Map trackingContainers = new HashMap(); - final ContainerExecutor containerExecutor; + final CompositeContainerExecutor containerExecutor; private final Dispatcher eventDispatcher; private final Context context; private ResourceCalculatorPlugin resourceCalculatorPlugin; @@ -76,7 +76,7 @@ private static final long UNKNOWN_MEMORY_LIMIT = -1L; - public ContainersMonitorImpl(ContainerExecutor exec, + public ContainersMonitorImpl(CompositeContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { super("containers-monitor"); @@ -386,7 +386,8 @@ public void run() { // Initialize any uninitialized processTrees if (pId == null) { // get pid from ContainerId - pId = containerExecutor.getProcessId(ptInfo.getContainerId()); + pId = containerExecutor.getContainerExecutor(ptInfo.getContainerId()) + .getProcessId(ptInfo.getContainerId()); if (pId != null) { // pId will be null, either if the container is not spawned yet // or if the container's pid is removed from ContainerExecutor diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index f872a55..abc9fcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -66,14 +66,16 @@ public DummyContainerManager(Context context, ContainerExecutor exec, NodeManagerMetrics metrics, ApplicationACLsManager applicationACLsManager, LocalDirsHandlerService dirsHandler) { - super(context, exec, deletionContext, nodeStatusUpdater, metrics, + super(context, new CompositeContainerExecutor(exec), deletionContext, + nodeStatusUpdater, metrics, applicationACLsManager, dirsHandler); } @Override @SuppressWarnings("unchecked") protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService deletionContext, + Context context) { return new ResourceLocalizationService(super.dispatcher, exec, deletionContext, super.dirsHandler, context) { @Override @@ -143,7 +145,7 @@ protected UserGroupInformation getRemoteUgi() throws YarnException { @Override @SuppressWarnings("unchecked") protected ContainersLauncher createContainersLauncher(Context context, - ContainerExecutor exec) { + CompositeContainerExecutor exec) { return new ContainersLauncher(context, super.dispatcher, exec, super.dirsHandler, this) { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java new file mode 100644 index 0000000..1c7788e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestCompositeContainerExecutor.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCompositeContainerExecutor extends BaseContainerManagerTest { + + public TestCompositeContainerExecutor() throws + UnsupportedFileSystemException { + super(); + } + + @Override + @Before + public void setup() throws IOException { + super.setup(); + ((NodeManager.NMContext) context).setCompositeContainerExecutor(exec); + } + + @Override + protected CompositeContainerExecutor createContainerExecutor() { + conf.set(YarnConfiguration.NM_CONTAINER_EXECUTOR, + ValidContainerExecutor.class.getName() + "," + + DefaultContainerExecutor.class.getName()); + conf.set(YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR, + ValidContainerExecutor.class.getName()); + return new CompositeContainerExecutor(conf, context); + } + + public static final class ValidContainerExecutor extends DefaultContainerExecutor { + int launchCount = 0; + + @Override + public int launchContainer(Container container, + Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, + String user, String appId, Path containerWorkDir, + List localDirs, List logDirs) throws IOException { + launchCount++; + return super.launchContainer(container, nmPrivateContainerScriptPath, + nmPrivateTokensPath, user, appId, containerWorkDir, localDirs, + logDirs); + } + } + + @Test + public void testContainerExecutorConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + + //test single container executor configuration + conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, + ValidContainerExecutor.class, ContainerExecutor.class); + CompositeContainerExecutor compositeExec = new CompositeContainerExecutor + (conf, null); + expectExecutors(compositeExec, ValidContainerExecutor.class.getName(), + ValidContainerExecutor.class, ValidContainerExecutor.class); + + //test multiple container executor configuration + conf.set(YarnConfiguration.NM_CONTAINER_EXECUTOR, + ValidContainerExecutor.class.getName() + "," + + DefaultContainerExecutor.class.getName()); + boolean expectFail = false; + try { + new CompositeContainerExecutor(conf, null); + } catch (Exception e) { + //Do not configure NM_DEFAULT_CONTAINER_EXECUTOR, should get an exception + Assert.assertEquals("Need to make a configuration for " + + YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR + " since " + + YarnConfiguration.NM_CONTAINER_EXECUTOR + "contains multiple " + + "values.", e.getMessage()); + expectFail = true; + } + Assert.assertTrue(expectFail); + } + + @SafeVarargs + private final void expectExecutors(CompositeContainerExecutor compositeExec, + String validExec, Class defaultExec, + Class... expectExecs) { + Assert.assertEquals(validExec, compositeExec.validContainerExecutor); + Assert.assertEquals(defaultExec, compositeExec.defaultExec.getClass()); + if (expectExecs != null) { + Assert.assertEquals(expectExecs.length, compositeExec.execMap.size()); + for (Class clazz : expectExecs) { + Assert.assertTrue(compositeExec.execMap.containsKey(clazz.getName())); + Assert.assertEquals(clazz, compositeExec.execMap.get(clazz.getName()).getClass()); + } + } + } + + @Test + public void testWithMocks() { + expectExecutors(exec, Joiner.on(",") + .join(ValidContainerExecutor.class.getName(), + DefaultContainerExecutor.class.getName()), + ValidContainerExecutor.class, ValidContainerExecutor.class, + DefaultContainerExecutor.class); + ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class); + ContainerId containerId = createContainerId(0); + Container container = mock(Container.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getLaunchContext()).thenReturn(launchContext); + context.getContainers().put(containerId, container); + + when(launchContext.getContainerExecutor()) + .thenReturn(DefaultContainerExecutor.class.getName()); + Assert.assertEquals(DefaultContainerExecutor.class, + exec.getContainerExecutor(containerId).getClass()); + + when(launchContext.getContainerExecutor()) + .thenReturn(ValidContainerExecutor.class.getName()); + Assert.assertEquals(ValidContainerExecutor.class, + exec.getContainerExecutor(containerId).getClass()); + + // Do not have LinuxContainerExecutor configured, should fail + when(launchContext.getContainerExecutor()) + .thenReturn(LinuxContainerExecutor.class.getName()); + boolean expectFail = false; + try { + exec.getContainerExecutor(containerId); + } catch (Exception e) { + Assert.assertEquals(YarnRuntimeException.class, e.getClass()); + Assert.assertEquals(exec.getInValidExecLog(containerId, + LinuxContainerExecutor.class.getName()), e.getMessage()); + expectFail = true; + } + Assert.assertTrue(expectFail); + + ContainerId notExistContainer = createContainerId(1); + Assert.assertEquals(ValidContainerExecutor.class, + exec.getContainerExecutor(notExistContainer).getClass()); + } + + @Test + public void testLaunchContainers() throws IOException, YarnException, + InterruptedException { + ValidContainerExecutor validExec = (ValidContainerExecutor) exec.execMap + .get(ValidContainerExecutor.class.getName()); + containerManager.start(); + ContainerId cId = createContainerId(0); + StartContainerRequest scRequest = createStartContainerRequest(cId, + LinuxContainerExecutor.class); + Assert.assertEquals(0, validExec.launchCount); + + StartContainersResponse scResponse = containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + Assert.assertEquals(1, scResponse.getFailedRequests().size()); + Map failedRequests = scResponse + .getFailedRequests(); + Assert.assertTrue(failedRequests.containsKey(cId)); + Throwable throwable = failedRequests.get(cId).deSerialize(); + Assert.assertEquals(YarnException.class, throwable.getClass()); + Assert.assertEquals(exec.getInValidExecLog(cId, LinuxContainerExecutor + .class.getName()), throwable.getMessage()); + + //container with ValidContainerExecutor should be launched with ValidContainerExecutor + Assert.assertEquals(0, validExec.launchCount); + cId = createContainerId(1); + scRequest = createStartContainerRequest(cId, ValidContainerExecutor.class); + containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.COMPLETE); + Assert.assertEquals(1, validExec.launchCount); + + //container with DefaultContainerExecutor should be launched with DefaultContainerExecutor + cId = createContainerId(2); + scRequest = createStartContainerRequest(cId, DefaultContainerExecutor.class); + containerManager.startContainers( + StartContainersRequest.newInstance(ImmutableList.of(scRequest))); + BaseContainerManagerTest + .waitForContainerState(containerManager, cId, ContainerState.COMPLETE); + Assert.assertEquals(1, validExec.launchCount); + } + + private StartContainerRequest createStartContainerRequest(ContainerId cId, + Class clazz) throws IOException { + ContainerLaunchContext containerLaunchContext = recordFactory + .newRecordInstance(ContainerLaunchContext.class); + containerLaunchContext.setContainerExecutor(clazz.getName()); + return StartContainerRequest.newInstance(containerLaunchContext, + TestContainerManager + .createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + } + + private ContainerId createContainerId(int id) { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); + return ContainerId.newContainerId(appAttemptId, id); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index a47e7f7..fe06d25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -195,12 +195,12 @@ private boolean shouldRunTest() { } @Override - protected ContainerExecutor createContainerExecutor() { + protected CompositeContainerExecutor createContainerExecutor() { super.conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH)); LinuxContainerExecutor linuxContainerExecutor = new LinuxContainerExecutor(); linuxContainerExecutor.setConf(super.conf); - return linuxContainerExecutor; + return new CompositeContainerExecutor(linuxContainerExecutor); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 611e671..2620e98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -431,7 +431,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 71a420e..5e4e553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1077,7 +1077,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { @@ -1290,7 +1290,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService diskhandler) { @@ -1440,7 +1440,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, @Override protected ContainerManagerImpl createContainerManager(Context context, - ContainerExecutor exec, DeletionService del, + CompositeContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..a8e965f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -110,7 +111,7 @@ public int getHttpPort() { return HTTP_PORT; }; }; - protected ContainerExecutor exec; + protected CompositeContainerExecutor exec; protected DeletionService delSrvc; protected String user = "nobody"; protected NodeHealthCheckerService nodeHealthChecker; @@ -143,10 +144,10 @@ public long getRMIdentifier() { protected ContainerManagerImpl containerManager = null; - protected ContainerExecutor createContainerExecutor() { + protected CompositeContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); exec.setConf(conf); - return exec; + return new CompositeContainerExecutor(exec); } @Before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index a73d583..799288f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -308,8 +309,8 @@ public void handle(ContainersLauncherEvent event) { }; return new ContainerManagerImpl(context, - mock(ContainerExecutor.class), mock(DeletionService.class), - mock(NodeStatusUpdater.class), metrics, + new CompositeContainerExecutor(mock(ContainerExecutor.class)), + mock(DeletionService.class), mock(NodeStatusUpdater.class), metrics, context.getApplicationACLsManager(), null) { @Override protected LogHandler createLogHandler(Configuration conf, @@ -319,13 +320,14 @@ protected LogHandler createLogHandler(Configuration conf, @Override protected ResourceLocalizationService createResourceLocalizationService( - ContainerExecutor exec, DeletionService deletionContext, Context context) { + CompositeContainerExecutor exec, DeletionService + deletionContext, Context context) { return rsrcSrv; } @Override protected ContainersLauncher createContainersLauncher( - Context context, ContainerExecutor exec) { + Context context, CompositeContainerExecutor exec) { return launcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2834e30..a7dbb55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -754,7 +755,8 @@ public boolean matches(Object o) { when(context.getNMStateStore()).thenReturn(stateStore); ContainerExecutor executor = mock(ContainerExecutor.class); launcher = - new ContainersLauncher(context, dispatcher, executor, null, null); + new ContainersLauncher(context, dispatcher, new + CompositeContainerExecutor(executor), null, null); // create a mock ExecutorService, which will not really launch // ContainerLaunch at all. launcher.containerLauncher = mock(ExecutorService.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 30af5a4..73072b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -97,7 +97,9 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; @@ -207,7 +209,7 @@ public void testLocalizationInit() throws Exception { diskhandler.init(conf); ResourceLocalizationService locService = - spy(new ResourceLocalizationService(dispatcher, exec, delService, + spy(createResourceLocalizationService(dispatcher, exec, delService, diskhandler, nmContext)); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); @@ -268,7 +270,7 @@ public void testDirectoryCleanupOnNewlyCreatedStateStore() when(nmStateStoreService.isNewlyCreated()).thenReturn(true); ResourceLocalizationService locService = - spy(new ResourceLocalizationService(dispatcher, exec, delService, + spy(createResourceLocalizationService(dispatcher, exec, delService, diskhandler,nmContext)); doReturn(lfs) .when(locService).getLocalFileContext(isA(Configuration.class)); @@ -339,7 +341,7 @@ public void testResourceRelease() throws Exception { delService.start(); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -749,7 +751,7 @@ public void testLocalizationHeartbeat() throws Exception { delService.start(); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1000,7 +1002,7 @@ public void testPublicResourceInitializesLocalDir() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, spyContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1094,7 +1096,7 @@ public void testFailedPublicResource() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1203,7 +1205,7 @@ public void testPublicResourceAddResourceExceptions() throws Exception { try { ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandlerSpy, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -1335,7 +1337,7 @@ public void testParallelDownloadAttemptsForPrivateResource() throws Exception { dispatcher1.start(); ResourceLocalizationService rls = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1488,7 +1490,7 @@ public void testLocalResourcePath() throws Exception { dispatcher1.start(); ResourceLocalizationService rls = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, localDirHandler, nmContext); dispatcher1.register(LocalizationEventType.class, rls); rls.init(conf); @@ -1654,7 +1656,7 @@ public void testParallelDownloadAttemptsForPublicResource() throws Exception { // Creating and initializing ResourceLocalizationService but not starting // it as otherwise it will remove requests from pending queue. ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher1, exec, delService, + createResourceLocalizationService(dispatcher1, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); dispatcher1.register(LocalizationEventType.class, spyService); @@ -1948,7 +1950,7 @@ private ResourceLocalizationService createSpyService( new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf), stateStore); ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -2012,7 +2014,7 @@ public void testFailedDirsResourceRelease() throws Exception { // setup mocks ResourceLocalizationService rawService = - new ResourceLocalizationService(dispatcher, exec, delService, + createResourceLocalizationService(dispatcher, exec, delService, mockDirsHandler, nmContext); ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); @@ -2234,4 +2236,11 @@ public boolean matches(Object o) { } } + public ResourceLocalizationService createResourceLocalizationService + (Dispatcher dispatcher, ContainerExecutor exec, DeletionService + delService, LocalDirsHandlerService dirsHandler, Context context) { + return new ResourceLocalizationService(dispatcher, + new CompositeContainerExecutor(exec), delService, dirsHandler, context); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java index 1f2d067..14d42e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.CompositeContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -299,7 +300,8 @@ public void testContainerMonitorMemFlags() { long expPmem = 8192 * 1024 * 1024l; long expVmem = (long) (expPmem * 2.1f); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(false, false, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -307,7 +309,8 @@ public void testContainerMonitorMemFlags() { assertEquals(false, cm.isPmemCheckEnabled()); assertEquals(false, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(true, false, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -315,7 +318,8 @@ public void testContainerMonitorMemFlags() { assertEquals(true, cm.isPmemCheckEnabled()); assertEquals(false, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(true, true, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers()); @@ -323,7 +327,8 @@ public void testContainerMonitorMemFlags() { assertEquals(true, cm.isPmemCheckEnabled()); assertEquals(true, cm.isVmemCheckEnabled()); - cm = new ContainersMonitorImpl(mock(ContainerExecutor.class), + cm = new ContainersMonitorImpl( + new CompositeContainerExecutor(mock(ContainerExecutor.class)), mock(AsyncDispatcher.class), mock(Context.class)); cm.init(getConfForCM(false, true, 8192, 2.1f)); assertEquals(expPmem, cm.getPmemAllocatedForContainers());