diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 0d3c29d..0e326cf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -795,11 +795,12 @@ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOExcep String hdfsDirPathStr, Configuration conf) throws IOException, LoginException { List tmpResources = new ArrayList(); - addTempFiles(conf, tmpResources, hdfsDirPathStr, getTempFilesFromConf(conf)); + addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf)); + addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf)); return tmpResources; } - public static String[] getTempFilesFromConf(Configuration conf) { + private static String[] getTempFilesFromConf(Configuration conf) { String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); if (StringUtils.isNotBlank(addedFiles)) { HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles); @@ -808,19 +809,23 @@ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOExcep if (StringUtils.isNotBlank(addedJars)) { HiveConf.setVar(conf, ConfVars.HIVEADDEDJARS, addedJars); } - String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); - if (StringUtils.isNotBlank(addedArchives)) { - HiveConf.setVar(conf, ConfVars.HIVEADDEDARCHIVES, addedArchives); - } - String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS); // need to localize the additional jars and files // we need the directory on hdfs to which we shall put all these files - String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives; + String allFiles = auxJars + "," + addedJars + "," + addedFiles; return allFiles.split(","); } + private static String[] getTempArchivesFromConf(Configuration conf) { + String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE); + if (StringUtils.isNotBlank(addedArchives)) { + HiveConf.setVar(conf, ConfVars.HIVEADDEDARCHIVES, addedArchives); + return addedArchives.split(","); + } + return new String[0]; + } + /** * Localizes files, archives and jars from a provided array of names. * @param hdfsDirPathStr Destination directory in HDFS. @@ -834,12 +839,13 @@ public Path getDefaultDestDir(Configuration conf) throws LoginException, IOExcep String[] inputOutputJars) throws IOException, LoginException { if (inputOutputJars == null) return null; List tmpResources = new ArrayList(); - addTempFiles(conf, tmpResources, hdfsDirPathStr, inputOutputJars); + addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars); return tmpResources; } - private void addTempFiles(Configuration conf, + private void addTempResources(Configuration conf, List tmpResources, String hdfsDirPathStr, + LocalResourceType type, String[] files) throws IOException { for (String file : files) { if (!StringUtils.isNotBlank(file)) { @@ -847,7 +853,7 @@ private void addTempFiles(Configuration conf, } Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file))); LocalResource localResource = localizeResource(new Path(file), - hdfsFilePath, conf); + hdfsFilePath, type, conf); tmpResources.add(localResource); } } @@ -925,11 +931,12 @@ private boolean checkPreExisting(Path src, Path dest, Configuration conf) /** * @param src path to the source for the resource * @param dest path in hdfs for the resource + * @param type local resource type (File/Archive) * @param conf * @return localresource from tez localization. * @throws IOException when any file system related calls fails. */ - public LocalResource localizeResource(Path src, Path dest, Configuration conf) + public LocalResource localizeResource(Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException { FileSystem destFS = dest.getFileSystem(conf); @@ -970,7 +977,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf) } } - return createLocalResource(destFS, dest, LocalResourceType.FILE, + return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 563fb49..bb99357 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.tez; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -47,6 +48,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; @@ -352,7 +354,7 @@ private LocalResource createJarLocalResource(String localJarPath) // TODO: if this method is ever called on more than one jar, getting the dir and the // list need to be refactored out to be done only once. Path destFile = new Path(destDirPath.toString() + "/" + destFileName); - return utils.localizeResource(localFile, destFile, conf); + return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 337f2f4..e6bbe9a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -50,6 +50,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -360,7 +361,10 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, Map resourceMap = new HashMap(); if (additionalLr != null) { for (LocalResource lr: additionalLr) { - resourceMap.put(utils.getBaseName(lr), lr); + if (lr.getType() == LocalResourceType.FILE) { + // TEZ AM will only localize FILE (no script operators in the AM) + resourceMap.put(utils.getBaseName(lr), lr); + } } }