diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index afbeccb..8876b3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -139,13 +139,21 @@ public Object process(Node nd, Stack stack, if (!context.currentMapJoinOperators.isEmpty()) { for (MapJoinOperator mj: context.currentMapJoinOperators) { LOG.debug("Processing map join: " + mj); + // remember the mapping in case we scan another branch of the mapjoin later if (!context.mapJoinWorkMap.containsKey(mj)) { - List workItems = new LinkedList(); - workItems.add(work); - context.mapJoinWorkMap.put(mj, workItems); + context.mapJoinWorkMap.put(mj, new LinkedList()); + } + List workItems = context.mapJoinWorkMap.get(mj); + + // Don't process the work if it is already added. This could happen if, for instance, + // the MapJoin operator has a LateralViewForward operator as its ancestor, in which + // case the former could be reached via the latter through two paths. + // See HIVE-12629 + if (workItems.contains(work)) { + continue; } else { - context.mapJoinWorkMap.get(mj).add(work); + workItems.add(work); } /* diff --git a/ql/src/test/queries/clientpositive/spark_lateral_view_mapjoin.q b/ql/src/test/queries/clientpositive/spark_lateral_view_mapjoin.q new file mode 100644 index 0000000..f2917b7 --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_lateral_view_mapjoin.q @@ -0,0 +1,17 @@ +set hive.execution.engine=spark; +set hive.auto.convert.join=true; + +-- Testing the case when a MJ operator has a LVF operator +-- as its ancestor. See HIVE-12629. + +EXPLAIN +SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key; + +SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key; + diff --git a/ql/src/test/results/clientpositive/spark/spark_lateral_view_mapjoin.q.out b/ql/src/test/results/clientpositive/spark/spark_lateral_view_mapjoin.q.out new file mode 100644 index 0000000..d2c729e --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_lateral_view_mapjoin.q.out @@ -0,0 +1,171 @@ +PREHOOK: query: EXPLAIN +SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-1 + Spark + Edges: + Reducer 4 <- Map 3 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 3 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Lateral View Forward + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Lateral View Join Operator + outputColumnNames: _col0, _col5 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 0 Reducer 2 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Select Operator + expressions: value (type: string), 'ss' (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + UDTF Operator + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + function name: json_tuple + Lateral View Join Operator + outputColumnNames: _col0, _col5 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 0 Reducer 2 + Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Local Work: + Map Reduce Local Work + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM +(SELECT key FROM src GROUP BY key) a JOIN +(SELECT key FROM src LATERAL VIEW json_tuple(value, 'ss') v AS ss) b +ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +500