diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2e2abc9..d3a8317 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1339,6 +1339,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; + public static final String NM_AUX_SERVICE_CLASS_CLASSPATH = + NM_PREFIX + "aux-services.%s.class.classpath"; + public static final String NM_USER_HOME_DIR = NM_PREFIX + "user-home-dir"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 0e508ed..bfaa560 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -91,6 +91,8 @@ public void initializeMemberVariables() { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + configurationPropsToSkipCompare.add( + YarnConfiguration.NM_AUX_SERVICE_CLASS_CLASSPATH); // Ignore all YARN Application Timeline Service (version 1) properties configurationPrefixToSkipCompare.add("yarn.timeline-service."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 6508a2a..1deaeff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1725,6 +1725,18 @@ org.apache.hadoop.mapred.ShuffleHandler + + + The configuration to indicate a comma-separated list of mapreduce_shuffle + classpath location. It can be configured as local file path + (such as file:///path-to-class), or as hdfs file path. + When this value is not empty, the mapreduce_shuffle class would be loaded + using the configured classpath, otherwise, it would use NM class path. + + yarn.nodemanager.aux-services.mapreduce_shuffle.class.classpath + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index cd5ed88..0da97f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,23 +18,38 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Map.Entry; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.util.ApplicationClassLoader; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -45,6 +60,9 @@ import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { @@ -118,9 +136,30 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_AUX_SERVICES +" is invalid." + "The valid service name should only contain a-zA-Z0-9_ " + "and can not start with numbers"); - Class sClass = conf.getClass( - String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null, - AuxiliaryService.class); + String classKey = String.format( + YarnConfiguration.NM_AUX_SERVICE_FMT, sName); + Class sClass = null; + String className = conf.get(classKey); + String localClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_CLASS_CLASSPATH, sName)); + if (localClassPath != null && !localClassPath.isEmpty() + && className != null && !className.isEmpty()) { + // TODO: ApplicationClassLoader can not load jar from hdfs directory + final URL[] urls = constructUrlsFromClasspath(localClassPath, conf); + ApplicationClassLoader loader = AccessController.doPrivileged( + new PrivilegedExceptionAction() { + @Override + public ApplicationClassLoader run() + throws MalformedURLException { + return new ApplicationClassLoader( + urls, getClass().getClassLoader(), null); + } + }); + Class clazz = Class.forName(className, true, loader); + sClass = clazz.asSubclass(AuxiliaryService.class); + } else { + sClass = conf.getClass(classKey, null, AuxiliaryService.class); + } if (null == sClass) { throw new RuntimeException("No class defined for " + sName); } @@ -264,4 +303,39 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + private URL[] constructUrlsFromClasspath(String classpath, + Configuration conf) throws MalformedURLException, IOException, + IllegalArgumentException { + List urls = new ArrayList(); + for (String element : classpath.split(",")) { + if (element.endsWith("/*")) { + element = element.substring(0, element.length() - 1); + } + Path dirPath = new Path(element); + FileContext fc = FileContext.getFileContext(dirPath.toUri(), conf); + Set status = new HashSet( + Arrays.asList(fc.util().listStatus(dirPath))); + + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName().endsWith(".jar") + || next.getPath().getName().endsWith(".JAR"); + } + }); + status = Sets.newHashSet(mask); + if (status != null && !status.isEmpty()) { + for (FileStatus fileStatus : status) { + urls.add(fileStatus.getPath().toUri().toURL()); + } + } + } + return urls.toArray(new URL[urls.size()]); + } + + static { + URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 1380752..d70771f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -354,6 +354,68 @@ public void testAuxServiceRecoverySetup() throws IOException { } } + @Test (timeout = 10000) + public void testLoadAuxServiceLocally() throws IOException, Exception { + Configuration conf = new YarnConfiguration(); + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { "Asrv"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), + ServiceA.class, Service.class); + + // Set this configuration with a directory which does not + // contain the related jar file. + conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), + TEST_DIR.toString()); + boolean auxServiceInit = true; + + // initiate the aux-service + // should be fail because the jar file can not be found in this configured + // directory. + AuxServices aux = null; + try { + aux = new AuxServices(); + aux.init(conf); + } catch (Exception ex) { + auxServiceInit = false; + } finally { + if (aux != null) { + aux.close(); + } + } + Assert.assertFalse("The aux-service should not be initiated" + + "by specifing the invalid classpath.", + auxServiceInit); + + // Set this configuration with the directory + // which contains the related jar file + ClassLoader loader = TestAuxServices.class.getClassLoader(); + String classPath = loader.getResource( + "org/apache/hadoop/yarn/server/nodemanager/containermanager" + + "/TestAuxServices.class").toString(); + Configuration conf2 = new YarnConfiguration(); + conf2.setStrings(YarnConfiguration.NM_AUX_SERVICES, + new String[] { "Asrv"}); + conf2.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"), + ServiceA.class, Service.class); + conf2.set(String.format(YarnConfiguration.NM_AUX_SERVICE_CLASS_CLASSPATH, + "Asrv"), classPath); + + AuxServices aux1 = null; + try { + aux1 = new AuxServices(); + aux1.init(conf2); + Assert.assertEquals("We should only have one aux-service initiated.", + 1, aux1.getServices().size()); + Assert.assertEquals("ServiceA should be initiated as NM Aux-service.", + ServiceA.class.getName(), + (aux1.getServices().toArray()[0]).getClass().getName()); + } finally { + if (aux1 != null) { + aux1.close(); + } + } + } + static class RecoverableAuxService extends AuxiliaryService { static final FsPermission RECOVERY_PATH_PERMS = new FsPermission((short)0700);