diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7c25be3..49e1ce4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1797,6 +1797,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_AUX_SERVICES_CLASSPATH = NM_AUX_SERVICES + ".%s.classpath"; + public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH = + NM_AUX_SERVICES + ".%s.remote-classpath"; + public static final String NM_AUX_SERVICES_SYSTEM_CLASSES = NM_AUX_SERVICES + ".%s.system-classes"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 57cca50..7cbad6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -29,27 +31,44 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.util.FSDownload; import com.google.common.base.Preconditions; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { + public static final String NM_AUX_SERVICE_DIR = "nmAuxService"; + public static final FsPermission NM_AUX_SERVICE_DIR_PERM = + new FsPermission((short) 0700); + static final String STATE_STORE_ROOT_NAME = "nm-aux-services"; private static final Logger LOG = @@ -58,16 +77,21 @@ protected final Map serviceMap; protected final Map serviceMetaData; private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; + private final LocalDirsHandlerService dirsHandler; + private final UserGroupInformation userUGI; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); - public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler, + Context nmContext) throws YarnException { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + this.dirsHandler = nmContext.getLocalDirsHandler(); + this.userUGI = getRemoteUgi(); // Obtain services from configuration in init() } @@ -125,15 +149,62 @@ public void serviceInit(Configuration conf) throws Exception { String classKey = String.format( YarnConfiguration.NM_AUX_SERVICE_FMT, sName); String className = conf.get(classKey); - final String appClassPath = conf.get(String.format( + final String appLocalClassPath = conf.get(String.format( YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName)); + final String appRemoteClassPath = conf.get(String.format( + YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName)); AuxiliaryService s = null; - boolean useCustomerClassLoader = appClassPath != null - && !appClassPath.isEmpty() && className != null - && !className.isEmpty(); + boolean useCustomerClassLoader = ((appLocalClassPath != null + && !appLocalClassPath.isEmpty()) || + (appRemoteClassPath != null && !appRemoteClassPath.isEmpty())) + && className != null && !className.isEmpty(); if (useCustomerClassLoader) { - s = AuxiliaryServiceWithCustomClassLoader.getInstance( - conf, className, appClassPath); + // load AuxiliaryService from local class path + if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) { + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, appLocalClassPath); + } else { + // load AuxiliaryService from remote class path + if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) { + throw new YarnRuntimeException("The aux serivce:" + sName + + " has configured local classpath:" + appLocalClassPath + + " and remote classpath:" + appRemoteClassPath + + ". Only one of them should be configured."); + } + FileContext localLFS = getLocalFileContext(conf); + // download remote class objects to NM_AUX_SERVICE_DIR + Path downloadDest = new Path(dirsHandler.getLocalPathForWrite( + "." + Path.SEPARATOR + NM_AUX_SERVICE_DIR), className); + Path src = new Path(appRemoteClassPath); + FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf); + FileStatus scFileStatus = remoteLFS.getFileStatus(src); + if (scFileStatus.getOwner() != this.userUGI.getShortUserName()) { + throw new YarnRuntimeException("The remote jarfile owner:" + + scFileStatus.getOwner() + " is not the same as the NM user:" + + this.userUGI.getShortUserName() + "."); + } + if ((scFileStatus.getPermission().toShort() & 0022) != 0) { + throw new YarnRuntimeException("The remote jarfile should not " + + "be writable by group or others. The current Permission is " + + + scFileStatus.getPermission().toShort()); + } + LocalResource scRsrc = LocalResource.newInstance( + URL.fromURI(src.toUri()), + LocalResourceType.ARCHIVE, LocalResourceVisibility.PRIVATE, + scFileStatus.getLen(), scFileStatus.getModificationTime()); + FSDownload download = new FSDownload(localLFS, null, conf, + downloadDest, scRsrc, null); + Path dest = null; + try { + dest = new Path(download.call() + Path.SEPARATOR + "*"); + } catch (Exception ex) { + throw new YarnRuntimeException( + "Exception happend while downloading files for aux-service:" + + sName + " and remote-file-path:" + src + "."); + } + s = AuxiliaryServiceWithCustomClassLoader.getInstance( + conf, className, dest.toString()); + } LOG.info("The aux service:" + sName + " are using the custom classloader"); } else { @@ -289,4 +360,34 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + + FileContext getRemoteFileContext(final URI path, Configuration conf) { + try { + return FileContext.getFileContext(path, conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access remote fs"); + } + } + + private UserGroupInformation getRemoteUgi() + throws YarnException { + UserGroupInformation remoteUgi; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = "Cannot obtain the user-name. Got exception: " + + StringUtils.stringifyException(e); + LOG.warn(msg); + throw RPCUtil.getRemoteException(msg); + } + return remoteUgi; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index de3db6e..1c4b082 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -253,7 +253,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(dirsHandler); // Start configurable services - auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler); + auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler, this.context); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 29fc747..32fa376 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -1494,6 +1495,8 @@ private void cleanUpLocalDir(FileContext lfs, DeletionService del, currentTimeStamp); renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, currentTimeStamp); + renameLocalDir(lfs, localDir, AuxServices.NM_AUX_SERVICE_DIR, + currentTimeStamp); try { deleteLocalDir(lfs, del, localDir); } catch (IOException e) { @@ -1532,7 +1535,11 @@ private void deleteLocalDir(FileContext lfs, DeletionService del, .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") || status.getPath().getName() - .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { + .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*") + || + status.getPath().getName() + .matches(".*" + AuxServices.NM_AUX_SERVICE_DIR + + "_DEL_.*")) { FileDeletionTask deletionTask = new FileDeletionTask(del, null, status.getPath(), null); del.delete(deletionTask); @@ -1640,10 +1647,13 @@ private boolean checkLocalDir(String localDir) { Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + Path auxSysDir = new Path(localDir, AuxServices.NM_AUX_SERVICE_DIR); localDirPathFsPermissionsMap.put(userDir, defaultPermission); localDirPathFsPermissionsMap.put(fileDir, defaultPermission); localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); + localDirPathFsPermissionsMap.put(auxSysDir, + AuxServices.NM_AUX_SERVICE_DIR_PERM); return localDirPathFsPermissionsMap; } }