diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1f62bbd..cefbe1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2088,6 +2088,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICES_CLASSPATH = NM_AUX_SERVICES + ".%s.classpath"; + public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH = + NM_AUX_SERVICES + ".%s.remote-classpath"; + public static final String NM_AUX_SERVICES_SYSTEM_CLASSES = NM_AUX_SERVICES + ".%s.system-classes"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 57cca50..44f6b37 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -29,27 +31,42 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.util.FSDownload; import com.google.common.base.Preconditions; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { + public static final String NM_AUX_SERVICE_DIR = "nmAuxService"; + public static final FsPermission NM_AUX_SERVICE_DIR_PERM = + new FsPermission((short) 0700); + static final String STATE_STORE_ROOT_NAME = "nm-aux-services"; private static final Logger LOG = @@ -58,16 +75,21 @@ protected final Map serviceMap; protected final Map serviceMetaData; private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; + private final LocalDirsHandlerService dirsHandler; + private final UserGroupInformation userUGI; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler, + Context nmContext) { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + this.dirsHandler = nmContext.getLocalDirsHandler(); + this.userUGI = getRemoteUgi(); // Obtain services from configuration in init() } @@ -125,15 +147,64 @@ public void serviceInit(Configuration conf) throws Exception { String classKey = String.format( YarnConfiguration.NM_AUX_SERVICE_FMT, sName); String className = conf.get(classKey); - final String appClassPath = conf.get(String.format( + final String appLocalClassPath = conf.get(String.format( YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); + final String appRemoteClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName)); AuxiliaryService s = null; - boolean useCustomerClassLoader = appClassPath != null - && !appClassPath.isEmpty() && className != null - && !className.isEmpty(); + boolean useCustomerClassLoader = ((appLocalClassPath != null + && !appLocalClassPath.isEmpty()) || + (appRemoteClassPath != null && !appRemoteClassPath.isEmpty())) + && className != null && !className.isEmpty(); if (useCustomerClassLoader) { - s = AuxiliaryServiceWithCustomClassLoader.getInstance( - conf, className, appClassPath); + // load AuxiliaryService from local class path + if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) { + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, appLocalClassPath); + } else { + // load AuxiliaryService from remote class path + if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) { + throw new YarnRuntimeException("The aux serivce:" + sName + + " has configured local classpath:" + appLocalClassPath + + " and remote classpath:" + appRemoteClassPath + + ". Only one of them should be configured."); + } + FileContext localLFS = getLocalFileContext(conf); + // download remote class objects to NM_AUX_SERVICE_DIR + Path downloadDest = new Path(dirsHandler.getLocalPathForWrite( + "." + Path.SEPARATOR + NM_AUX_SERVICE_DIR), className); + Path src = new Path(appRemoteClassPath); + FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf); + FileStatus scFileStatus = remoteLFS.getFileStatus(src); + if (!scFileStatus.getOwner().equals( + this.userUGI.getShortUserName())) { + throw new YarnRuntimeException("The remote jarfile owner:" + + scFileStatus.getOwner() + " is not the same as the NM user:" + + this.userUGI.getShortUserName() + "."); + } + if ((scFileStatus.getPermission().toShort() & 0022) != 0) { + throw new YarnRuntimeException("The remote jarfile should not " + + "be writable by group or others. " + + "The current Permission is " + + scFileStatus.getPermission().toShort()); + } + LocalResource scRsrc = LocalResource.newInstance( + URL.fromURI(src.toUri()), + LocalResourceType.ARCHIVE, LocalResourceVisibility.PRIVATE, + scFileStatus.getLen(), scFileStatus.getModificationTime()); + FSDownload download = new FSDownload(localLFS, null, conf, + downloadDest, scRsrc, null); + Path dest = null; + try { + dest = new Path(download.call() + Path.SEPARATOR + "*"); + } catch (Exception ex) { + throw new YarnRuntimeException( + "Exception happend while downloading files for aux-service:" + + sName + " and remote-file-path:" + src + "."); + } + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, dest.toString()); + } LOG.info("The aux service:" + sName + " are using the custom classloader"); } else { @@ -289,4 +360,33 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + + FileContext getRemoteFileContext(final URI path, Configuration conf) { + try { + return FileContext.getFileContext(path, conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access remote fs"); + } + } + + private UserGroupInformation getRemoteUgi() { + UserGroupInformation remoteUgi; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = "Cannot obtain the user-name. Got exception: " + + StringUtils.stringifyException(e); + LOG.warn(msg); + throw new YarnRuntimeException(msg); + } + return remoteUgi; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 6b4d517..18b9cd3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -253,7 +253,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(dirsHandler); // Start configurable services - auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler); + auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler, + this.context); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 29fc747..3123a92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -1489,11 +1490,13 @@ private void cleanUpLocalDir(FileContext lfs, DeletionService del, String localDir) { long currentTimeStamp = System.currentTimeMillis(); renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, - currentTimeStamp); + currentTimeStamp); renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, - currentTimeStamp); + currentTimeStamp); renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, - currentTimeStamp); + currentTimeStamp); + renameLocalDir(lfs, localDir, AuxServices.NM_AUX_SERVICE_DIR, + currentTimeStamp); try { deleteLocalDir(lfs, del, localDir); } catch (IOException e) { @@ -1532,7 +1535,11 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") || status.getPath().getName() - .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { + .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*") + || + status.getPath().getName() + .matches(".*" + AuxServices.NM_AUX_SERVICE_DIR + + "_DEL_.*")) { FileDeletionTask deletionTask = new FileDeletionTask(del, null, status.getPath(), null); del.delete(deletionTask); @@ -1640,10 +1647,13 @@ private boolean checkLocalDir(String localDir) { Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + Path auxSysDir = new Path(localDir, AuxServices.NM_AUX_SERVICE_DIR); localDirPathFsPermissionsMap.put(userDir, defaultPermission); localDirPathFsPermissionsMap.put(fileDir, defaultPermission); localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); + localDirPathFsPermissionsMap.put(auxSysDir, + AuxServices.NM_AUX_SERVICE_DIR_PERM); return localDirPathFsPermissionsMap; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 6d12f8c..e458717 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -83,6 +83,7 @@ TestAuxServices.class.getName()); private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER = Mockito.mock(AuxiliaryLocalPathHandler.class); + private final static Context MOCK_CONTEXT = Mockito.mock(Context.class); static class LightService extends AuxiliaryService implements Service { @@ -202,7 +203,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ServiceC"), ServiceC.class, Service.class); @SuppressWarnings("resource") - AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); Map meta = aux.getMetaData(); @@ -244,7 +245,7 @@ public void testCustomizedAuxServiceClassPath() throws Exception { conf.set(String.format( YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES, "ServiceC"), systemClasses); - aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT); aux.init(conf); aux.start(); meta = aux.getMetaData(); @@ -282,7 +283,8 @@ public void testAuxEventDispatch() { ServiceB.class, Service.class); conf.setInt("A.expected.init", 1); conf.setInt("B.expected.stop", 1); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); aux.init(conf); aux.start(); @@ -346,7 +348,8 @@ public void testAuxServices() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); aux.init(conf); int latch = 1; @@ -379,7 +382,8 @@ public void testAuxServicesMeta() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); aux.init(conf); int latch = 1; @@ -416,7 +420,8 @@ public void testAuxUnexpectedStop() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); aux.init(conf); aux.start(); @@ -429,7 +434,8 @@ public void testAuxUnexpectedStop() { @Test public void testValidAuxServiceName() { - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); Configuration conf = new Configuration(); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"), @@ -443,7 +449,8 @@ public void testValidAuxServiceName() { } //Test bad auxService Name - final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"}); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"), ServiceA.class, Service.class); @@ -469,7 +476,8 @@ public void testAuxServiceRecoverySetup() throws IOException { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), RecoverableServiceB.class, Service.class); try { - final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER); + final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, + MOCK_CONTEXT); aux.init(conf); Assert.assertEquals(2, aux.getServices().size()); File auxStorageDir = new File(TEST_DIR,