From f9ec5bb21b4fbdc6c9b65c301feb854e8c681482 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Wed, 30 Jan 2019 00:07:50 +0530 Subject: [PATCH] YARN-9208 --- .../distributedshell/ApplicationMaster.java | 59 +++++++++-- .../yarn/applications/distributedshell/Client.java | 109 +++++++++++++++++---- .../distributedshell/TestDistributedShell.java | 61 ++++++++++++ 3 files changed, 198 insertions(+), 31 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3b58961..47d73da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -54,6 +54,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; @@ -322,7 +323,8 @@ private int containrRetryInterval = 0; private long containerFailuresValidityInterval = -1; - private List localizableFiles = new ArrayList<>(); + private List> localizableFiles = + new ArrayList<>(); // Timeline domain ID private String domainId = null; @@ -504,6 +506,8 @@ public boolean init(String[] args) throws ParseException, IOException { + "retrieved by" + " the new application attempt "); opts.addOption("localized_files", true, "List of localized files"); + opts.addOption("localized_files_visibilities", true, + "List of Visibilities of localized files"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -712,19 +716,57 @@ public boolean init(String[] args) throws ParseException, IOException { LOG.warn("Timeline service is not enabled"); } + List fileNames = new ArrayList<>(); + List visibilities = new ArrayList<>(); + if (cliParser.hasOption("localized_files")) { String localizedFilesArg = cliParser.getOptionValue("localized_files"); if (localizedFilesArg.contains(",")) { String[] files = localizedFilesArg.split(","); - localizableFiles = Arrays.asList(files); + fileNames = Arrays.asList(files); } else { - localizableFiles.add(localizedFilesArg); + fileNames.add(localizedFilesArg); + } + } + if (cliParser.hasOption("localized_files_visibilities")) { + String visibilitiesArg = cliParser.getOptionValue( + "localized_files_visibilities"); + if (visibilitiesArg.contains(",")) { + String[] visibilityArray = visibilitiesArg.split(","); + for (String visibility: visibilityArray) { + visibilities.add(getLocalResourceVisibility(visibility)); + } + } else { + visibilities.add(getLocalResourceVisibility(visibilitiesArg)); + } + } + + // Default Visibilty is set to APPLICATION type + int size = fileNames.size(); + if (size == visibilities.size()) { + for (int i=0; i pSpecs = @@ -1460,19 +1502,16 @@ public void run() { throw new UncheckedIOException("Cannot get FileSystem", e); } - localizableFiles.stream().forEach(fileName -> { + localizableFiles.stream().forEach(localizableFile -> { try { - String relativePath = - getRelativePath(appName, appId.toString(), fileName); - Path dst = - new Path(fs.getHomeDirectory(), relativePath); + Path dst = localizableFile.getLeft(); FileStatus fileStatus = fs.getFileStatus(dst); LocalResource localRes = LocalResource.newInstance( URL.fromURI(dst.toUri()), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + LocalResourceType.FILE, localizableFile.getRight(), fileStatus.getLen(), fileStatus.getModificationTime()); LOG.info("Setting up file for localization: " + dst); - localResources.put(fileName, localRes); + localResources.put(dst.getName(), localRes); } catch (IOException e) { throw new UncheckedIOException( "Error during localization setup", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 369d94b..f05f1d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -32,6 +32,8 @@ import java.util.Vector; import java.util.Arrays; import java.util.Base64; +import java.net.URI; +import java.net.URISyntaxException; import com.google.common.base.Joiner; @@ -49,8 +51,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -800,32 +804,37 @@ public boolean run() throws IOException, YarnException { // Here we just upload the files, the AM // will set up localization later. StringBuilder localizableFiles = new StringBuilder(); + StringBuilder visibilities = new StringBuilder(); filesToLocalize.stream().forEach(path -> { - File f = new File(path); - - if (!f.exists()) { - throw new UncheckedIOException( - new IOException(path + " does not exist")); - } - - if (!f.canRead()) { - throw new UncheckedIOException( - new IOException(path + " cannot be read")); - } - - if (f.isDirectory()) { - throw new UncheckedIOException( - new IOException(path + " is a directory")); - } - try { - String fileName = f.getName(); - uploadFile(fs, path, fileName, appId.toString()); + URI uri = resolveURI(path); + Path src = new Path(uri); + FileSystem srcFs = FileSystem.get(uri, conf); + if (!srcFs.exists(src)) { + throw new UncheckedIOException( + new IOException(path + " does not exist")); + } + if (srcFs.getFileStatus(src).isDirectory()) { + throw new UncheckedIOException( + new IOException(path + " is a directory")); + } + if (!srcFs.getScheme().equals(fs.getScheme())) { + Path dst = new Path(ApplicationMaster.getRelativePath(appName, + appId.toString(), "")); + FileUtil.copy(srcFs, src, fs, dst, false, true, conf); + src = new Path(dst, src.getName()); + } + + String visibility = String.valueOf(isPublic(conf, src.toUri())); if (localizableFiles.length() == 0) { - localizableFiles.append(fileName); + localizableFiles.append(src.toString()); + visibilities.append(visibility); } else { - localizableFiles.append(",").append(fileName); + localizableFiles.append(",").append(src.toString()); + visibilities.append(",").append(visibility); } + } catch (URISyntaxException e) { + throw new UncheckedIOException(new IOException("Invalid resource", e)); } catch (IOException e) { throw new UncheckedIOException("Cannot upload file: " + path, e); } @@ -966,6 +975,9 @@ public boolean run() throws IOException, YarnException { if (localizableFiles.length() > 0) { vargs.add("--localized_files " + localizableFiles.toString()); } + if (visibilities.length() > 0) { + vargs.add("--localized_files_visibilities " + visibilities.toString()); + } vargs.add("--appname " + appName); vargs.addAll(containerRetryOptions); @@ -1329,4 +1341,59 @@ private void validateResourceTypes(Iterable resourceNames, } return resources; } + + private URI resolveURI(String path) throws URISyntaxException { + URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } + return new File(path).getAbsoluteFile().toURI(); + } + + public static boolean isPublic(Configuration conf, URI uri) + throws IOException { + FileSystem fs = FileSystem.get(uri, conf); + Path current = new Path(uri.getPath()); + Map statCache = new HashMap<>(); + + // the leaf level file should be readable by others + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { + return false; + } + return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); + } + + private static boolean ancestorsHaveExecutePermissions(FileSystem fs, + Path path, Map statCache) throws IOException { + Path current = path; + while (current != null) { + //the subdirs in the path should have execute permissions for others + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { + return false; + } + current = current.getParent(); + } + return true; + } + + public static boolean checkPermissionOfOther(FileSystem fs, Path path, + FsAction action, Map statCache) throws IOException { + FileStatus status = getFileStatus(fs, path.toUri(), statCache); + if (!status.isEncrypted()) { + FsAction otherAction = status.getPermission().getOtherAction(); + return otherAction.implies(action); + } + return false; + } + + private static FileStatus getFileStatus(FileSystem fs, URI uri, + Map statCache) throws IOException { + FileStatus stat = statCache.get(uri); + if(stat == null) { + stat = fs.getFileStatus(new Path(uri)); + statCache.put(uri, stat); + } + return stat; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 80c1e20..1a0a10d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.applications.distributedshell; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -34,9 +35,13 @@ import java.net.InetAddress; import java.net.URI; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.stream.Collectors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.cli.MissingArgumentException; @@ -46,7 +51,9 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -1694,4 +1701,58 @@ public void testDistributedShellWithNonExistentFileLocalization() client.init(args); client.run(); } + + @Test + public void testLocalizedFilesVisibility() throws Exception { + File localizeFileA = new File("target/a.txt"); + File localizeFileB = new File("target/b.txt"); + try { + Files.copy(new File("./src/test/resources/a.txt").toPath(), + localizeFileA.toPath()); + Files.copy(new File("./src/test/resources/b.txt").toPath(), + localizeFileB.toPath()); + FileUtil.chmod(localizeFileA.getAbsolutePath(), "750"); + FileUtil.chmod(localizeFileB.getAbsolutePath(), "755"); + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--localize_files", + localizeFileA.getAbsolutePath()+","+localizeFileB.getAbsolutePath() + }; + Configuration config = new Configuration(yarnCluster.getConfig()); + Client client = new Client(config); + client.init(args); + client.run(); + + FileSystem fs1 = FileSystem.get(config); + File localizedFileA = getLocalizedFilePath( + "target/TestDistributedShell", "a.txt"); + assertFalse(Client.checkPermissionOfOther(fs1, new Path(localizedFileA + .getPath()), FsAction.READ, new HashMap<>())); + File localizedFileB = getLocalizedFilePath( + "target/TestDistributedShell", "b.txt"); + assertTrue(Client.checkPermissionOfOther(fs1, new Path(localizedFileB + .getPath()), FsAction.READ, new HashMap<>())); + } finally { + localizeFileA.delete(); + localizeFileB.delete(); + } + } + + public File getLocalizedFilePath(String searchPath, String fileName) + throws IOException { + List localizedFilePath = Files.walk( + Paths.get(searchPath)).filter((f) -> Files.isRegularFile(f) && + f.endsWith(fileName)).collect(Collectors.toList()); + if (!localizedFilePath.isEmpty()) { + return localizedFilePath.get(0).toFile(); + } + return null; + } + } -- 2.7.4 (Apple Git-66)