Index: common/src/java/org/apache/hadoop/hive/common/FileUtils.java =================================================================== --- common/src/java/org/apache/hadoop/hive/common/FileUtils.java (revision 1036562) +++ common/src/java/org/apache/hadoop/hive/common/FileUtils.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; /** * Collection of file manipulation utilities common across Hive. @@ -34,12 +35,12 @@ public final class FileUtils { /** - * Variant of Path.makeQualified that qualifies the input path against the - * default file system indicated by the configuration + * Variant of Path.makeQualified that qualifies the input path against the default file system + * indicated by the configuration * - * This does not require a FileSystem handle in most cases - only requires the - * Filesystem URI. This saves the cost of opening the Filesystem - which can - * involve RPCs - as well as cause errors + * This does not require a FileSystem handle in most cases - only requires the Filesystem URI. + * This saves the cost of opening the Filesystem - which can involve RPCs - as well as cause + * errors * * @param path * path to be fully qualified @@ -70,14 +71,13 @@ // no scheme - use default file system uri scheme = fsUri.getScheme(); authority = fsUri.getAuthority(); - if(authority == null) { + if (authority == null) { authority = ""; } } else { - if(authority == null) { + if (authority == null) { // no authority - use default one if it applies - if(scheme.equals(fsUri.getScheme()) && - fsUri.getAuthority() != null) { + if (scheme.equals(fsUri.getScheme()) && fsUri.getAuthority() != null) { authority = fsUri.getAuthority(); } else { authority = ""; @@ -93,8 +93,7 @@ } - public static String makePartName(List partCols, - List vals) { + public static String makePartName(List partCols, List vals) { StringBuilder name = new StringBuilder(); for (int i = 0; i < partCols.size(); i++) { @@ -122,8 +121,8 @@ for (char c = 0; c < ' '; c++) { charToEscape.set(c); } - char[] clist = new char[] { '"', '#', '%', '\'', '*', '/', ':', '=', '?', - '\\', '\u007F', '{', ']' }; + char[] clist = new char[] {'"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', '{', + ']'}; for (char c : clist) { charToEscape.set(c); } @@ -177,18 +176,20 @@ } /** - * Recursively lists status for all files starting from a particular - * directory (or individual file as base case). + * Recursively lists status for all files starting from a particular directory (or individual file + * as base case). 
* - * @param fs file system * - * @param fileStatus starting point in file system * - * @param results receives enumeration of all files found + * @param fs + * file system + * + * @param fileStatus + * starting point in file system + * + * @param results + * receives enumeration of all files found */ public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus, - List<FileStatus> results) - throws IOException { + List<FileStatus> results) throws IOException { if (fileStatus.isDir()) { for (FileStatus stat : fs.listStatus(fileStatus.getPath())) { @@ -198,4 +199,30 @@ results.add(fileStatus); } } + + /** + * Archives the files listed in inputFiles, relative to parentDir, into the gzipped tar file outputFile. + * @param parentDir directory that contains the input files + * @param inputFiles names of the files to archive, relative to parentDir + * @param outputFile name of the tar.gz archive to create under parentDir + * @throws IOException if the tar process exits with a non-zero code + */ + public static void tar(String parentDir, String[] inputFiles, String outputFile) + throws IOException { + StringBuffer tarCommand = new StringBuffer(); + tarCommand.append("cd " + parentDir + " ; "); + tarCommand.append(" tar -zcvf "); + tarCommand.append(" " + outputFile); + for (int i = 0; i < inputFiles.length; i++) { + tarCommand.append(" " + inputFiles[i]); + } + String[] shellCmd = {"bash", "-c", tarCommand.toString()}; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error tarring file " + outputFile + + ". Tar process exited with exit code " + exitcode); + } + } } Index: ql/src/test/results/clientpositive/join_empty.q.out =================================================================== --- ql/src/test/results/clientpositive/join_empty.q.out (revision 0) +++ ql/src/test/results/clientpositive/join_empty.q.out (revision 0) @@ -0,0 +1,46 @@ +PREHOOK: query: create table srcpart_empty(key int, value string) partitioned by (ds string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table srcpart_empty(key int, value string) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcpart_empty +PREHOOK: query: create table src2_empty (key int, value string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table src2_empty (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@src2_empty +PREHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-39_138_107997606390957326/-mr-10000 +POSTHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-39_138_107997606390957326/-mr-10000 +PREHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src2_empty +PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-43_473_3587274534534962486/-mr-10000 +POSTHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src2_empty +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-43_473_3587274534534962486/-mr-10000 +PREHOOK: query: select a.key, b.value from srcpart_empty a join src b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-47_872_1531725911506590422/-mr-10000 +POSTHOOK: 
query: select a.key, b.value from srcpart_empty a join src b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-47_872_1531725911506590422/-mr-10000 +PREHOOK: query: select a.key, b.value from src2_empty a join src b on a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src2_empty +PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-52_182_4671419724484282586/-mr-10000 +POSTHOOK: query: select a.key, b.value from src2_empty a join src b on a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src2_empty +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-52_182_4671419724484282586/-mr-10000 Index: ql/src/test/queries/clientpositive/join_empty.q =================================================================== --- ql/src/test/queries/clientpositive/join_empty.q (revision 0) +++ ql/src/test/queries/clientpositive/join_empty.q (revision 0) @@ -0,0 +1,10 @@ +create table srcpart_empty(key int, value string) partitioned by (ds string); +create table src2_empty (key int, value string); + +select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key; +select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key; + +set hive.mapred.mode=nonstrict; +set hive.auto.convert.join = true; +select a.key, b.value from srcpart_empty a join src b on a.key=b.key; +select a.key, b.value from src2_empty a join src b on a.key=b.key; \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -582,6 +582,7 @@ if (aliasPartnDesc == null) { aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(parseCtx .getTopToTable().get(topOp)), null); + } plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy) @@ -184,10 +184,6 @@ pathToAliases.remove(path); } - if (pathSet.size() == 0) { - throw new SemanticException("No input path for alias " + alias); - } - // create fetch work FetchWork fetchWork = null; List partDir = new ArrayList(); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (working copy) @@ -400,7 +400,7 @@ .entrySet()) { // get the key and value Byte tag = hashTables.getKey(); - HashMapWrapper hashTable = hashTables.getValue(); + HashMapWrapper hashTable = hashTables.getValue(); // get current input file name String bigBucketFileName = this.getExecContext().getCurrentBigBucketFile(); @@ -408,7 +408,7 @@ bigBucketFileName = "-"; } // get the tmp URI path; it will be a hdfs path if not local mode - String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName); + String tmpURIPath = Utilities.generatePath(tmpURI, tag, 
bigBucketFileName); hashTable.isAbort(rowNumber, console); console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath); // get the hashtable file and path Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -64,6 +64,26 @@ static Log LOG = LogFactory.getLog(FetchOperator.class.getName()); static LogHelper console = new LogHelper(LOG); + private boolean isEmptyTable; + private boolean isNativeTable; + private FetchWork work; + private int splitNum; + private PartitionDesc currPart; + private TableDesc currTbl; + private boolean tblDataDone; + + private transient RecordReader currRecReader; + private transient InputSplit[] inputSplits; + private transient InputFormat inputFormat; + private transient JobConf job; + private transient WritableComparable key; + private transient Writable value; + private transient Deserializer serde; + private transient Iterator iterPath; + private transient Iterator iterPartDesc; + private transient Path currPath; + private transient StructObjectInspector rowObjectInspector; + private transient Object[] rowWithPart; public FetchOperator() { } @@ -123,38 +143,24 @@ this.tblDataDone = tblDataDone; } - private boolean isNativeTable; - private FetchWork work; - private int splitNum; - private PartitionDesc currPart; - private TableDesc currTbl; - private boolean tblDataDone; + public boolean isEmptyTable() { + return isEmptyTable; + } - private transient RecordReader currRecReader; - private transient InputSplit[] inputSplits; - private transient InputFormat inputFormat; - private transient JobConf job; - private transient WritableComparable key; - private transient Writable value; - private transient Deserializer serde; - private transient Iterator iterPath; - private transient Iterator iterPartDesc; - private transient Path currPath; - private transient StructObjectInspector rowObjectInspector; - private transient Object[] rowWithPart; + public void setEmptyTable(boolean isEmptyTable) { + this.isEmptyTable = isEmptyTable; + } /** * A cache of InputFormat instances. 
*/ - private static Map> inputFormats = - new HashMap>(); + private static Map> inputFormats = new HashMap>(); - static InputFormat getInputFormatFromCache( - Class inputFormatClass, Configuration conf) throws IOException { + static InputFormat getInputFormatFromCache(Class inputFormatClass, + Configuration conf) throws IOException { if (!inputFormats.containsKey(inputFormatClass)) { try { - InputFormat newInstance = - (InputFormat) ReflectionUtils + InputFormat newInstance = (InputFormat) ReflectionUtils .newInstance(inputFormatClass, conf); inputFormats.put(inputFormatClass, newInstance); } catch (Exception e) { @@ -169,10 +175,7 @@ List partNames = new ArrayList(); List partValues = new ArrayList(); - String pcols = currPart - .getTableDesc() - .getProperties() - .getProperty( + String pcols = currPart.getTableDesc().getProperties().getProperty( org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS); LinkedHashMap partSpec = currPart.getPartSpec(); @@ -181,16 +184,14 @@ for (String key : partKeys) { partNames.add(key); partValues.add(partSpec.get(key)); - partObjectInspectors - .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); } StructObjectInspector partObjectInspector = ObjectInspectorFactory .getStandardStructObjectInspector(partNames, partObjectInspectors); rowObjectInspector = (StructObjectInspector) serde.getObjectInspector(); rowWithPart[1] = partValues; - rowObjectInspector = ObjectInspectorFactory - .getUnionStructObjectInspector(Arrays + rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays .asList(new StructObjectInspector[] {rowObjectInspector, partObjectInspector})); } @@ -226,8 +227,7 @@ } return; } else { - iterPath = FetchWork.convertStringToPathArray(work.getPartDir()) - .iterator(); + iterPath = FetchWork.convertStringToPathArray(work.getPartDir()).iterator(); iterPartDesc = work.getPartDesc().iterator(); } } @@ -235,15 +235,16 @@ while (iterPath.hasNext()) { Path nxt = iterPath.next(); PartitionDesc prt = null; - if(iterPartDesc != null) + if (iterPartDesc != null) { prt = iterPartDesc.next(); + } FileSystem fs = nxt.getFileSystem(job); if (fs.exists(nxt)) { FileStatus[] fStats = listStatusUnderPath(fs, nxt); for (FileStatus fStat : fStats) { if (fStat.getLen() > 0) { currPath = nxt; - if(iterPartDesc != null) { + if (iterPartDesc != null) { currPart = prt; } return; @@ -265,14 +266,13 @@ // to the default file system - which may or may not be online during pure // metadata // operations - job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils - .escapeString(currPath.toString())); + job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath + .toString())); PartitionDesc tmp; if (currTbl == null) { tmp = currPart; - } - else { + } else { tmp = new PartitionDesc(currTbl, null); } @@ -283,9 +283,9 @@ serde = tmp.getDeserializerClass().newInstance(); serde.initialize(job, tmp.getProperties()); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Creating fetchTask with deserializer typeinfo: " - + serde.getObjectInspector().getTypeName()); + + serde.getObjectInspector().getTypeName()); LOG.debug("deserializer properties: " + tmp.getProperties()); } @@ -303,8 +303,7 @@ return getRecordReader(); } - currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, - Reporter.NULL); + currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, 
Reporter.NULL); key = currRecReader.createKey(); value = currRecReader.createValue(); return currRecReader; @@ -363,17 +362,17 @@ } /** - * used for bucket map join. there is a hack for getting partitionDesc. - * bucket map join right now only allow one partition present in bucket map join. + * used for bucket map join. there is a hack for getting partitionDesc. bucket map join right now + * only allow one partition present in bucket map join. */ - public void setupContext (Iterator iterPath, Iterator iterPartDesc) { + public void setupContext(Iterator iterPath, Iterator iterPartDesc) { this.iterPath = iterPath; this.iterPartDesc = iterPartDesc; - if(iterPartDesc == null) { + if (iterPartDesc == null) { if (work.getTblDir() != null) { this.currTbl = work.getTblDesc(); } else { - //hack, get the first. + // hack, get the first. List listParts = work.getPartDesc(); currPart = listParts.get(0); } @@ -387,14 +386,19 @@ Deserializer serde = tbl.getDeserializerClass().newInstance(); serde.initialize(job, tbl.getProperties()); return serde.getObjectInspector(); - } else { + } else if (work.getPartDesc() != null) { List listParts = work.getPartDesc(); + if(listParts.size() == 0) { + return null; + } currPart = listParts.get(0); serde = currPart.getTableDesc().getDeserializerClass().newInstance(); serde.initialize(job, currPart.getTableDesc().getProperties()); setPrtnDesc(); currPart = null; return rowObjectInspector; + } else { + return null; } } catch (Exception e) { throw new HiveException("Failed with exception " + e.getMessage() @@ -403,21 +407,20 @@ } /** - * Lists status for all files under a given path. Whether or not - * this is recursive depends on the setting of - * job configuration parameter mapred.input.dir.recursive. + * Lists status for all files under a given path. Whether or not this is recursive depends on the + * setting of job configuration parameter mapred.input.dir.recursive. 
* - * @param fs file system + * @param fs + * file system * - * @param p path in file system + * @param p + * path in file system * * @return list of file status entries */ - private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) - throws IOException { + private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException { HiveConf hiveConf = new HiveConf(job, FetchOperator.class); - boolean recursive = - hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE); + boolean recursive = hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE); if (!recursive) { return fs.listStatus(p); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (working copy) @@ -44,6 +44,9 @@ import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; +import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; @@ -234,7 +237,7 @@ memoryMXBean = ManagementFactory.getMemoryMXBean(); long startTime = System.currentTimeMillis(); console.printInfo(Utilities.now() - + "\tStarting to luaunch local task to process map join;\tmaximum memory = " + + "\tStarting to launch local task to process map join;\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); fetchOperators = new HashMap(); Map fetchOpJobConfMap = new HashMap(); @@ -286,6 +289,12 @@ setUpFetchOpContext(fetchOp, alias, bigTableBucket); } + if (fetchOp.isEmptyTable()) { + //generate empty hashtable for empty table + this.generateDummyHashTable(alias, bigTableBucket); + continue; + } + // get the root operator Operator forwardOp = work.getAliasToWork().get(alias); // walk through the operator tree @@ -341,7 +350,8 @@ // initilize all forward operator for (Map.Entry entry : fetchOperators.entrySet()) { // get the forward op - Operator forwardOp = work.getAliasToWork().get(entry.getKey()); + String alias = entry.getKey(); + Operator forwardOp = work.getAliasToWork().get(alias); // put the exe context into all the operators forwardOp.setExecContext(execContext); @@ -353,12 +363,51 @@ jobConf = job; } // initialize the forward operator - forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()}); - l4j.info("fetchoperator for " + entry.getKey() + " initialized"); + ObjectInspector objectInspector = fetchOp.getOutputObjectInspector(); + if (objectInspector != null) { + forwardOp.initialize(jobConf, new ObjectInspector[] {objectInspector}); + l4j.info("fetchoperator for " + entry.getKey() + " initialized"); + } else { + fetchOp.setEmptyTable(true); + } } } + private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException { + // find the (byte)tag for the map join(HashTableSinkOperator) + Operator parentOp = work.getAliasToWork().get(alias); + Operator childOp = parentOp.getChildOperators().get(0); + while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) { + parentOp = childOp; + 
assert parentOp.getChildOperators().size() == 1; + childOp = parentOp.getChildOperators().get(0); + } + if (childOp == null) { + throw new HiveException( + "Cannot find HashTableSink op by tracing down the table scan operator tree"); + } + byte tag = (byte) childOp.getParentOperators().indexOf(parentOp); + // generate empty hashtable for this (byte)tag + String tmpURI = this.getWork().getTmpFileURI(); + HashMapWrapper hashTable = + new HashMapWrapper(); + + if (bigBucketFileName == null || bigBucketFileName.length() == 0) { + bigBucketFileName = "-"; + } + String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName); + console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath); + Path path = new Path(tmpURIPath); + FileSystem fs = path.getFileSystem(job); + File file = new File(path.toUri().getPath()); + fs.create(path); + long fileLength = hashTable.flushMemoryCacheToPersistent(file); + console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: " + + fileLength); + hashTable.close(); + } + private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile) throws Exception { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (working copy) @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; @@ -144,7 +145,7 @@ private void loadHashTable() throws HiveException { boolean localMode = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPJT).equals("local"); - String tmpURI = null; + String baseDir = null; HashMapWrapper hashtable; Byte pos; @@ -161,57 +162,36 @@ try { if (localMode) { - LOG.info("******* Load from tmp file uri ***"); - tmpURI = this.getExecContext().getLocalWork().getTmpFileURI(); - for (Map.Entry> entry : mapJoinTables - .entrySet()) { - pos = entry.getKey(); - hashtable = entry.getValue(); - String filePath = Utilities.generatePath(tmpURI, pos, currentFileName); - Path path = new Path(filePath); - LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString()); - - hashtable.initilizePersistentHash(path.toUri().getPath()); - } + baseDir = this.getExecContext().getLocalWork().getTmpFileURI(); } else { - - Path[] localFiles = DistributedCache.getLocalCacheFiles(this.hconf); - - for (Map.Entry> entry : mapJoinTables - .entrySet()) { - pos = entry.getKey(); - hashtable = entry.getValue(); - String suffix = Utilities.generateFileName(pos, currentFileName); - LOG.info("Looking for hashtable file with suffix: " + suffix); - - boolean found = false; - for (int i = 0; i < localFiles.length; i++) { - Path path = localFiles[i]; - - if (path.toString().endsWith(suffix)) { - LOG.info("Matching suffix with cached file:" + path.toString()); - LOG.info("\tInitializing the hashtable by cached file:" + path.toString()); - hashtable.initilizePersistentHash(path.toString()); - found = true; - LOG.info("\tLoad back 1 hashtable file from distributed cache:" + path.toString()); - break; - } + Path[] localArchives; + String stageID = 
this.getExecContext().getLocalWork().getStageID(); + String suffix = Utilities.generateTarFileName(stageID); + FileSystem localFs = FileSystem.getLocal(hconf); + localArchives = DistributedCache.getLocalCacheArchives(this.hconf); + Path archive; + for (int j = 0; j < localArchives.length; j++) { + archive = localArchives[j]; + if (!archive.getName().endsWith(suffix)) { + continue; } - if (!found) { - LOG.error("Load nothing from Distributed Cache"); - throw new HiveException(); - } + Path archiveLocalLink = archive.makeQualified(localFs); + baseDir = archiveLocalLink.toUri().getPath(); } - } + for (Map.Entry> entry : mapJoinTables + .entrySet()) { + pos = entry.getKey(); + hashtable = entry.getValue(); + String filePath = Utilities.generatePath(baseDir, pos, currentFileName); + Path path = new Path(filePath); + LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString()); + hashtable.initilizePersistentHash(path.toUri().getPath()); + } } catch (Exception e) { - e.printStackTrace(); - LOG.error("Load Hash Table error"); - - throw new HiveException(); + LOG.error("Load Distributed Cache Error"); + throw new HiveException(e.getMessage()); } - - } @Override Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (working copy) @@ -1,20 +0,0 @@ -package org.apache.hadoop.hive.ql.exec; - -import org.apache.hadoop.fs.Path; - -public class PathUtil { - public static String suffix=".hashtable"; - public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){ - String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix); - return path; - } - public static String generateFileName(Byte tag,String bigBucketFileName){ - String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix); - return fileName; - } - - public static String generateTmpURI(String baseURI,String id){ - String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id); - return tmpFileURI; - } -} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; @@ -538,6 +539,7 @@ throw new RuntimeException(e.getMessage()); } + // No-Op - we don't really write anything here .. 
job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); @@ -560,23 +562,21 @@ if (StringUtils.isNotBlank(addedFiles)) { initializeFiles("tmpfiles", addedFiles); } - // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it - String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES); - if (StringUtils.isNotBlank(addedArchives)) { - initializeFiles("tmparchives", addedArchives); - } - int returnVal = 0; RunningJob rj = null; - boolean noName = StringUtils.isEmpty(HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJOBNAME)); if (noName) { // This is for a special case to ensure unit tests pass HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB" + Utilities.randGen.nextInt()); } - try { - // propagate the file to distributed cache + String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES); + // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it + if (StringUtils.isNotBlank(addedArchives)) { + initializeFiles("tmparchives", addedArchives); + } + + try { MapredLocalWork localwork = work.getMapLocalWork(); if (localwork != null) { boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local"); @@ -587,32 +587,39 @@ FileSystem hdfs = hdfsPath.getFileSystem(job); FileSystem localFS = localPath.getFileSystem(job); FileStatus[] hashtableFiles = localFS.listStatus(localPath); - for (int i = 0; i < hashtableFiles.length; i++) { - FileStatus file = hashtableFiles[i]; - Path path = file.getPath(); - String fileName = path.getName(); - String hdfsFile = hdfsPath + Path.SEPARATOR + fileName; + int fileNumber = hashtableFiles.length; + String[] fileNames = new String[fileNumber]; - LOG.info("Upload 1 HashTable from" + path + " to: " + hdfsFile); - Path hdfsFilePath = new Path(hdfsFile); - - hdfs.copyFromLocalFile(path, hdfsFilePath); - short replication = (short) job.getInt("mapred.submit.replication", 10); - hdfs.setReplication(hdfsFilePath, replication); + for (int i = 0; i < fileNumber; i++) { + fileNames[i] = hashtableFiles[i].getPath().getName(); } - FileStatus[] hashtableRemoteFiles = hdfs.listStatus(hdfsPath); - for (int i = 0; i < hashtableRemoteFiles.length; i++) { - FileStatus file = hashtableRemoteFiles[i]; - Path path = file.getPath(); - DistributedCache.addCacheFile(path.toUri(), job); - LOG.info("add 1 hashtable file to distributed cache: " + path.toUri()); - } + // package and compress all the hashtable files into one archive file + String parentDir = localPath.toUri().getPath(); + String stageId = this.getId(); + String archiveFileURI = Utilities.generateTarURI(parentDir, stageId); + String archiveFileName = Utilities.generateTarFileName(stageId); + localwork.setStageID(stageId); + + FileUtils.tar(parentDir, fileNames, archiveFileName); + Path archivePath = new Path(archiveFileURI); + LOG.info("Archive " + hashtableFiles.length + " hash table files to " + archiveFileURI); + + // upload the archive file to HDFS + String hdfsFile = Utilities.generateTarURI(hdfsPath, stageId); + Path hdfsFilePath = new Path(hdfsFile); + short replication = (short) job.getInt("mapred.submit.replication", 10); + hdfs.copyFromLocalFile(archivePath, hdfsFilePath); + hdfs.setReplication(hdfsFilePath, replication); + LOG.info("Upload 1 archive file from " + archivePath + " to: " + hdfsFilePath); + + // add the archive file to the distributed cache + DistributedCache.createSymlink(job); + DistributedCache.addCacheArchive(hdfsFilePath.toUri(), job); + LOG.info("Add 1 archive file to distributed cache. 
Archive file: " + hdfsFilePath.toUri()); } } - - addInputPaths(job, work, emptyScratchDirStr); Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -1572,13 +1572,13 @@ public static String suffix = ".hashtable"; public static String generatePath(String baseURI, Byte tag, String bigBucketFileName) { - String path = new String(baseURI + Path.SEPARATOR + "-" + tag + "-" + bigBucketFileName + String path = new String(baseURI + Path.SEPARATOR + "MapJoin-" + tag + "-" + bigBucketFileName + suffix); return path; } public static String generateFileName(Byte tag, String bigBucketFileName) { - String fileName = new String("-" + tag + "-" + bigBucketFileName + suffix); + String fileName = new String("MapJoin-" + tag + "-" + bigBucketFileName + suffix); return fileName; } @@ -1587,6 +1587,26 @@ return tmpFileURI; } + public static String generateTarURI(String baseURI, String filename) { + String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz"); + return tmpFileURI; + } + + public static String generateTarURI(Path baseURI, String filename) { + String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename+".tar.gz"); + return tmpFileURI; + } + + public static String generateTarFileName(String name) { + String tmpFileURI = new String(name+".tar.gz"); + return tmpFileURI; + } + + public static String generatePath(Path baseURI, String filename) { + String path = new String(baseURI + Path.SEPARATOR + filename); + return path; + } + public static String now() { Calendar cal = Calendar.getInstance(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java (working copy) @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.metadata.HiveException; /** * ConditionalResolverSkewJoin. @@ -126,9 +125,6 @@ for (int i = 0; i < fstatus.length; i++) { fileSize += fstatus[i].getLen(); } - if (fileSize == 0) { - throw new HiveException("Input file size is 0"); - } // put into list and sorted set aliasList.add(alias); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (revision 1036562) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (working copy) @@ -43,6 +43,7 @@ private boolean inputFileChangeSensitive; private BucketMapJoinContext bucketMapjoinContext; private String tmpFileURI; + private String stageID; private List> dummyParentOp ; @@ -81,7 +82,15 @@ return aliasToWork; } + public String getStageID() { + return stageID; + } + public void setStageID(String stageID) { + this.stageID = stageID; + } + + public void setAliasToWork( final LinkedHashMap> aliasToWork) { this.aliasToWork = aliasToWork;