From 7272c47e5569fbeb4db652e1a0b496da4c36249e Mon Sep 17 00:00:00 2001
From: Prabhu Joseph
Date: Wed, 13 Feb 2019 09:36:45 +0530
Subject: [PATCH] YARN-9208

---
 .../distributedshell/ApplicationMaster.java         |  89 ++++++++++----
 .../yarn/applications/distributedshell/Client.java  | 129 +++++++++++++++------
 .../distributedshell/TestDistributedShell.java      | 105 +++++++++++++++++
 3 files changed, 267 insertions(+), 56 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 333e00c..e4c853d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -54,6 +54,7 @@
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -69,6 +70,7 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -324,7 +326,14 @@
   private int containrRetryInterval = 0;
   private long containerFailuresValidityInterval = -1;

-  private List<String> localizableFiles = new ArrayList<>();
+  private List<Pair<Path, LocalResourceVisibility>> localizableFiles =
+      new ArrayList<>();
+  public static final Character EQ = '=';
+  public static final Character COMMA_SEPARATOR = ',';
+  public static final Character BRACKET_START = '(';
+  public static final Character BRACKET_END = ')';
+  public static final String ESCAPED_BRACKET_START = "\\(";
+  public static final String ESCAPED_BRACKET_END = "\\)";

   // Timeline domain ID
   private String domainId = null;
@@ -507,7 +516,11 @@ public boolean init(String[] args) throws ParseException, IOException {
         + " application attempt fails and these containers will be "
         + "retrieved by"
         + " the new application attempt ");
-    opts.addOption("localized_files", true, "List of localized files");
+    opts.addOption("localized_files", true, "List of localized files "
+        + "\"(Visibility1=FileName1,FileName2,,),"
+        + "(Visibility2=FileName3,FileName4,,),,\""
+        + " where Visibility is any one of PRIVATE, PUBLIC and "
+        + "default APPLICATION ");
     opts.addOption("help", false, "Print usage");

     CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -541,7 +554,7 @@ public boolean init(String[] args) throws ParseException, IOException {

     if (cliParser.hasOption("placement_spec")) {
       String placementSpec = cliParser.getOptionValue("placement_spec");
-      String decodedSpec = getDecodedPlacementSpec(placementSpec);
+      String decodedSpec = getDecodedString(placementSpec);
LOG.info("Placement Spec received [{}]", decodedSpec); this.numTotalContainers = 0; @@ -721,17 +734,55 @@ public boolean init(String[] args) throws ParseException, IOException { if (cliParser.hasOption("localized_files")) { String localizedFilesArg = cliParser.getOptionValue("localized_files"); - if (localizedFilesArg.contains(",")) { - String[] files = localizedFilesArg.split(","); - localizableFiles = Arrays.asList(files); - } else { - localizableFiles.add(localizedFilesArg); + try { + String encodedLocalizedFiles = getDecodedString(localizedFilesArg); + LOG.info("Localized Files received {}", encodedLocalizedFiles); + localizableFiles = parseLocalizedFiles(encodedLocalizedFiles); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException("Invalid Localized Files"); } } - return true; } + // (Visibility1=FileName1,FileName2,,),(Visibility2=FileName3,FileName4,,),, + public static String constructLocalizedFiles(Map> filesMap) { + StringBuilder str = new StringBuilder(); + for (Map.Entry> entry : + filesMap.entrySet()) { + Collection fileList = entry.getValue(); + String[] fileNames = fileList.toArray(new String[fileList.size()]); + str.append(BRACKET_START).append(entry.getKey().name()) + .append(EQ) + .append(StringUtils.arrayToString(fileNames)) + .append(BRACKET_END).append(COMMA_SEPARATOR); + } + return (str.length() > 0) ? str.substring(0, str.length()-1) : + str.toString(); + } + + public static List> parseLocalizedFiles( + String str) throws Exception { + List> pairList = new ArrayList<>(); + str = str.substring(1, str.length()-1); + String delimiter = ESCAPED_BRACKET_END + COMMA_SEPARATOR + + ESCAPED_BRACKET_START; + String[] splits = str.split(delimiter); + for (String s : splits) { + int i = s.indexOf(String.valueOf(EQ)); + LocalResourceVisibility visibility = LocalResourceVisibility.valueOf( + s.substring(0, i)); + String[] fileNames = s.substring(i + 1). 
+          split(String.valueOf(COMMA_SEPARATOR));
+      for (String fileName : fileNames) {
+        pairList.add(Pair.of(new Path(fileName), visibility));
+      }
+    }
+    return pairList;
+  }
+
   private void parsePlacementSpecs(String decodedSpec,
       int globalNumOfContainers) {
     Map<String, PlacementSpec> pSpecs =
@@ -750,13 +801,12 @@ private void parsePlacementSpecs(String decodedSpec,
     }
   }

-  private String getDecodedPlacementSpec(String placementSpecifications) {
+  private String getDecodedString(String encodeStr) {
     Base64.Decoder decoder = Base64.getDecoder();
     byte[] decodedBytes = decoder.decode(
-        placementSpecifications.getBytes(StandardCharsets.UTF_8));
-    String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
-    LOG.info("Decode placement spec: " + decodedSpec);
-    return decodedSpec;
+        encodeStr.getBytes(StandardCharsets.UTF_8));
+    String decodedStr = new String(decodedBytes, StandardCharsets.UTF_8);
+    return decodedStr;
   }

   /**
@@ -1467,19 +1517,16 @@ public void run() {
       throw new UncheckedIOException("Cannot get FileSystem", e);
     }

-    localizableFiles.stream().forEach(fileName -> {
+    localizableFiles.stream().forEach(localizableFile -> {
       try {
-        String relativePath =
-            getRelativePath(appName, appId.toString(), fileName);
-        Path dst =
-            new Path(fs.getHomeDirectory(), relativePath);
+        Path dst = localizableFile.getLeft();
         FileStatus fileStatus = fs.getFileStatus(dst);
         LocalResource localRes = LocalResource.newInstance(
             URL.fromURI(dst.toUri()),
-            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+            LocalResourceType.FILE, localizableFile.getRight(),
             fileStatus.getLen(), fileStatus.getModificationTime());
         LOG.info("Setting up file for localization: " + dst);
-        localResources.put(fileName, localRes);
+        localResources.put(dst.getName(), localRes);
       } catch (IOException e) {
         throw new UncheckedIOException(
             "Error during localization setup", e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 257f590..ff560e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -32,6 +33,8 @@
 import java.util.Vector;
 import java.util.Arrays;
 import java.util.Base64;
+import java.net.URI;
+import java.net.URISyntaxException;

 import com.google.common.base.Joiner;

@@ -49,8 +52,10 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -812,37 +817,44 @@ public boolean run() throws IOException, YarnException {
     // Process local files for localization
     // Here we just upload the files, the AM
     // will set up localization later.
-    StringBuilder localizableFiles = new StringBuilder();
-    filesToLocalize.stream().forEach(path -> {
-      File f = new File(path);
-
-      if (!f.exists()) {
-        throw new UncheckedIOException(
-            new IOException(path + " does not exist"));
-      }
-
-      if (!f.canRead()) {
-        throw new UncheckedIOException(
-            new IOException(path + " cannot be read"));
-      }
-
-      if (f.isDirectory()) {
-        throw new UncheckedIOException(
-            new IOException(path + " is a directory"));
-      }
+    Map<LocalResourceVisibility, Collection<String>> localizableFilesMap =
+        new HashMap<>();
+    filesToLocalize.stream().forEach(path -> {
       try {
-        String fileName = f.getName();
-        uploadFile(fs, path, fileName, appId.toString());
-        if (localizableFiles.length() == 0) {
-          localizableFiles.append(fileName);
-        } else {
-          localizableFiles.append(",").append(fileName);
+        URI uri = resolveURI(path);
+        Path src = new Path(uri);
+        FileSystem srcFs = FileSystem.get(uri, conf);
+        if (!srcFs.exists(src)) {
+          throw new UncheckedIOException(
+              new IOException(path + " does not exist"));
+        }
+        if (srcFs.getFileStatus(src).isDirectory()) {
+          throw new UncheckedIOException(
+              new IOException(path + " is a directory"));
+        }
+        if (!srcFs.getScheme().equals(fs.getScheme())) {
+          Path dst = new Path(ApplicationMaster.getRelativePath(appName,
+              appId.toString(), ""));
+          FileUtil.copy(srcFs, src, fs, dst, false, true, conf);
+          src = new Path(dst, src.getName());
+        }
+        LocalResourceVisibility visibility = isPublic(conf, src.toUri()) ?
+            LocalResourceVisibility.PUBLIC :
+            LocalResourceVisibility.PRIVATE;
+        if (!localizableFilesMap.containsKey(visibility)) {
+          localizableFilesMap.put(visibility, new ArrayList<String>());
         }
+        localizableFilesMap.get(visibility).add(src.toString());
+      } catch (URISyntaxException e) {
+        throw new UncheckedIOException(
+            new IOException("Invalid resource", e));
       } catch (IOException e) {
         throw new UncheckedIOException("Cannot upload file: " + path, e);
       }
     });
+    String localizableFiles = ApplicationMaster.
+        constructLocalizedFiles(localizableFilesMap);

     // The shell script has to be made available on the final container(s)
     // where it will be executed.
@@ -980,7 +992,10 @@ public boolean run() throws IOException, YarnException {
       vargs.add("--debug");
     }
     if (localizableFiles.length() > 0) {
-      vargs.add("--localized_files " + localizableFiles.toString());
+      // Encode the string to avoid passing special chars via shell arguments.
+      String encodedLocalizedFiles = Base64.getEncoder().encodeToString(
+          localizableFiles.getBytes(StandardCharsets.UTF_8));
+      vargs.add("--localized_files " + encodedLocalizedFiles);
     }
     vargs.add("--appname " + appName);
@@ -1187,16 +1202,6 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath,
     localResources.put(fileDstPath, scRsrc);
   }

-  private void uploadFile(FileSystem fs, String fileSrcPath,
-      String fileDstPath, String appId) throws IOException {
-    String relativePath =
-        ApplicationMaster.getRelativePath(appName, appId, fileDstPath);
-    Path dst =
-        new Path(fs.getHomeDirectory(), relativePath);
-    LOG.info("Uploading file: " + fileSrcPath + " to " + dst);
-    fs.copyFromLocalFile(new Path(fileSrcPath), dst);
-  }
-
   private void prepareTimelineDomain() {
     TimelineClient timelineClient = null;
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -1228,6 +1233,60 @@ private void prepareTimelineDomain() {
     }
   }

+  private URI resolveURI(String path) throws URISyntaxException {
+    URI uri = new URI(path);
+    if (uri.getScheme() != null) {
+      return uri;
+    }
+    return new File(path).getAbsoluteFile().toURI();
+  }
+
+  public static boolean isPublic(Configuration conf, URI uri)
+      throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    Path current = new Path(uri.getPath());
+    Map<URI, FileStatus> statCache = new HashMap<>();
+
+    // the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false;
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
+  }
+
+  private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+      Path path, Map<URI, FileStatus> statCache) throws IOException {
+    Path current = path;
+    while (current != null) {
+      // the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false;
+      }
+      current = current.getParent();
+    }
+    return true;
+  }
+
+  public static boolean checkPermissionOfOther(FileSystem fs, Path path,
+      FsAction action, Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus status = getFileStatus(fs, path.toUri(), statCache);
+    if (!status.isEncrypted()) {
+      FsAction otherAction = status.getPermission().getOtherAction();
+      return otherAction.implies(action);
+    }
+    return false;
+  }
+
+  private static FileStatus getFileStatus(FileSystem fs, URI uri,
+      Map<URI, FileStatus> statCache) throws IOException {
+    FileStatus stat = statCache.get(uri);
+    if (stat == null) {
+      stat = fs.getFileStatus(new Path(uri));
+      statCache.put(uri, stat);
+    }
+    return stat;
+  }
+
   private void setAMResourceCapability(ApplicationSubmissionContext appContext,
       Map<String, Resource> profiles, List<String> resourceTypes)
       throws IllegalArgumentException, IOException, YarnException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 9252ee5..b886805 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.applications.distributedshell;

 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -36,18 +38,27 @@
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
 import java.util.concurrent.atomic.AtomicBoolean;

 import com.google.common.base.Supplier;
 import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -67,6 +78,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -1809,4 +1821,97 @@ public void testDistributedShellWithNonExistentFileLocalization()
     client.init(args);
     client.run();
   }
+
+  @Test
+  public void testLocalizedFilesVisibility() throws Exception {
+    File localizeFileA = new File("target/a.txt");
+    File localizeFileB = new File("target/b.txt");
+    try {
+      Files.copy(new File("./src/test/resources/a.txt").toPath(),
+          localizeFileA.toPath());
+      Files.copy(new File("./src/test/resources/b.txt").toPath(),
+          localizeFileB.toPath());
+      FileUtil.chmod(localizeFileA.getAbsolutePath(), "750");
+      FileUtil.chmod(localizeFileB.getAbsolutePath(), "755");
+
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "1",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
"dir" : "ls", + "--localize_files", + localizeFileA.getAbsolutePath()+","+localizeFileB.getAbsolutePath() + }; + Configuration config = new Configuration(yarnCluster.getConfig()); + Client client = new Client(config); + client.init(args); + client.run(); + + FileSystem fs1 = FileSystem.get(config); + File localizedFileA = getLocalizedFilePath( + "target/TestDistributedShell", "a.txt"); + assertNotNull("localizeFileA is not localized", localizedFileA); + assertFalse(Client.checkPermissionOfOther(fs1, new Path(localizedFileA + .getPath()), FsAction.READ, new HashMap<>())); + File localizedFileB = getLocalizedFilePath( + "target/TestDistributedShell", "b.txt"); + assertNotNull("localizeFileB is not localized", localizedFileB); + assertTrue(Client.checkPermissionOfOther(fs1, new Path(localizedFileB + .getPath()), FsAction.READ, new HashMap<>())); + } finally { + localizeFileA.delete(); + localizeFileB.delete(); + } + } + + public File getLocalizedFilePath(String searchPath, String fileName) + throws IOException { + List localizedFilePath = Files.walk( + Paths.get(searchPath)).filter((f) -> Files.isRegularFile(f) && + f.endsWith(fileName)).collect(Collectors.toList()); + if (!localizedFilePath.isEmpty()) { + return localizedFilePath.get(0).toFile(); + } + return null; + } + + @Test + public void testConstructLocalizedFiles() { + Map> localizableFilesMap = + new HashMap<>(); + ArrayList publicFiles = new ArrayList<>(); + publicFiles.add("file:/a.txt"); + publicFiles.add("file:/b.txt"); + localizableFilesMap.put(LocalResourceVisibility.PUBLIC, publicFiles); + ArrayList privateFiles = new ArrayList<>(); + privateFiles.add("hdfs:/c.txt"); + localizableFilesMap.put(LocalResourceVisibility.PRIVATE, privateFiles); + String actualStr = ApplicationMaster. + constructLocalizedFiles(localizableFilesMap); + String expected1 = "(PUBLIC=file:/a.txt,file:/b.txt)"; + String expected2 = "(PRIVATE=hdfs:/c.txt)"; + + assertTrue(actualStr.equals(expected1 + "," + expected2) || + actualStr.equals(expected2 + "," + expected1)); + } + + @Test + public void testParseLocalizedFiles() throws Exception { + String localizedFiles = "(PUBLIC=file:/a.txt,file:/b.txt)," + + "(PRIVATE=hdfs:/c.txt)"; + List> pairList = + ApplicationMaster.parseLocalizedFiles(localizedFiles); + Assert.assertEquals(3, pairList.size()); + List> expectedList = new ArrayList<>(); + expectedList.add(Pair.of(new Path("file:/a.txt"), + LocalResourceVisibility.PUBLIC)); + expectedList.add(Pair.of(new Path("file:/b.txt"), + LocalResourceVisibility.PUBLIC)); + expectedList.add(Pair.of(new Path("hdfs:/c.txt"), + LocalResourceVisibility.PRIVATE)); + + assertTrue(pairList.containsAll(expectedList)); + } } -- 2.7.4 (Apple Git-66)