diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index 377f01d..badaaac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -94,7 +94,7 @@ private transient Byte alias; private transient MapJoinTableContainer[] mapJoinTables; - private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; + private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; private static final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java index e243f14..e3d21c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; @@ -140,7 +139,7 @@ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFil sink.setParentOperators(new ArrayList>(directWorks)); for (Operator operator : directWorks) { - if (operator instanceof TableScanOperator) { + if (operator != null) { operator.setChildOperators(Arrays.>asList(sink)); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 29fbb03..63f41ae 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -394,7 +394,7 @@ private void initializeOperators(Map fetchOpJobConfMap) fetchOperators.put(entry.getKey(), fetchOp); l4j.info("fetchoperator for " + entry.getKey() + " created"); } - // initilize all forward operator + // initialize all forward operators for (Map.Entry entry : fetchOperators.entrySet()) { // get the forward op String alias = entry.getKey(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java index 726d315..c3cc4a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java @@ -213,10 +213,21 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. for (Operator op : dummyOperators) { context.addDummyParentOp(op); } - context.addDirectWorks(mapJoinOp, directOperators); + if (hasAnyDirectFetch(directOperators)) { + context.addDirectWorks(mapJoinOp, directOperators); + } return null; } + private boolean hasAnyDirectFetch(List> directOperators) { + for (Operator operator : directOperators) { + if (operator != null) { + return true; + } + } + return false; + } + public void hasGroupBy(Operator mapJoinOp, LocalMapJoinProcCtx localMapJoinProcCtx) throws Exception { List> childOps = mapJoinOp.getChildOperators(); diff --git ql/src/test/queries/clientpositive/mapjoin_mapjoin.q ql/src/test/queries/clientpositive/mapjoin_mapjoin.q index 87a7bb2..1eb95f6 100644 --- ql/src/test/queries/clientpositive/mapjoin_mapjoin.q +++ ql/src/test/queries/clientpositive/mapjoin_mapjoin.q @@ -6,9 +6,13 @@ set hive.auto.convert.join.noconditionaltask.size=10000; explain select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on 
(srcpart.key=src1.key); -explain select count(*) from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) group by ds; +explain +select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450'; +select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450'; -select count(*) from srcpart join src src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds; +explain +select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds; +select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds; set hive.mapjoin.lazy.hashtable=false; diff --git ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out index 77f1e09..d79b984 100644 --- ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out +++ ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out @@ -73,16 +73,153 @@ STAGE PLANS: Fetch Operator limit: -1 -PREHOOK: query: explain select count(*) from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) group by ds +PREHOOK: query: explain +select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450' PREHOOK: type: QUERY -POSTHOOK: query: explain select count(*) from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) group by ds +POSTHOOK: query: explain +select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450' POSTHOOK: type: QUERY STAGE DEPENDENCIES: - Stage-3 is a root stage + Stage-7 is a root stage + Stage-5 depends on stages: Stage-7 Stage-0 is a root stage STAGE PLANS: - Stage: 
Stage-3 + Stage: Stage-7 + Map Reduce Local Work + Alias -> Map Local Tables: + src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + src + TableScan + alias: src + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (value > 'val_450') (type: boolean) + Statistics: Num rows: 19 Data size: 1903 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + keys: + 0 value (type: string) + 1 value (type: string) + + Stage: Stage-5 + Map Reduce + Map Operator Tree: + TableScan + alias: srcpart + Statistics: Num rows: 116 Data size: 23248 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (value > 'val_450') (type: boolean) + Statistics: Num rows: 38 Data size: 7615 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + keys: + 0 value (type: string) + 1 value (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 41 Data size: 8376 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col0} + 1 + keys: + 0 _col0 (type: string) + 1 key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 9213 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 9213 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 45 Data size: 9213 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + src + src1 + Fetch Operator + limit: -1 + Alias 
-> Map Local Operator Tree: + src + src1 + TableScan + alias: src1 + Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450' +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: query: select srcpart.key from srcpart join src on (srcpart.value=src.value) join src1 on (srcpart.key=src1.key) where srcpart.value > 'val_450' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +98 +98 +66 +98 +98 +98 +98 +66 +98 +98 +98 +98 +66 +98 +98 +98 +98 +66 +98 +98 +PREHOOK: query: explain +select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-7 is a root stage + Stage-3 depends on stages: Stage-7 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-7 Map Reduce Map Operator Tree: TableScan @@ -99,51 +236,66 @@ STAGE PLANS: 1 value (type: string) outputColumnNames: _col0, _col2 Statistics: Num rows: 127 Data size: 25572 Basic stats: COMPLETE 
Column stats: NONE - Map Join Operator - condition map: - Inner Join 0 to 1 - condition expressions: - 0 {_col2} - 1 - keys: - 0 _col0 (type: string) - 1 key (type: string) - outputColumnNames: _col2 - Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: _col2 (type: string) - outputColumnNames: _col2 - Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: count() - keys: _col2 (type: string) - mode: hash - outputColumnNames: _col0, _col1 - Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Map-reduce partition columns: _col0 (type: string) - Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: bigint) + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe Local Work: Map Reduce Local Work Alias -> Map Local Tables: src Fetch Operator limit: -1 - src1 - Fetch Operator - limit: -1 Alias -> Map Local Operator Tree: src TableScan alias: src Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {_col2} + 1 + keys: + 0 _col0 (type: string) + 1 key (type: string) + outputColumnNames: _col2 + Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col2 (type: string) + outputColumnNames: _col2 + Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + keys: 
_col2 (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 139 Data size: 28129 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Local Work: + Map Reduce Local Work + Alias -> Map Local Tables: + src1 + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: src1 TableScan alias: src1 - Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) @@ -167,7 +319,7 @@ STAGE PLANS: Fetch Operator limit: -1 -PREHOOK: query: select count(*) from srcpart join src src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds +PREHOOK: query: select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Input: default@srcpart @@ -176,7 +328,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 #### A masked pattern was here #### -POSTHOOK: query: select count(*) from srcpart join src src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds +POSTHOOK: query: select count(*) from srcpart join src on (srcpart.value=src.value) join src src1 on (srcpart.key=src1.key) group by ds POSTHOOK: type: QUERY POSTHOOK: Input: default@src POSTHOOK: Input: default@srcpart