diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7c25be3..49e1ce4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1797,6 +1797,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICES_CLASSPATH = NM_AUX_SERVICES + ".%s.classpath"; + public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH = + NM_AUX_SERVICES + ".%s.remote-classpath"; + public static final String NM_AUX_SERVICES_SYSTEM_CLASSES = NM_AUX_SERVICES + ".%s.system-classes"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 57cca50..95592a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.regex.Pattern; @@ -29,6 +31,8 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -36,20 +40,32 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.util.FSDownload; import com.google.common.base.Preconditions; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { + public static final String NM_AUX_SERVICE_DIR = "nmAuxService"; + public static final FsPermission NM_AUX_SERVICE_DIR_PERM = + new FsPermission((short) 0700); + static final String STATE_STORE_ROOT_NAME = "nm-aux-services"; private static final Logger LOG = @@ -58,16 +74,25 @@ protected final Map serviceMap; protected final Map serviceMetaData; private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; + private final LocalDirsHandlerService dirsHandler; + private final FileContext localLFS; + private final FileContext remoteLFS; + private final Configuration conf; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler, + Context nmContext) { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + this.dirsHandler = nmContext.getLocalDirsHandler(); + this.conf = nmContext.getConf(); + this.localLFS = getLocalFileContext(this.conf); + this.remoteLFS = getRemoteFileContext(this.conf); // Obtain services from configuration in init() } @@ -127,13 +152,42 @@ public void serviceInit(Configuration conf) throws Exception { String className = conf.get(classKey); final String appClassPath = conf.get(String.format( YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); + final String appRemoteClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName)); AuxiliaryService s = null; - boolean useCustomerClassLoader = appClassPath != null - && !appClassPath.isEmpty() && className != null - && !className.isEmpty(); + boolean useCustomerClassLoader = ((appClassPath != null + && !appClassPath.isEmpty()) || + (appRemoteClassPath != null && !appRemoteClassPath.isEmpty())) + && className != null && !className.isEmpty(); if (useCustomerClassLoader) { - s = AuxiliaryServiceWithCustomClassLoader.getInstance( - conf, className, appClassPath); + // load AuxiliaryService from local class path + if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) { + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, appClassPath); + } else { + // load AuxiliaryService from remote class path + if (appClassPath != null && !appClassPath.isEmpty()) { + LOG.warn("The aux serivce:" + sName + " has local classpath:" + + appClassPath + " and remote classpath:" + + appRemoteClassPath + ". Using remote classpath."); + } + // create its own aux_serivce object directory locally + // createAuxServiceDir(className); + // download remote class objects to NM_AUX_SERVICE_DIR + Path downloadDest = new Path(dirsHandler.getLocalPathForWrite( + "." + Path.SEPARATOR + NM_AUX_SERVICE_DIR), className); + Path src = this.remoteLFS.makeQualified(new Path(appRemoteClassPath)); + FileStatus scFileStatus = this.remoteLFS.getFileStatus(src); + LocalResource scRsrc = LocalResource.newInstance( + URL.fromURI(src.toUri()), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + scFileStatus.getLen(), scFileStatus.getModificationTime()); + FSDownload download = new FSDownload(this.localLFS, null, conf, + downloadDest, scRsrc, null); + Path dest = new Path(download.call() + Path.SEPARATOR + "*"); + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, dest.toString()); + } LOG.info("The aux service:" + sName + " are using the custom classloader"); } else { @@ -289,4 +343,48 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + private void createAuxServiceDir(String className) { + List localDirs = this.dirsHandler.getLocalDirs(); + for (String localDir : localDirs) { + FileStatus status; + Path base = localLFS.makeQualified(new Path(localDir, NM_AUX_SERVICE_DIR)); + Path target = new Path(base, className); + try { + localLFS.mkdir(target, NM_AUX_SERVICE_DIR_PERM, false); + status = localLFS.getFileStatus(target); + } catch (IOException e) { + String msg = "Could not initialize local dir " + target; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + FsPermission perms = status.getPermission(); + if(!perms.equals(NM_AUX_SERVICE_DIR_PERM)) { + try { + localLFS.setPermission(target, NM_AUX_SERVICE_DIR_PERM); + } + catch(IOException ie) { + String msg = "Could not set permissions for local dir " + target; + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } + } + } + } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + + FileContext getRemoteFileContext(Configuration conf) { + try { + return FileContext.getFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access remote fs"); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index de3db6e..1c4b082 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -253,7 +253,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(dirsHandler); // Start configurable services - auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler); + auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler, this.context); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 29fc747..32fa376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -1494,6 +1495,8 @@ private void cleanUpLocalDir(FileContext lfs, DeletionService del, currentTimeStamp); renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, currentTimeStamp); + renameLocalDir(lfs, localDir, AuxServices.NM_AUX_SERVICE_DIR, + currentTimeStamp); try { deleteLocalDir(lfs, del, localDir); } catch (IOException e) { @@ -1532,7 +1535,11 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") || status.getPath().getName() - .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { + .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*") + || + status.getPath().getName() + .matches(".*" + AuxServices.NM_AUX_SERVICE_DIR + + "_DEL_.*")) { FileDeletionTask deletionTask = new FileDeletionTask(del, null, status.getPath(), null); del.delete(deletionTask); @@ -1640,10 +1647,13 @@ private boolean checkLocalDir(String localDir) { Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + Path auxSysDir = new Path(localDir, AuxServices.NM_AUX_SERVICE_DIR); localDirPathFsPermissionsMap.put(userDir, defaultPermission); localDirPathFsPermissionsMap.put(fileDir, defaultPermission); localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); + localDirPathFsPermissionsMap.put(auxSysDir, + AuxServices.NM_AUX_SERVICE_DIR_PERM); return localDirPathFsPermissionsMap; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 6d12f8c..eeb4e26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -83,6 +83,7 @@ TestAuxServices.class.getName()); private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER = Mockito.mock(AuxiliaryLocalPathHandler.class); + private final static Context MOCK_CONTEXT = Mockito.mock(Context.class); static class LightService extends AuxiliaryService implements Service { @@ -202,7 +203,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"), ServiceC.class, Service.class); @SuppressWarnings("resource") - AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); Map meta = aux.getMetaData(); @@ -244,7 +245,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.set(String.format( YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, "ServiceC"), systemClasses); - aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); meta = aux.getMetaData(); @@ -282,7 +283,7 @@ public void testAuxEventDispatch() { ServiceB.class, Service.class); conf.setInt("A.expected.init", 1); conf.setInt("B.expected.stop", 1); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); @@ -346,7 +347,7 @@ public void testAuxServices() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); int latch = 1; @@ -379,7 +380,7 @@ public void testAuxServicesMeta() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); int latch = 1; @@ -416,7 +417,7 @@ public void testAuxUnexpectedStop() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); @@ -429,7 +430,7 @@ public void testAuxUnexpectedStop() { @Test public void testValidAuxServiceName() { - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); Configuration conf = new Configuration(); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"), @@ -443,7 +444,7 @@ public void testValidAuxServiceName() { } //Test bad auxService Name - final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"), ServiceA.class, Service.class); @@ -469,7 +470,7 @@ public void testAuxServiceRecoverySetup() throws IOException { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), RecoverableServiceB.class, Service.class); try { - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); Assert.assertEquals(2, aux.getServices().size()); File auxStorageDir = new File(TEST_DIR,