diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 79c38c1..3e3b779 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -786,6 +786,8 @@
         (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
     HIVE_OPTIMIZE_TEZ("hive.optimize.tez", false),
+    HIVE_JAR_DIRECTORY("hive.jar.directory", "hdfs:///apps/hive/install/"),
+    HIVE_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
     ;
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 12e9334..439eab5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -360,6 +360,9 @@
   CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
   BAD_LOCATION_VALUE(10244, "{0} is not absolute or has no scheme information. Please specify a complete absolute uri with scheme information."),
   UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+  CANNOT_FIND_EXEC_JAR(10246, "{0} jar couldn't be found in location configured by hive.jar.directory = {1}", true),
+  INVALID_HDFS_URI(10247, "{0} is not a hdfs uri", true),
+  INVALID_DIR(10248, "{0} is not a directory", true),
 
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -616,8 +619,8 @@ public String format(String reason) {
     return format(new String[]{reason});
   }
   /**
-   * If the message is parametrized, this will fill the parameters with supplied
-   * {@code reasons}, otherwise {@code reasons} are appended at the end of the
+   * If the message is parametrized, this will fill the parameters with supplied
+   * {@code reasons}, otherwise {@code reasons} are appended at the end of the
    * message.
    */
   public String format(String... reasons) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index faa99f7..02fc5e9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -44,6 +51,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -74,6 +82,8 @@
  */
 public class DagUtils {
 
+  private static Subject currentUser;
+
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
@@ -305,8 +315,10 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
    *
    * @param conf
    * @return List<LocalResource> local resources to add to execution
+   * @throws IOException when hdfs operation fails
+   * @throws LoginException when user cannot be determined
    */
-  public static List<LocalResource> localizeTempFiles(Configuration conf) {
+  public static List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
     String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
@@ -316,15 +328,147 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
 
     // need to localize the additional jars and files
+    // we need the directory on hdfs to which we shall put all these files
+    String hdfsDirPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+    Path hdfsDirPath = new Path(hdfsDirPathStr);
+    FileSystem fs = hdfsDirPath.getFileSystem(conf);
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hdfsDirPathStr));
+    }
+
+    FileStatus fstatus = null;
+    try {
+      fstatus = fs.getFileStatus(hdfsDirPath);
+    } catch (FileNotFoundException fe) {
+      // do nothing
+    }
+
+    if ((fstatus == null) || (!fstatus.isDir())) {
+      UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+      String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+      String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_INSTALL_DIR);
+      Path userPath = new Path(userPathStr);
+      fs = userPath.getFileSystem(conf);
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(userPathStr));
+      }
+
+      String jarPathStr = userPathStr + "/" + userName;
+      hdfsDirPathStr = jarPathStr;
+      hdfsDirPath = new Path(hdfsDirPathStr);
+    }
+
+    String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
+    String[] allFilesArr = allFiles.split(",");
+    for (String file : allFilesArr) {
+      fs.copyFromLocalFile(new Path(file), hdfsDirPath);
+      String hdfsFilePathStr = hdfsDirPathStr + "/" + getClientVersion(file);
+      LocalResource localResource = createLocalResource(fs, new Path(hdfsFilePathStr),
+          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
+      tmpResources.add(localResource);
+    }
+
     return tmpResources;
   }
 
+  private static String getExecJarPathLocal() {
+    try {
+      return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+
+    return null;
+  }
+
+  /**
+   * @param pathStr - the string from which we try to determine the version
+   * @return the file name (last path component), e.g. hive-exec-0.12.0.jar
+   */
+  private static String getClientVersion(String pathStr) {
+    String[] splits = pathStr.split("/");
+    return splits[splits.length - 1];
+  }
+
   /**
    * Creates a local resource representing the hive-exec jar. This resource will
    * be used to execute the plan on the cluster.
+   * @param conf
+   * @return LocalResource corresponding to the localized hive exec resource.
+   * @throws IOException when any filesystem related call fails.
+   * @throws LoginException when we are unable to determine the user.
    */
-  public static LocalResource createHiveExecLocalResource(Path mrScratchDir) {
-    return null;
+  public static LocalResource createHiveExecLocalResource(HiveConf conf)
+      throws IOException, LoginException {
+    String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+    String currentVersionPathStr = getExecJarPathLocal();
+    String currentJar = getClientVersion(currentVersionPathStr);
+    FileSystem fs = null;
+    Path jarPath = null;
+    FileStatus dirStatus = null;
+
+    if (hiveJarDir != null) {
+      // check if it is a valid directory in HDFS
+      Path hiveJarDirPath = new Path(hiveJarDir);
+      fs = hiveJarDirPath.getFileSystem(conf);
+
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
+      }
+
+      try {
+        dirStatus = fs.getFileStatus(hiveJarDirPath);
+      } catch (FileNotFoundException fe) {
+        // do nothing
+      }
+
+      if ((dirStatus != null) && (dirStatus.isDir())) {
+        FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
+        for (FileStatus fstatus : listFileStatus) {
+          String jarVersion = getClientVersion(fstatus.getPath().toString());
+          if (jarVersion.equals(currentJar)) {
+            // we have found the jar we need.
+            jarPath = fstatus.getPath();
+            break;
+          }
+        }
+
+        if (jarPath == null) {
+          throw new IOException(ErrorMsg.CANNOT_FIND_EXEC_JAR.format(currentJar, hiveJarDir));
+        }
+      }
+    }
+
+    /*
+     * specified location does not exist or is not a directory;
+     * try to push the jar to the hdfs location pointed to by
+     * config variable HIVE_INSTALL_DIR. Path will be
+     * HIVE_INSTALL_DIR/{username}/hive_install/
+     */
+    if ((hiveJarDir == null) || (dirStatus == null) ||
+        ((dirStatus != null) && (!dirStatus.isDir()))) {
+      UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+      String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+      String userPathStr = conf.getVar(HiveConf.ConfVars.HIVE_INSTALL_DIR);
+      Path userPath = new Path(userPathStr);
+      fs = userPath.getFileSystem(conf);
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(userPathStr));
+      }
+
+      String jarPathStr = userPathStr + "/" + userName;
+      Path dest = new Path(jarPathStr);
+      jarPathStr += "/" + currentJar;
+      dirStatus = fs.getFileStatus(dest);
+      if (dirStatus.isDir()) {
+        fs.copyFromLocalFile(new Path(currentVersionPathStr), dest);
+        jarPath = new Path(jarPathStr);
+      } else {
+        throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
+      }
+    }
+
+    return createLocalResource(fs, jarPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index ac536e2..c0dd791 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -74,7 +74,7 @@ public int execute(DriverContext driverContext) {
 
     // unless already installed on all the cluster nodes, we'll have to
     // localize hive-exec.jar as well.
-    LocalResource appJarLr = DagUtils.createHiveExecLocalResource(scratchDir);
+    LocalResource appJarLr = DagUtils.createHiveExecLocalResource(conf);
 
     // next we translate the TezWork to a Tez DAG
     DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
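
Reviewer note (not part of the patch): the resolution scheme above keys everything off the jar's file name. getClientVersion() returns the last path component of the locally loaded hive-exec jar, that name is matched against the listing of hive.jar.directory, and on a miss or a misconfigured directory the code falls back to hive.user.install.directory/{username}. The self-contained class below is a minimal sketch of that matching logic with the HDFS calls stripped out; the class and method names are illustrative only and do not appear in the patch.

public class ExecJarLookupSketch {

  // Same rule as the patch's getClientVersion(): the "version" of a jar is
  // simply its file name, i.e. the last path component.
  static String jarName(String pathStr) {
    String[] splits = pathStr.split("/");
    return splits[splits.length - 1];
  }

  // Mirrors the lookup loop in createHiveExecLocalResource(): return the
  // remote path whose file name matches the local jar's file name, or null
  // when there is no match (the patch then throws CANNOT_FIND_EXEC_JAR, 10246).
  static String findMatch(String localJarPath, String[] remoteJarPaths) {
    String wanted = jarName(localJarPath);
    for (String candidate : remoteJarPaths) {
      if (jarName(candidate).equals(wanted)) {
        return candidate;
      }
    }
    return null;
  }

  // Fallback destination when hive.jar.directory is unset or not a directory:
  // hive.user.install.directory + "/" + userName, as in both changed methods.
  static String fallbackDir(String userInstallDir, String userName) {
    return userInstallDir + "/" + userName;
  }

  public static void main(String[] args) {
    String local = "file:/opt/hive/lib/hive-exec-0.12.0.jar";
    String[] remote = {
        "hdfs:///apps/hive/install/hive-exec-0.11.0.jar",
        "hdfs:///apps/hive/install/hive-exec-0.12.0.jar",
    };
    System.out.println(findMatch(local, remote));            // hdfs:///apps/hive/install/hive-exec-0.12.0.jar
    System.out.println(fallbackDir("hdfs:///user", "hive")); // hdfs:///user/hive
  }
}

One consequence worth flagging: because the match is on bare file names, a locally built hive-exec jar must carry exactly the same file name as the one installed under hive.jar.directory, otherwise every query falls back to the per-user upload path.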