From b1b50b8bdebb72740b6ea75cd400d0661f52aa58 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Thu, 31 Jan 2019 16:00:24 +0530 Subject: [PATCH] YARN-9208 --- .../distributedshell/ApplicationMaster.java | 34 +++--- .../yarn/applications/distributedshell/Client.java | 120 +++++++++++++++------ .../distributedshell/TestDistributedShell.java | 61 +++++++++++ 3 files changed, 167 insertions(+), 48 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3b58961..4633013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -54,6 +54,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; @@ -322,7 +323,9 @@ private int containrRetryInterval = 0; private long containerFailuresValidityInterval = -1; - private List localizableFiles = new ArrayList<>(); + private List> localizableFiles = + new ArrayList<>(); + public static final String LOCALIZE_SEPARATOR = "="; // Timeline domain ID private String domainId = null; @@ -503,7 +506,10 @@ public boolean init(String[] args) throws ParseException, IOException { + " application attempt fails and these containers will be " + "retrieved by" + " the new application attempt "); - opts.addOption("localized_files", true, "List of localized files"); + opts.addOption("localized_files", true, "List of localized files " + + "\"FileName1=VisibilityType,FileName2=VisibilityType,,\" " + + "where VisibilityType is any one of PRIVATE, PUBLIC and " + + "default APPLICATION "); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -714,14 +720,15 @@ public boolean init(String[] args) throws ParseException, IOException { if (cliParser.hasOption("localized_files")) { String localizedFilesArg = cliParser.getOptionValue("localized_files"); - if (localizedFilesArg.contains(",")) { - String[] files = localizedFilesArg.split(","); - localizableFiles = Arrays.asList(files); - } else { - localizableFiles.add(localizedFilesArg); + String[] files = localizedFilesArg.split(","); + for (String file : files) { + int i = file.lastIndexOf(LOCALIZE_SEPARATOR); + LocalResourceVisibility visibility = LocalResourceVisibility.valueOf( + file.substring(i+1)); + localizableFiles.add(Pair.of(new Path(file.substring(0, i)), + visibility)); } } - return true; } @@ -1460,19 +1467,16 @@ public void run() { throw new UncheckedIOException("Cannot get FileSystem", e); } - localizableFiles.stream().forEach(fileName -> { + localizableFiles.stream().forEach(localizableFile -> { try { - String relativePath = - getRelativePath(appName, appId.toString(), fileName); - Path dst = - new Path(fs.getHomeDirectory(), relativePath); + Path dst = localizableFile.getLeft(); FileStatus fileStatus = fs.getFileStatus(dst); LocalResource localRes = LocalResource.newInstance( URL.fromURI(dst.toUri()), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + LocalResourceType.FILE, localizableFile.getRight(), fileStatus.getLen(), fileStatus.getModificationTime()); LOG.info("Setting up file for localization: " + dst); - localResources.put(fileName, localRes); + localResources.put(dst.getName(), localRes); } catch (IOException e) { throw new UncheckedIOException( "Error during localization setup", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 369d94b..8bda418 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -32,6 +32,8 @@ import java.util.Vector; import java.util.Arrays; import java.util.Base64; +import java.net.URI; +import java.net.URISyntaxException; import com.google.common.base.Joiner; @@ -49,8 +51,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -801,31 +805,36 @@ public boolean run() throws IOException, YarnException { // will set up localization later. StringBuilder localizableFiles = new StringBuilder(); filesToLocalize.stream().forEach(path -> { - File f = new File(path); - - if (!f.exists()) { - throw new UncheckedIOException( - new IOException(path + " does not exist")); - } - - if (!f.canRead()) { - throw new UncheckedIOException( - new IOException(path + " cannot be read")); - } - - if (f.isDirectory()) { - throw new UncheckedIOException( - new IOException(path + " is a directory")); - } - try { - String fileName = f.getName(); - uploadFile(fs, path, fileName, appId.toString()); - if (localizableFiles.length() == 0) { - localizableFiles.append(fileName); - } else { - localizableFiles.append(",").append(fileName); + URI uri = resolveURI(path); + Path src = new Path(uri); + FileSystem srcFs = FileSystem.get(uri, conf); + if (!srcFs.exists(src)) { + throw new UncheckedIOException( + new IOException(path + " does not exist")); + } + if (srcFs.getFileStatus(src).isDirectory()) { + throw new UncheckedIOException( + new IOException(path + " is a directory")); + } + if (!srcFs.getScheme().equals(fs.getScheme())) { + Path dst = new Path(ApplicationMaster.getRelativePath(appName, + appId.toString(), "")); + FileUtil.copy(srcFs, src, fs, dst, false, true, conf); + src = new Path(dst, src.getName()); } + + String visibility = isPublic(conf, src.toUri()) ? + LocalResourceVisibility.PUBLIC.name() : + LocalResourceVisibility.PRIVATE.name(); + if (localizableFiles.length() != 0) { + localizableFiles.append(","); + } + localizableFiles.append(src.toString()); + localizableFiles.append(ApplicationMaster.LOCALIZE_SEPARATOR); + localizableFiles.append(visibility); + } catch (URISyntaxException e) { + throw new UncheckedIOException(new IOException("Invalid resource", e)); } catch (IOException e) { throw new UncheckedIOException("Cannot upload file: " + path, e); } @@ -1171,16 +1180,6 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath, localResources.put(fileDstPath, scRsrc); } - private void uploadFile(FileSystem fs, String fileSrcPath, - String fileDstPath, String appId) throws IOException { - String relativePath = - ApplicationMaster.getRelativePath(appName, appId, fileDstPath); - Path dst = - new Path(fs.getHomeDirectory(), relativePath); - LOG.info("Uploading file: " + fileSrcPath + " to " + dst); - fs.copyFromLocalFile(new Path(fileSrcPath), dst); - } - private void prepareTimelineDomain() { TimelineClient timelineClient = null; if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, @@ -1329,4 +1328,59 @@ private void validateResourceTypes(Iterable resourceNames, } return resources; } + + private URI resolveURI(String path) throws URISyntaxException { + URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + public static boolean isPublic(Configuration conf, URI uri) + throws IOException { + FileSystem fs = FileSystem.get(uri, conf); + Path current = new Path(uri.getPath()); + Map statCache = new HashMap<>(); + + // the leaf level file should be readable by others + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { + return false; + } + return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); + } + + private static boolean ancestorsHaveExecutePermissions(FileSystem fs, + Path path, Map statCache) throws IOException { + Path current = path; + while (current != null) { + //the subdirs in the path should have execute permissions for others + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { + return false; + } + current = current.getParent(); + } + return true; + } + + public static boolean checkPermissionOfOther(FileSystem fs, Path path, + FsAction action, Map statCache) throws IOException { + FileStatus status = getFileStatus(fs, path.toUri(), statCache); + if (!status.isEncrypted()) { + FsAction otherAction = status.getPermission().getOtherAction(); + return otherAction.implies(action); + } + return false; + } + + private static FileStatus getFileStatus(FileSystem fs, URI uri, + Map statCache) throws IOException { + FileStatus stat = statCache.get(uri); + if(stat == null) { + stat = fs.getFileStatus(new Path(uri)); + statCache.put(uri, stat); + } + return stat; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 80c1e20..1a0a10d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.applications.distributedshell; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -34,9 +35,13 @@ import java.net.InetAddress; import java.net.URI; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.stream.Collectors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.cli.MissingArgumentException; @@ -46,7 +51,9 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -1694,4 +1701,58 @@ public void testDistributedShellWithNonExistentFileLocalization() client.init(args); client.run(); } + + @Test + public void testLocalizedFilesVisibility() throws Exception { + File localizeFileA = new File("target/a.txt"); + File localizeFileB = new File("target/b.txt"); + try { + Files.copy(new File("./src/test/resources/a.txt").toPath(), + localizeFileA.toPath()); + Files.copy(new File("./src/test/resources/b.txt").toPath(), + localizeFileB.toPath()); + FileUtil.chmod(localizeFileA.getAbsolutePath(), "750"); + FileUtil.chmod(localizeFileB.getAbsolutePath(), "755"); + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--localize_files", + localizeFileA.getAbsolutePath()+","+localizeFileB.getAbsolutePath() + }; + Configuration config = new Configuration(yarnCluster.getConfig()); + Client client = new Client(config); + client.init(args); + client.run(); + + FileSystem fs1 = FileSystem.get(config); + File localizedFileA = getLocalizedFilePath( + "target/TestDistributedShell", "a.txt"); + assertFalse(Client.checkPermissionOfOther(fs1, new Path(localizedFileA + .getPath()), FsAction.READ, new HashMap<>())); + File localizedFileB = getLocalizedFilePath( + "target/TestDistributedShell", "b.txt"); + assertTrue(Client.checkPermissionOfOther(fs1, new Path(localizedFileB + .getPath()), FsAction.READ, new HashMap<>())); + } finally { + localizeFileA.delete(); + localizeFileB.delete(); + } + } + + public File getLocalizedFilePath(String searchPath, String fileName) + throws IOException { + List localizedFilePath = Files.walk( + Paths.get(searchPath)).filter((f) -> Files.isRegularFile(f) && + f.endsWith(fileName)).collect(Collectors.toList()); + if (!localizedFilePath.isEmpty()) { + return localizedFilePath.get(0).toFile(); + } + return null; + } + } -- 2.7.4 (Apple Git-66)