Index: ql/src/test/results/clientpositive/join_empty.q.out
===================================================================
--- ql/src/test/results/clientpositive/join_empty.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/join_empty.q.out	(revision 0)
@@ -0,0 +1,46 @@
+PREHOOK: query: create table srcpart_empty(key int, value string) partitioned by (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table srcpart_empty(key int, value string) partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcpart_empty
+PREHOOK: query: create table src2_empty (key int, value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table src2_empty (key int, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@src2_empty
+PREHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-39_138_107997606390957326/-mr-10000
+POSTHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-39_138_107997606390957326/-mr-10000
+PREHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src2_empty
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-43_473_3587274534534962486/-mr-10000
+POSTHOOK: query: select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src2_empty
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-43_473_3587274534534962486/-mr-10000
+PREHOOK: query: select a.key, b.value from srcpart_empty a join src b on a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-47_872_1531725911506590422/-mr-10000
+POSTHOOK: query: select a.key, b.value from srcpart_empty a join src b on a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-47_872_1531725911506590422/-mr-10000
+PREHOOK: query: select a.key, b.value from src2_empty a join src b on a.key=b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src2_empty
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-52_182_4671419724484282586/-mr-10000
+POSTHOOK: query: select a.key, b.value from src2_empty a join src b on a.key=b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src2_empty
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-22_17-44-52_182_4671419724484282586/-mr-10000
Index: ql/src/test/queries/clientpositive/join_empty.q
===================================================================
--- ql/src/test/queries/clientpositive/join_empty.q	(revision 0)
+++ ql/src/test/queries/clientpositive/join_empty.q	(revision 0)
@@ -0,0 +1,10 @@
+create table srcpart_empty(key int, value string) partitioned by (ds string);
+create table src2_empty (key int, value string);
+
+select /*+mapjoin(a)*/ a.key, b.value from srcpart_empty a join src b on a.key=b.key;
+select /*+mapjoin(a)*/ a.key, b.value from src2_empty a join src b on a.key=b.key;
+
+set hive.mapred.mode=nonstrict;
+set hive.auto.convert.join = true;
+select a.key, b.value from srcpart_empty a join src b on a.key=b.key;
+select a.key, b.value from src2_empty a join src b on a.key=b.key;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java	(revision 1038849)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java	(working copy)
@@ -582,6 +582,7 @@
     if (aliasPartnDesc == null) {
       aliasPartnDesc = new PartitionDesc(Utilities.getTableDesc(parseCtx
           .getTopToTable().get(topOp)), null);
+
     }
 
     plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java	(revision 1038849)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java	(working copy)
@@ -184,10 +184,6 @@
         pathToAliases.remove(path);
       }
 
-      if (pathSet.size() == 0) {
-        throw new SemanticException("No input path for alias " + alias);
-      }
-
       // create fetch work
       FetchWork fetchWork = null;
       List partDir = new ArrayList();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(revision 1038849)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(working copy)
@@ -64,6 +64,26 @@
   static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
   static LogHelper console = new LogHelper(LOG);
 
+  private boolean isEmptyTable;
+  private boolean isNativeTable;
+  private FetchWork work;
+  private int splitNum;
+  private PartitionDesc currPart;
+  private TableDesc currTbl;
+  private boolean tblDataDone;
+
+  private transient RecordReader currRecReader;
+  private transient InputSplit[] inputSplits;
+  private transient InputFormat inputFormat;
+  private transient JobConf job;
+  private transient WritableComparable key;
+  private transient Writable value;
+  private transient Deserializer serde;
+  private transient Iterator iterPath;
+  private transient Iterator iterPartDesc;
+  private transient Path currPath;
+  private transient StructObjectInspector rowObjectInspector;
+  private transient Object[] rowWithPart;
 
   public FetchOperator() {
   }
@@ -123,38 +143,24 @@
     this.tblDataDone = tblDataDone;
   }
 
-  private boolean isNativeTable;
-  private FetchWork work;
-  private int splitNum;
-  private PartitionDesc currPart;
-  private TableDesc currTbl;
-  private boolean tblDataDone;
+  public boolean isEmptyTable() {
+    return isEmptyTable;
+  }
 
-  private transient RecordReader currRecReader;
-  private transient InputSplit[] inputSplits;
-  private transient InputFormat inputFormat;
-  private transient JobConf job;
-  private transient WritableComparable key;
-  private transient Writable value;
-  private transient Deserializer serde;
-  private transient Iterator iterPath;
-  private transient Iterator iterPartDesc;
-  private transient Path currPath;
-  private transient StructObjectInspector rowObjectInspector;
-  private transient Object[] rowWithPart;
+  public void setEmptyTable(boolean isEmptyTable) {
+    this.isEmptyTable = isEmptyTable;
+  }
 
   /**
    * A cache of InputFormat instances.
   */
-  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats =
-      new HashMap<Class, InputFormat<WritableComparable, Writable>>();
+  private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
 
-  static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
-      Class inputFormatClass, Configuration conf) throws IOException {
+  static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass,
+      Configuration conf) throws IOException {
     if (!inputFormats.containsKey(inputFormatClass)) {
       try {
-        InputFormat<WritableComparable, Writable> newInstance =
-            (InputFormat<WritableComparable, Writable>) ReflectionUtils
+        InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
             .newInstance(inputFormatClass, conf);
         inputFormats.put(inputFormatClass, newInstance);
       } catch (Exception e) {
@@ -169,10 +175,7 @@
     List partNames = new ArrayList();
     List partValues = new ArrayList();
 
-    String pcols = currPart
-        .getTableDesc()
-        .getProperties()
-        .getProperty(
+    String pcols = currPart.getTableDesc().getProperties().getProperty(
         org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
     LinkedHashMap partSpec = currPart.getPartSpec();
 
@@ -181,16 +184,14 @@
     for (String key : partKeys) {
       partNames.add(key);
       partValues.add(partSpec.get(key));
-      partObjectInspectors
-          .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+      partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
     }
     StructObjectInspector partObjectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(partNames, partObjectInspectors);
 
     rowObjectInspector = (StructObjectInspector) serde.getObjectInspector();
     rowWithPart[1] = partValues;
-    rowObjectInspector = ObjectInspectorFactory
-        .getUnionStructObjectInspector(Arrays
+    rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays
         .asList(new StructObjectInspector[] {rowObjectInspector, partObjectInspector}));
   }
 
@@ -226,8 +227,7 @@
         }
         return;
       } else {
-        iterPath = FetchWork.convertStringToPathArray(work.getPartDir())
-            .iterator();
+        iterPath = FetchWork.convertStringToPathArray(work.getPartDir()).iterator();
         iterPartDesc = work.getPartDesc().iterator();
       }
     }
@@ -235,15 +235,16 @@
     while (iterPath.hasNext()) {
       Path nxt = iterPath.next();
       PartitionDesc prt = null;
-      if(iterPartDesc != null)
+      if (iterPartDesc != null) {
        prt = iterPartDesc.next();
+      }
       FileSystem fs = nxt.getFileSystem(job);
       if (fs.exists(nxt)) {
         FileStatus[] fStats = listStatusUnderPath(fs, nxt);
         for (FileStatus fStat : fStats) {
           if (fStat.getLen() > 0) {
             currPath = nxt;
-            if(iterPartDesc != null) {
+            if (iterPartDesc != null) {
               currPart = prt;
             }
             return;
@@ -265,14 +266,13 @@
     // to the default file system - which may or may not be online during pure
     // metadata
     // operations
-    job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils
-        .escapeString(currPath.toString()));
+    job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
+        .toString()));
 
     PartitionDesc tmp;
     if (currTbl == null) {
       tmp = currPart;
-    }
-    else {
+    } else {
       tmp = new PartitionDesc(currTbl, null);
     }
 
@@ -283,9 +283,9 @@
     serde = tmp.getDeserializerClass().newInstance();
     serde.initialize(job, tmp.getProperties());
 
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("Creating fetchTask with deserializer typeinfo: "
-        + serde.getObjectInspector().getTypeName());
+          + serde.getObjectInspector().getTypeName());
       LOG.debug("deserializer properties: " + tmp.getProperties());
     }
 
@@ -303,8 +303,7 @@
       return getRecordReader();
     }
 
-    currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job,
-        Reporter.NULL);
+    currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, Reporter.NULL);
     key = currRecReader.createKey();
     value = currRecReader.createValue();
     return currRecReader;
@@ -363,17 +362,17 @@
   }
 
   /**
-   * used for bucket map join. there is a hack for getting partitionDesc.
-   * bucket map join right now only allow one partition present in bucket map join.
+   * used for bucket map join. there is a hack for getting partitionDesc. bucket map join right now
+   * only allow one partition present in bucket map join.
    */
-  public void setupContext (Iterator iterPath, Iterator iterPartDesc) {
+  public void setupContext(Iterator iterPath, Iterator iterPartDesc) {
     this.iterPath = iterPath;
     this.iterPartDesc = iterPartDesc;
-    if(iterPartDesc == null) {
+    if (iterPartDesc == null) {
       if (work.getTblDir() != null) {
         this.currTbl = work.getTblDesc();
       } else {
-        //hack, get the first.
+        // hack, get the first.
         List listParts = work.getPartDesc();
         currPart = listParts.get(0);
       }
@@ -387,14 +386,19 @@
         Deserializer serde = tbl.getDeserializerClass().newInstance();
         serde.initialize(job, tbl.getProperties());
         return serde.getObjectInspector();
-      } else {
+      } else if (work.getPartDesc() != null) {
         List listParts = work.getPartDesc();
+        if(listParts.size() == 0) {
+          return null;
+        }
         currPart = listParts.get(0);
         serde = currPart.getTableDesc().getDeserializerClass().newInstance();
         serde.initialize(job, currPart.getTableDesc().getProperties());
         setPrtnDesc();
         currPart = null;
         return rowObjectInspector;
+      } else {
+        return null;
       }
     } catch (Exception e) {
       throw new HiveException("Failed with exception " + e.getMessage()
@@ -403,21 +407,20 @@
   }
 
   /**
-   * Lists status for all files under a given path. Whether or not
-   * this is recursive depends on the setting of
-   * job configuration parameter mapred.input.dir.recursive.
+   * Lists status for all files under a given path. Whether or not this is recursive depends on the
+   * setting of job configuration parameter mapred.input.dir.recursive.
    *
-   * @param fs file system
+   * @param fs
+   *          file system
    *
-   * @param p path in file system
+   * @param p
+   *          path in file system
    *
    * @return list of file status entries
    */
-  private FileStatus[] listStatusUnderPath(FileSystem fs, Path p)
-      throws IOException {
+  private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException {
     HiveConf hiveConf = new HiveConf(job, FetchOperator.class);
-    boolean recursive =
-        hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE);
+    boolean recursive = hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE);
     if (!recursive) {
       return fs.listStatus(p);
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java	(revision 1038849)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java	(working copy)
@@ -44,6 +44,9 @@
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -234,7 +237,7 @@
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     long startTime = System.currentTimeMillis();
     console.printInfo(Utilities.now()
-        + "\tStarting to luaunch local task to process map join;\tmaximum memory = "
+        + "\tStarting to launch local task to process map join;\tmaximum memory = "
         + memoryMXBean.getHeapMemoryUsage().getMax());
     fetchOperators = new HashMap();
     Map fetchOpJobConfMap = new HashMap();
@@ -286,6 +289,12 @@
         setUpFetchOpContext(fetchOp, alias, bigTableBucket);
       }
 
+      if (fetchOp.isEmptyTable()) {
+        //generate empty hashtable for empty table
+        this.generateDummyHashTable(alias, bigTableBucket);
+        continue;
+      }
+
       // get the root operator
       Operator forwardOp = work.getAliasToWork().get(alias);
       // walk through the operator tree
@@ -341,7 +350,8 @@
     // initilize all forward operator
     for (Map.Entry entry : fetchOperators.entrySet()) {
       // get the forward op
-      Operator forwardOp = work.getAliasToWork().get(entry.getKey());
+      String alias = entry.getKey();
+      Operator forwardOp = work.getAliasToWork().get(alias);
 
       // put the exe context into all the operators
       forwardOp.setExecContext(execContext);
@@ -353,12 +363,51 @@
         jobConf = job;
       }
       // initialize the forward operator
-      forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
-      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
+      ObjectInspector objectInspector = fetchOp.getOutputObjectInspector();
+      if (objectInspector != null) {
+        forwardOp.initialize(jobConf, new ObjectInspector[] {objectInspector});
+        l4j.info("fetchoperator for " + entry.getKey() + " initialized");
+      } else {
+        fetchOp.setEmptyTable(true);
+      }
     }
   }
 
+  private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
+    // find the (byte)tag for the map join(HashTableSinkOperator)
+    Operator parentOp = work.getAliasToWork().get(alias);
+    Operator childOp = parentOp.getChildOperators().get(0);
+    while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) {
+      parentOp = childOp;
+      assert parentOp.getChildOperators().size() == 1;
+      childOp = parentOp.getChildOperators().get(0);
+    }
+    if (childOp == null) {
+      throw new HiveException(
+          "Cannot find HashTableSink op by tracing down the table scan operator tree");
+    }
+    byte tag = (byte) childOp.getParentOperators().indexOf(parentOp);
+    // generate empty hashtable for this (byte)tag
+    String tmpURI = this.getWork().getTmpFileURI();
+    HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable =
+        new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
+
+    if (bigBucketFileName == null || bigBucketFileName.length() == 0) {
+      bigBucketFileName = "-";
+    }
+    String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
+    console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
+    Path path = new Path(tmpURIPath);
+    FileSystem fs = path.getFileSystem(job);
+    File file = new File(path.toUri().getPath());
+    fs.create(path);
+    long fileLength = hashTable.flushMemoryCacheToPersistent(file);
+    console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
+        + fileLength);
+    hashTable.close();
+  }
+
   private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
       throws Exception {
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java	(revision 1038849)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java	(working copy)
@@ -29,7 +29,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
  * ConditionalResolverSkewJoin.
@@ -126,9 +125,6 @@
         for (int i = 0; i < fstatus.length; i++) {
           fileSize += fstatus[i].getLen();
         }
-        if (fileSize == 0) {
-          throw new HiveException("Input file size is 0");
-        }
 
         // put into list and sorted set
         aliasList.add(alias);