diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index c951fca..34b063d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -19,7 +19,6 @@
 import java.io.File;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
@@ -46,7 +45,6 @@
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.BucketMatcher;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SecureCmdDoAs;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -54,7 +52,6 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
@@ -340,6 +337,12 @@ public void startForward(String bigTableBucket) throws Exception {
 
   private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
       throws Exception {
+    for (Operator source : work.getAliasToWork().values()) {
+      source.reset();
+    }
+    if (inputFileChangeSenstive) {
+      execContext.setCurrentBigBucketFile(bigTableBucket);
+    }
     for (Map.Entry entry : fetchOperators.entrySet()) {
       String alias = entry.getKey();
       FetchOperator fetchOp = entry.getValue();
@@ -351,13 +354,6 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
 
       // get the root operator
       Operator forwardOp = work.getAliasToWork().get(alias);
-      if (fetchOp.isEmptyTable()) {
-        //generate empty hashtable for empty table
-        this.generateDummyHashTable(alias, bigTableBucket);
-        forwardOp.close(false);
-        continue;
-      }
-
       // walk through the operator tree
       while (!forwardOp.getDone()) {
         InspectableObject row = fetchOp.getNextRow();
@@ -366,11 +362,10 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
         }
         forwardOp.processOp(row.o, 0);
       }
-      if (inputFileChangeSenstive) {
-        execContext.setCurrentBigBucketFile(bigTableBucket);
-        forwardOp.reset();
-      }
-      forwardOp.close(false);
-    }
+      forwardOp.flush();
+    }
+    for (Operator source : work.getAliasToWork().values()) {
+      source.close(false);
+    }
   }
 
@@ -421,43 +416,6 @@ private void initializeOperators(Map fetchOpJobConfMap)
     }
   }
 
-  private void generateDummyHashTable(String alias, String bigBucketFileName)
-      throws HiveException,IOException {
-    LOG.debug("generating dummy for " + alias);
-    // find the (byte)tag for the map join(HashTableSinkOperator)
-    Operator parentOp = work.getAliasToWork().get(alias);
-    Operator childOp = parentOp.getChildOperators().get(0);
-    while ((childOp != null) && (!(childOp instanceof HashTableSinkOperator))) {
-      parentOp = childOp;
-      assert parentOp.getChildOperators().size() == 1;
-      childOp = parentOp.getChildOperators().get(0);
-    }
-    if (childOp == null) {
-      throw new HiveException(
-          "Cannot find HashTableSink op by tracing down the table scan operator tree");
-    }
-    byte tag = (byte) childOp.getParentOperators().indexOf(parentOp);
-
-    // generate empty hashtable for this (byte)tag
-    Path tmpPath = this.getWork().getTmpPath();
-
-    String fileName = work.getBucketFileName(bigBucketFileName);
-
-    HashTableSinkOperator htso = (HashTableSinkOperator)childOp;
-    Path path = Utilities.generatePath(tmpPath, htso.getConf().getDumpFilePrefix(),
-        tag, fileName);
-    console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + path);
-    FileSystem fs = path.getFileSystem(job);
-    ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
-    try {
-      MapJoinTableContainerSerDe.persistDummyTable(out);
-    } finally {
-      out.close();
-    }
-    console.printInfo(Utilities.now() + "\tUpload 1 File to: " + path + " File size: "
-        + fs.getFileStatus(path).getLen());
-  }
-
   private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
       throws Exception {
diff --git ql/src/test/queries/clientpositive/auto_sortmerge_join_11.q ql/src/test/queries/clientpositive/auto_sortmerge_join_11.q
index da2e26f..73e98dd 100644
--- ql/src/test/queries/clientpositive/auto_sortmerge_join_11.q
+++ ql/src/test/queries/clientpositive/auto_sortmerge_join_11.q
@@ -34,3 +34,7 @@ select count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 -- The join is converted to a bucketed mapjoin with a mapjoin hint
 explain extended select /*+ mapjoin(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 select /*+ mapjoin(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+-- HIVE-7023
+explain extended select /* + MAPJOIN(a,b) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key JOIN bucket_big c ON a.key = c.key;
+select /* + MAPJOIN(a,b) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key JOIN bucket_big c ON a.key = c.key;
diff --git ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out
index e502e97..97bb81f 100644
Binary files ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out and ql/src/test/results/clientpositive/auto_sortmerge_join_11.q.out differ