diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e7d16d7..7dd81f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1368,6 +1368,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; + public static final String NM_AUX_SERVICE_CLASS_CLASSPATH = + NM_PREFIX + "aux-services.%s.class.classpath"; + + public static final String NM_AUX_SERVICE_CLASSLOADER_SYSTEM_CLASSES = + NM_PREFIX + "aux-services.%s.classloader.system.classes"; + public static final String NM_USER_HOME_DIR = NM_PREFIX + "user-home-dir"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index cd5ed88..93a05dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,7 +18,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.IOException; +import java.net.MalformedURLException; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -35,6 +41,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.util.ApplicationClassLoader; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -118,21 +125,51 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_AUX_SERVICES +" is invalid." + "The valid service name should only contain a-zA-Z0-9_ " + "and can not start with numbers"); - Class sClass = conf.getClass( - String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null, + String classKey = String.format( + YarnConfiguration.NM_AUX_SERVICE_FMT, sName); + String className = conf.get(classKey); + final String appClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_CLASS_CLASSPATH, sName)); + AuxiliaryService s = null; + boolean useCustomerClassLoader = appClassPath != null + && !appClassPath.isEmpty() && className != null + && !className.isEmpty(); + if (useCustomerClassLoader) { + String[] systemClass = conf.getTrimmedStrings(String.format( + YarnConfiguration.NM_AUX_SERVICE_CLASSLOADER_SYSTEM_CLASSES, + sName)); + ClassLoader customClassLoader = createJobClassLoader( + appClassPath, systemClass); + Class clazz = Class.forName(className, true, + customClassLoader); + Class sClass = clazz.asSubclass( AuxiliaryService.class); - if (null == sClass) { - throw new RuntimeException("No class defined for " + sName); + AuxiliaryService wrapped = ReflectionUtils.newInstance(sClass, conf); + s = new AuxiliaryServiceWithCustomClassLoader( + sName + " with custom class loader", wrapped, + customClassLoader); + LOG.info("The aux service:" + sName + + " are using the custom classloader"); + } else { + Class sClass = conf.getClass( + classKey, null, AuxiliaryService.class); + + if (sClass == null) { + throw new RuntimeException("No class defined for " + sName); + } + s = ReflectionUtils.newInstance(sClass, conf); + // TODO better use s.getName()? + if(s != null && !sName.equals(s.getName())) { + LOG.warn("The Auxilurary Service named '"+sName+"' in the " + +"configuration is for "+sClass+" which has " + +"a name of '"+s.getName()+"'. Because these are " + +"not the same tools trying to send ServiceData and read " + +"Service Meta Data may have issues unless the refer to " + +"the name in the config."); + } } - AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf); - // TODO better use s.getName()? - if(!sName.equals(s.getName())) { - LOG.warn("The Auxilurary Service named '"+sName+"' in the " - +"configuration is for "+sClass+" which has " - +"a name of '"+s.getName()+"'. Because these are " - +"not the same tools trying to send ServiceData and read " - +"Service Meta Data may have issues unless the refer to " - +"the name in the config."); + if (s == null) { + throw new RuntimeException("No object created for " + sName); } addService(sName, s); if (recoveryEnabled) { @@ -264,4 +301,25 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } -} + + private ClassLoader createJobClassLoader(final String appClasspath, + final String[] systemClasses) throws IOException { + try { + return AccessController.doPrivileged( + new PrivilegedExceptionAction() { + @Override + public ClassLoader run() throws MalformedURLException { + return new ApplicationClassLoader(appClasspath, + AuxServices.class.getClassLoader(), + Arrays.asList(systemClasses)); + } + }); + } catch (PrivilegedActionException e) { + Throwable t = e.getCause(); + if (t instanceof MalformedURLException) { + throw (MalformedURLException) t; + } + throw new IOException(e); + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java new file mode 100644 index 0000000..426873c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxiliaryServiceWithCustomClassLoader.java @@ -0,0 +1,147 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; + +class AuxiliaryServiceWithCustomClassLoader extends AuxiliaryService { + + private final AuxiliaryService wrapped; + private final ClassLoader customClassLoader; + + public AuxiliaryServiceWithCustomClassLoader(String name, + AuxiliaryService wrapped, ClassLoader customClassLoader) { + super(name); + this.wrapped = wrapped; + this.customClassLoader = customClassLoader; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + Configuration config = new Configuration(conf); + // reset the service configuration + setConfig(config); + config.setClassLoader(customClassLoader); + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.init(config); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + protected void serviceStart() throws Exception { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.start(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + protected void serviceStop() throws Exception { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stop(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void initializeApplication( + ApplicationInitializationContext initAppContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.initializeApplication(initAppContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void stopApplication(ApplicationTerminationContext stopAppContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stopApplication(stopAppContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public ByteBuffer getMetaData() { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + return wrapped.getMetaData(); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void initializeContainer(ContainerInitializationContext + initContainerContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.initializeContainer(initContainerContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void stopContainer(ContainerTerminationContext stopContainerContext) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.stopContainer(stopContainerContext); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } + + @Override + public void setRecoveryPath(Path recoveryPath) { + ClassLoader original = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(customClassLoader); + try { + wrapped.setRecoveryPath(recoveryPath); + } finally { + Thread.currentThread().setContextClassLoader(original); + } + } +}