diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index bcffdbc..5a87bd6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator; import org.apache.hadoop.hive.ql.lib.Node; @@ -369,6 +370,26 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition); int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits().getNumBuckets(); + int size = -1; + for (Operator parentOp : joinOp.getParentOperators()) { + // All sides must agree: either every side has an RS further upstream or none does. If the sides are unbalanced, cannot convert. + // This is a workaround for now. The right fix would be to refactor the code in the + // MapRecordProcessor and ReduceRecordProcessor with respect to the sources. 
+ Set set = + OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(), + ReduceSinkOperator.class); + if (size < 0) { + size = set.size(); + continue; + } + + if (((size > 0) && (set.size() > 0)) || ((size == 0) && (set.size() == 0))) { + continue; + } else { + return false; + } + } + // the sort and bucket cols have to match on both sides for this // transformation of the join operation for (Operator parentOp : joinOp.getParentOperators()) { diff --git ql/src/test/queries/clientpositive/tez_smb_1.q ql/src/test/queries/clientpositive/tez_smb_1.q index 580672f..62a415b 100644 --- ql/src/test/queries/clientpositive/tez_smb_1.q +++ ql/src/test/queries/clientpositive/tez_smb_1.q @@ -34,6 +34,37 @@ set hive.auto.convert.join.noconditionaltask.size=500; explain select count(*) from tab s1 join tab s3 on s1.key=s3.key; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select count(*) from +tab vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id; + +select count(*) from +tab vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id; + +explain +select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id; + +select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id; + set hive.auto.convert.join=false; explain diff --git ql/src/test/results/clientpositive/tez/tez_smb_1.q.out ql/src/test/results/clientpositive/tez/tez_smb_1.q.out index d970bd9..e60d5af 100644 --- ql/src/test/results/clientpositive/tez/tez_smb_1.q.out +++ ql/src/test/results/clientpositive/tez/tez_smb_1.q.out @@ -179,6 +179,284 @@ STAGE PLANS: PREHOOK: query: explain select count(*) from +tab vt1 +join +(select rt2.id from +(select 
t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +tab vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map 5 + Map Operator Tree: + TableScan + alias: vt1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: int) + sort order: + + Map-reduce partition columns: key (type: int) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column 
stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col6 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col6) (type: boolean) + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +tab vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part 
+PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +tab vt1 +join +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +where vt1.key=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +480 +PREHOOK: query: explain +select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map 5 + Map Operator Tree: + TableScan + alias: vt1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: int) + sort order: + + Map-reduce partition columns: key (type: int) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 key (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col1 = _col0) (type: boolean) + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + 
table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select rt2.id from +(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2 +join +tab vt1 +where vt1.key=vt2.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +480 +PREHOOK: query: explain +select count(*) from (select rt1.id from (select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1 join