diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 79c38c1..3e3b779 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -786,6 +786,8 @@
         (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
 
     HIVE_OPTIMIZE_TEZ("hive.optimize.tez", false),
+    HIVE_JAR_DIRECTORY("hive.jar.directory", "hdfs:///apps/hive/install/"),
+    HIVE_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
     ;
 
     public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 12e9334..eab994b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -360,6 +360,11 @@
   CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
   BAD_LOCATION_VALUE(10244, "{0} is not absolute or has no scheme information. Please specify a complete absolute uri with scheme information."),
   UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+  CANNOT_FIND_EXEC_JAR(10246, "{0} jar couldn't be found in location configured by hive.jar.directory = {1}", true),
+  INVALID_HDFS_URI(10247, "{0} is not an HDFS URI", true),
+  INVALID_DIR(10248, "{0} is not a directory", true),
+  NO_VALID_LOCATIONS(10249, "Could not find any valid location to place the jars. " +
+      "Please update hive.jar.directory or hive.user.install.directory with a valid location", false),
 
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -616,8 +621,8 @@ public String format(String reason) {
     return format(new String[]{reason});
   }
   /**
-   * If the message is parametrized, this will fill the parameters with supplied 
-   * {@code reasons}, otherwise {@code reasons} are appended at the end of the 
+   * If the message is parametrized, this will fill the parameters with supplied
+   * {@code reasons}, otherwise {@code reasons} are appended at the end of the
    * message.
    */
   public String format(String... reasons) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index faa99f7..97d0103 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -44,6 +51,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -72,6 +80,10 @@ * map and reduce work to tez vertices and edges. It handles configuration * objects, file localization and vertex/edge creation. */ +/** + * DagUtils. + * + */ public class DagUtils { /* @@ -300,13 +312,46 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file, } /** + * @param conf + * @return path to destination directory on hdfs + * @throws LoginException if we are unable to figure user information + * @throws IOException when any dfs operation fails. + */ + private static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException { + UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); + String userName = ShimLoader.getHadoopShims().getShortUserName(ugi); + String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_INSTALL_DIR); + Path userPath = new Path(userPathStr); + FileSystem fs = userPath.getFileSystem(conf); + if (!(fs instanceof DistributedFileSystem)) { + throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(userPathStr)); + } + + String jarPathStr = userPathStr + "/" + userName; + String hdfsDirPathStr = jarPathStr; + Path hdfsDirPath = new Path(hdfsDirPathStr); + + FileStatus fstatus = fs.getFileStatus(hdfsDirPath); + if (!fstatus.isDir()) { + throw new IOException(ErrorMsg.INVALID_DIR.format(hdfsDirPath.toString())); + } + + Path retPath = new Path(hdfsDirPath.toString() + "/.hiveJars"); + + fs.mkdirs(retPath); + return retPath; + } + + /** * Localizes files, archives and jars the user has instructed us * to provide on the cluster as resources for execution. 
* * @param conf * @return List local resources to add to execution + * @throws IOException when hdfs operation fails + * @throws LoginException when getDefaultDestDir fails with the same exception */ - public static List localizeTempFiles(Configuration conf) { + public static List localizeTempFiles(Configuration conf) throws IOException, LoginException { List tmpResources = new ArrayList(); String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS); @@ -316,15 +361,197 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file, // need to localize the additional jars and files + // we need the directory on hdfs to which we shall put all these files + String hdfsDirPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_JAR_DIRECTORY); + Path hdfsDirPath = new Path(hdfsDirPathStr); + FileSystem fs = hdfsDirPath.getFileSystem(conf); + if (!(fs instanceof DistributedFileSystem)) { + throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hdfsDirPathStr)); + } + + FileStatus fstatus = null; + try { + fstatus = fs.getFileStatus(hdfsDirPath); + } catch (FileNotFoundException fe) { + // do nothing + } + + if ((fstatus == null) || (!fstatus.isDir())) { + Path destDir = getDefaultDestDir(conf); + hdfsDirPathStr = destDir.toString(); + } + + String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives; + String[] allFilesArr = allFiles.split(","); + for (String file : allFilesArr) { + String hdfsFilePathStr = hdfsDirPathStr + "/" + getResourceVersion(file); + LocalResource localResource = localizeResource(new Path(file), new Path(hdfsFilePathStr), conf); + tmpResources.add(localResource); + } + return tmpResources; } + // the api that finds the jar being used by this class on disk + private static String getExecJarPathLocal () { + try { + // java magic + return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString(); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + + return 
null; + } + + + /** + * @param pathStr - the string from which we try to determine the version + * @return returns a string of type 0.12.0 etc. + */ + private static String getResourceVersion(String pathStr) { + String[] splits = pathStr.split("/"); + return splits[splits.length - 1]; + } + /** - * Creates a local resource representing the hive-exec jar. This resource will + * @param src the source file. + * @param dest the destination file. + * @param conf the configuration + * @return true if the file names and cksum matches else returns false. + * @throws IOException when any file system related call fails + */ + private static boolean checkPreExisting(Path src, Path dest, Configuration conf) + throws IOException { + FileSystem srcFS = src.getFileSystem(conf); + FileSystem destFS = dest.getFileSystem(conf); + + if (!destFS.exists(dest)) { + return false; + } + FileStatus destStatus = destFS.getFileStatus(dest); + if (destStatus.isDir()) { + return false; + } + + String srcVersion = getResourceVersion(src.toString()); + String destVersion = getResourceVersion(dest.toString()); + + if (!srcVersion.equals(destVersion)) { + return false; + } + + FileChecksum srcCksum = srcFS.getFileChecksum(src); + FileChecksum destCksum = destFS.getFileChecksum(dest); + if ((srcCksum == null) || (destCksum == null)) { + return false; + } + // versions match. check the cksum + if (srcCksum.equals(destCksum)) { + // both cksum and versions match + System.out.println("Found an exact match skipping: " + src.toString()); + return true; + } + + return false; + } + + /** + * @param src path to the source for the resource + * @param dest path in hdfs for the resource + * @param conf + * @return localresource from tez localization. + * @throws IOException when any file system related calls fails. 
+ */ + private static LocalResource localizeResource(Path src, Path dest, Configuration conf) + throws IOException { + FileSystem destFS = dest.getFileSystem(conf); + if (!(destFS instanceof DistributedFileSystem)) { + throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(dest.toString())); + } + + if (src != null) { + if (!checkPreExisting(src, dest, conf)) { + // copy the src to the destination and create local resource. + // overwrite even if file already exists. + System.out.println("src: " + src.toString() + " and dest: " + dest.toString()); + destFS.copyFromLocalFile(false, true, src, dest); + } + } + + return createLocalResource(destFS, dest, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION); + } + + /** + * Returns a local resource representing the hive-exec jar. This resource will * be used to execute the plan on the cluster. + * @param conf + * @return LocalResource corresponding to the localized hive exec resource. + * @throws IOException when any file system related call fails. + * @throws LoginException when we are unable to determine the user. 
*/ - public static LocalResource createHiveExecLocalResource(Path mrScratchDir) { - return null; + public static LocalResource createHiveExecLocalResource(HiveConf conf) + throws IOException, LoginException { + String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY); + String currentVersionPathStr = getExecJarPathLocal(); + String currentJar = getResourceVersion(currentVersionPathStr); + FileSystem fs = null; + Path jarPath = null; + FileStatus dirStatus = null; + + if (hiveJarDir != null) { + // check if it is a valid directory in HDFS + Path hiveJarDirPath = new Path(hiveJarDir); + fs = hiveJarDirPath.getFileSystem(conf); + + if (!(fs instanceof DistributedFileSystem)) { + throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir)); + } + + try { + dirStatus = fs.getFileStatus(hiveJarDirPath); + } catch (FileNotFoundException fe) { + // do nothing + } + if ((dirStatus != null) && (dirStatus.isDir())) { + FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath); + for (FileStatus fstatus : listFileStatus) { + String jarVersion = getResourceVersion(fstatus.getPath().toString()); + if (jarVersion.equals(currentJar)) { + // we have found the jar we need. + jarPath = fstatus.getPath(); + return localizeResource(null, jarPath, conf); + } + } + + if (jarPath == null) { + dirStatus = null; + } + } + } + + /* + * specified location does not exist or is not a directory + * try to push the jar to the hdfs location pointed by + * config variable HIVE_INSTALL_DIR. 
Path will be + * HIVE_INSTALL_DIR/{username}/hive_install/ + */ + if ((hiveJarDir == null) || (dirStatus == null) || + ((dirStatus != null) && (!dirStatus.isDir()))) { + Path dest = getDefaultDestDir(conf); + String destPathStr = dest.toString(); + String jarPathStr = destPathStr + "/" + currentJar; + dirStatus = fs.getFileStatus(dest); + if (dirStatus.isDir()) { + return localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf); + } else { + throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString())); + } + } + + // we couldn't find any valid locations. Throw exception + throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg()); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index ac536e2..c0dd791 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -74,7 +74,7 @@ public int execute(DriverContext driverContext) { // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. - LocalResource appJarLr = DagUtils.createHiveExecLocalResource(scratchDir); + LocalResource appJarLr = DagUtils.createHiveExecLocalResource(conf); // next we translate the TezWork to a Tez DAG DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);