Index: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/FileUtils.java   (revision 1036562)
+++ common/src/java/org/apache/hadoop/hive/common/FileUtils.java   (working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
  * Collection of file manipulation utilities common across Hive.
@@ -34,12 +35,12 @@
 public final class FileUtils {
 
   /**
-   * Variant of Path.makeQualified that qualifies the input path against the
-   * default file system indicated by the configuration
+   * Variant of Path.makeQualified that qualifies the input path against the default file system
+   * indicated by the configuration
    *
-   * This does not require a FileSystem handle in most cases - only requires the
-   * Filesystem URI. This saves the cost of opening the Filesystem - which can
-   * involve RPCs - as well as cause errors
+   * This does not require a FileSystem handle in most cases - only requires the Filesystem URI.
+   * This saves the cost of opening the Filesystem - which can involve RPCs - as well as cause
+   * errors
    *
    * @param path
    *          path to be fully qualified
@@ -70,14 +71,13 @@
       // no scheme - use default file system uri
       scheme = fsUri.getScheme();
       authority = fsUri.getAuthority();
-      if(authority == null) {
+      if (authority == null) {
         authority = "";
       }
     } else {
-      if(authority == null) {
+      if (authority == null) {
         // no authority - use default one if it applies
-        if(scheme.equals(fsUri.getScheme()) &&
-           fsUri.getAuthority() != null) {
+        if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) {
           authority = fsUri.getAuthority();
         } else {
           authority = "";
@@ -93,8 +93,7 @@
 
   }
 
-  public static String makePartName(List<String> partCols,
-      List<String> vals) {
+  public static String makePartName(List<String> partCols, List<String> vals) {
     StringBuilder name = new StringBuilder();
 
     for (int i = 0; i < partCols.size(); i++) {
@@ -122,8 +121,8 @@
     for (char c = 0; c < ' '; c++) {
       charToEscape.set(c);
     }
-    char[] clist = new char[] { '"', '#', '%', '\'', '*', '/', ':', '=', '?',
-        '\\', '\u007F', '{', ']' };
+    char[] clist = new char[] {'"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{',
+        ']'};
     for (char c : clist) {
       charToEscape.set(c);
     }
@@ -177,18 +176,20 @@
   }
 
   /**
-   * Recursively lists status for all files starting from a particular
-   * directory (or individual file as base case).
+   * Recursively lists status for all files starting from a particular directory (or individual file
+   * as base case).
    *
-   * @param fs file system
+   * @param fs
+   *          file system
    *
-   * @param fileStatus starting point in file system
+   * @param fileStatus
+   *          starting point in file system
    *
-   * @param results receives enumeration of all files found
+   * @param results
+   *          receives enumeration of all files found
    */
   public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
-      List<FileStatus> results)
-      throws IOException {
+      List<FileStatus> results) throws IOException {
 
     if (fileStatus.isDir()) {
       for (FileStatus stat : fs.listStatus(fileStatus.getPath())) {
@@ -198,4 +199,30 @@
       results.add(fileStatus);
     }
   }
+
+  /**
+   * Archive all the files in the inputFiles into outputFile
+   *
+   * @param inputFiles
+   * @param outputFile
+   * @throws IOException
+   */
+  public static void tar(String parentDir, String[] inputFiles, String outputFile)
+      throws IOException {
+    StringBuffer tarCommand = new StringBuffer();
+    tarCommand.append("cd " + parentDir + " ; ");
+    tarCommand.append(" tar -zcvf ");
+    tarCommand.append(" " + outputFile);
+    for (int i = 0; i < inputFiles.length; i++) {
+      tarCommand.append(" " + inputFiles[i]);
+    }
+    String[] shellCmd = {"bash", "-c", tarCommand.toString()};
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error tarring file " + outputFile
+          + ". Tar process exited with exit code " + exitcode);
+    }
+  }
 }
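
For reference (not part of the patch itself), a minimal sketch of how the new FileUtils.tar helper is meant to be driven. The directory and file names below are hypothetical; they only mirror the naming conventions introduced elsewhere in this change. The helper shells out to bash and tar, so it assumes a Unix environment, takes input file names relative to parentDir, and writes the archive into parentDir.

    // Hypothetical usage sketch of the new FileUtils.tar helper; paths are made up.
    import java.io.IOException;
    import org.apache.hadoop.hive.common.FileUtils;

    public class TarUsageSketch {
      public static void main(String[] args) throws IOException {
        // Assumed local directory holding the per-table hashtable dump files.
        String parentDir = "/tmp/hive-user/hive-local-tmp/HashTable-Stage-1";
        // Dump files named "MapJoin-<tag>-<bigBucketFileName>.hashtable", relative to parentDir.
        String[] inputFiles = {"MapJoin-0-file1.hashtable", "MapJoin-1-file1.hashtable"};
        // Archive name, also relative to parentDir; matches Utilities.generateTarFileName("Stage-1").
        String outputFile = "Stage-1.tar.gz";
        // Runs `bash -c "cd <parentDir> ; tar -zcvf <outputFile> <inputFiles...>"`;
        // throws IOException if the tar process exits with a non-zero code.
        FileUtils.tar(parentDir, inputFiles, outputFile);
      }
    }
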
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java   (working copy)
@@ -400,7 +400,7 @@
           .entrySet()) {
         // get the key and value
         Byte tag = hashTables.getKey();
-        HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = hashTables.getValue();
+        HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = hashTables.getValue();
 
         // get current input file name
         String bigBucketFileName = this.getExecContext().getCurrentBigBucketFile();
@@ -408,7 +408,7 @@
           bigBucketFileName = "-";
         }
         // get the tmp URI path; it will be a hdfs path if not local mode
-        String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName);
+        String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
         hashTable.isAbort(rowNumber, console);
        console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
         // get the hashtable file and path
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java   (working copy)
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
@@ -144,7 +145,7 @@
 
   private void loadHashTable() throws HiveException {
     boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local");
-    String tmpURI = null;
+    String baseDir = null;
     HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable;
     Byte pos;
@@ -161,57 +162,36 @@
 
     try {
       if (localMode) {
-        LOG.info("******* Load from tmp file uri ***");
-        tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
-        for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
-            .entrySet()) {
-          pos = entry.getKey();
-          hashtable = entry.getValue();
-          String filePath = Utilities.generatePath(tmpURI, pos, currentFileName);
-          Path path = new Path(filePath);
-          LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
-
-          hashtable.initilizePersistentHash(path.toUri().getPath());
-        }
+        baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
       } else {
-
-        Path[] localFiles = DistributedCache.getLocalCacheFiles(this.hconf);
-
-        for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
-            .entrySet()) {
-          pos = entry.getKey();
-          hashtable = entry.getValue();
-          String suffix = Utilities.generateFileName(pos, currentFileName);
-          LOG.info("Looking for hashtable file with suffix: " + suffix);
-
-          boolean found = false;
-          for (int i = 0; i < localFiles.length; i++) {
-            Path path = localFiles[i];
-
-            if (path.toString().endsWith(suffix)) {
-              LOG.info("Matching suffix with cached file:" + path.toString());
-              LOG.info("\tInitializing the hashtable by cached file:" + path.toString());
-              hashtable.initilizePersistentHash(path.toString());
-              found = true;
-              LOG.info("\tLoad back 1 hashtable file from distributed cache:" + path.toString());
-              break;
-            }
-          }
-          if (!found) {
-            LOG.error("Load nothing from Distributed Cache");
-            throw new HiveException();
-          }
-        }
+        Path[] localArchives;
+        String stageID = this.getExecContext().getLocalWork().getStageID();
+        String suffix = Utilities.generateTarFileName(stageID);
+        FileSystem localFs = FileSystem.getLocal(hconf);
+        localArchives = DistributedCache.getLocalCacheArchives(this.hconf);
+        Path archive;
+        for (int j = 0; j < localArchives.length; j++) {
+          archive = localArchives[j];
+          if (!archive.getName().endsWith(suffix)) {
+            continue;
+          }
+          Path archiveLocalLink = archive.makeQualified(localFs);
+          baseDir = archiveLocalLink.toUri().getPath();
+        }
       }
+      for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
+          .entrySet()) {
+        pos = entry.getKey();
+        hashtable = entry.getValue();
+        String filePath = Utilities.generatePath(baseDir, pos, currentFileName);
+        Path path = new Path(filePath);
+        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
+        hashtable.initilizePersistentHash(path.toUri().getPath());
+      }
     } catch (Exception e) {
-      e.printStackTrace();
-      LOG.error("Load Hash Table error");
-
-      throw new HiveException();
+      LOG.error("Load Distributed Cache Error");
+      throw new HiveException(e);
     }
-
-
   }
 
   @Override
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java   (working copy)
@@ -1,20 +0,0 @@
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.fs.Path;
-
-public class PathUtil {
-  public static String suffix=".hashtable";
-  public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){
-    String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix);
-    return path;
-  }
-  public static String generateFileName(Byte tag,String bigBucketFileName){
-    String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix);
-    return fileName;
-  }
-
-  public static String generateTmpURI(String baseURI,String id){
-    String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id);
-    return tmpFileURI;
-  }
-}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java   (working copy)
@@ -49,6 +49,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
@@ -538,6 +539,7 @@
       throw new RuntimeException(e.getMessage());
     }
 
+    // No-Op - we don't really write anything here ..
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
 
@@ -560,23 +562,21 @@
     if (StringUtils.isNotBlank(addedFiles)) {
       initializeFiles("tmpfiles", addedFiles);
     }
-    // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it
-    String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES);
-    if (StringUtils.isNotBlank(addedArchives)) {
-      initializeFiles("tmparchives", addedArchives);
-    }
-
     int returnVal = 0;
     RunningJob rj = null;
-
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJOBNAME));
 
     if (noName) {
       // This is for a special case to ensure unit tests pass
       HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB" + Utilities.randGen.nextInt());
     }
-    try {
-      // propagate the file to distributed cache
+    String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES);
+    // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it
+    if (StringUtils.isNotBlank(addedArchives)) {
+      initializeFiles("tmparchives", addedArchives);
+    }
+
+    try {
       MapredLocalWork localwork = work.getMapLocalWork();
       if (localwork != null) {
         boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local");
@@ -587,32 +587,39 @@
           FileSystem hdfs = hdfsPath.getFileSystem(job);
           FileSystem localFS = localPath.getFileSystem(job);
           FileStatus[] hashtableFiles = localFS.listStatus(localPath);
-          for (int i = 0; i < hashtableFiles.length; i++) {
-            FileStatus file = hashtableFiles[i];
-            Path path = file.getPath();
-            String fileName = path.getName();
-            String hdfsFile = hdfsPath + Path.SEPARATOR + fileName;
-
-            LOG.info("Upload 1 HashTable from" + path + " to: " + hdfsFile);
-            Path hdfsFilePath = new Path(hdfsFile);
-
-            hdfs.copyFromLocalFile(path, hdfsFilePath);
-            short replication = (short) job.getInt("mapred.submit.replication", 10);
-            hdfs.setReplication(hdfsFilePath, replication);
-          }
-
-          FileStatus[] hashtableRemoteFiles = hdfs.listStatus(hdfsPath);
-          for (int i = 0; i < hashtableRemoteFiles.length; i++) {
-            FileStatus file = hashtableRemoteFiles[i];
-            Path path = file.getPath();
-            DistributedCache.addCacheFile(path.toUri(), job);
-            LOG.info("add 1 hashtable file to distributed cache: " + path.toUri());
-          }
+          int fileNumber = hashtableFiles.length;
+          String[] fileNames = new String[fileNumber];
+
+          for (int i = 0; i < fileNumber; i++) {
+            fileNames[i] = hashtableFiles[i].getPath().getName();
+          }
+
+          // package and compress all the hashtable files to an archive file
+          String parentDir = localPath.toUri().getPath();
+          String stageId = this.getId();
+          String archiveFileURI = Utilities.generateTarURI(parentDir, stageId);
+          String archiveFileName = Utilities.generateTarFileName(stageId);
+          localwork.setStageID(stageId);
+
+          FileUtils.tar(parentDir, fileNames, archiveFileName);
+          Path archivePath = new Path(archiveFileURI);
+          LOG.info("Archive " + hashtableFiles.length + " hash table files to " + archiveFileURI);
+
+          // upload archive file to hdfs
+          String hdfsFile = Utilities.generateTarURI(hdfsPath, stageId);
+          Path hdfsFilePath = new Path(hdfsFile);
+          short replication = (short) job.getInt("mapred.submit.replication", 10);
+          hdfs.copyFromLocalFile(archivePath, hdfsFilePath);
+          hdfs.setReplication(hdfsFilePath, replication);
+          LOG.info("Upload 1 archive file from " + archivePath + " to: " + hdfsFilePath);
+
+          // add the archive file to distributed cache
+          DistributedCache.createSymlink(job);
+          DistributedCache.addCacheArchive(hdfsFilePath.toUri(), job);
+          LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
         }
       }
-
-
       addInputPaths(job, work, emptyScratchDirStr);
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java   (working copy)
@@ -1572,13 +1572,13 @@
   public static String suffix = ".hashtable";
 
   public static String generatePath(String baseURI, Byte tag, String bigBucketFileName) {
-    String path = new String(baseURI + Path.SEPARATOR + "-" + tag + "-" + bigBucketFileName
+    String path = new String(baseURI + Path.SEPARATOR + "MapJoin-" + tag + "-" + bigBucketFileName
         + suffix);
     return path;
   }
 
   public static String generateFileName(Byte tag, String bigBucketFileName) {
-    String fileName = new String("-" + tag + "-" + bigBucketFileName + suffix);
+    String fileName = new String("MapJoin-" + tag + "-" + bigBucketFileName + suffix);
     return fileName;
   }
 
@@ -1587,6 +1587,26 @@
     return tmpFileURI;
   }
 
+  public static String generateTarURI(String baseURI, String filename) {
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
+    return tmpFileURI;
+  }
+
+  public static String generateTarURI(Path baseURI, String filename) {
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
+    return tmpFileURI;
+  }
+
+  public static String generateTarFileName(String name) {
+    String tmpFileURI = new String(name + ".tar.gz");
+    return tmpFileURI;
+  }
+
+  public static String generatePath(Path baseURI, String filename) {
+    String path = new String(baseURI + Path.SEPARATOR + filename);
+    return path;
+  }
+
   public static String now() {
     Calendar cal = Calendar.getInstance();
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java   (revision 1036562)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java   (working copy)
@@ -43,6 +43,7 @@
   private boolean inputFileChangeSensitive;
   private BucketMapJoinContext bucketMapjoinContext;
   private String tmpFileURI;
+  private String stageID;
 
   private List<Operator<? extends Serializable>> dummyParentOp ;
 
@@ -81,7 +82,15 @@
     return aliasToWork;
   }
 
+  public String getStageID() {
+    return stageID;
+  }
+
+  public void setStageID(String stageID) {
+    this.stageID = stageID;
+  }
+
   public void setAliasToWork(
       final LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork) {
     this.aliasToWork = aliasToWork;
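
For reference (not part of the patch), a short sketch of the names the new Utilities helpers produce; the stage id, bucket file name, and base directory below are hypothetical, but the patterns are the ones ExecDriver writes into the archive and MapJoinOperator later resolves from the distributed cache.

    // Illustration of the file/URI naming contract; all concrete values are made up.
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class NamingSketch {
      public static void main(String[] args) {
        // Assumed directory where the archive is dumped or unpacked.
        String baseDir = "/tmp/hive-user/hive-local-tmp/HashTable-Stage-1";
        // Per-table hashtable dump: "MapJoin-<tag>-<bigBucketFileName>.hashtable"
        System.out.println(Utilities.generateFileName((byte) 0, "bucket_00000"));
        // => MapJoin-0-bucket_00000.hashtable
        System.out.println(Utilities.generatePath(baseDir, (byte) 0, "bucket_00000"));
        // => /tmp/hive-user/hive-local-tmp/HashTable-Stage-1/MapJoin-0-bucket_00000.hashtable
        // Archive that ExecDriver builds and ships through the DistributedCache:
        System.out.println(Utilities.generateTarFileName("Stage-1"));     // => Stage-1.tar.gz
        System.out.println(Utilities.generateTarURI(baseDir, "Stage-1")); // => <baseDir>/Stage-1.tar.gz
      }
    }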