Index: conf/hive-default.xml.template
===================================================================
--- conf/hive-default.xml.template (revision 1471824)
+++ conf/hive-default.xml.template (working copy)
@@ -842,6 +842,16 @@
+ hive.optimize.mapjoin.mapreduce
+ false
+ If hive.auto.convert.join is off, this parameter does not take
+ effect. If it is on, and there are map-join jobs followed by a map-reduce
+ job (e.g., a group by), each such map-only job is merged with the following
+ map-reduce job.
+
+
+
+
hive.script.auto.progress
false
Whether Hive Tranform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity. Hive sends progress information when the script is outputting to stderr. This option removes the need of periodically producing stderr messages, but users should be cautious because this may prevent infinite loops in the scripts to be killed by TaskTracker.
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1471824)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -494,6 +494,7 @@
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size",
10000000L),
+ HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR("hive.optimize.mapjoin.mapreduce", false),
HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
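For reference, a minimal sketch (not part of the patch) of how the new flag can be toggled and read through HiveConf. The class and variable names below are made up for illustration; HIVECONVERTJOIN ("hive.auto.convert.join") is assumed from the existing HiveConf, and the new flag only matters when that one is on, as the template entry above notes.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MapJoinMergeFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // The M-MR merge is only consulted when map-join auto-conversion is enabled.
        conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR, true);
        // Same lookup the physical optimizer performs later in this patch.
        boolean merge = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
        System.out.println("merge map-join with following MR job: " + merge);
      }
    }
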
Index: ql/src/test/results/clientpositive/multiMapJoin1.q.out
===================================================================
--- ql/src/test/results/clientpositive/multiMapJoin1.q.out (revision 1471824)
+++ ql/src/test/results/clientpositive/multiMapJoin1.q.out (working copy)
@@ -285,7 +285,9 @@
POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
580
-PREHOOK: query: explain
+PREHOOK: query: -- Now run a query with a two-way join, which should be converted into a
+-- map-join followed by a group by - two MR jobs overall
+explain
select count(*) FROM
(select bigTbl.key as key, bigTbl.value as value1,
bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
@@ -294,7 +296,9 @@
JOIN
smallTbl2 on (firstjoin.value1 = smallTbl2.value)
PREHOOK: type: QUERY
-POSTHOOK: query: explain
+POSTHOOK: query: -- Now run a query with a two-way join, which should be converted into a
+-- map-join followed by a group by - two MR jobs overall
+explain
select count(*) FROM
(select bigTbl.key as key, bigTbl.value as value1,
bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
@@ -469,6 +473,207 @@
POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
580
+PREHOOK: query: -- Now run a query with a two-way join, which should first be converted into a
+-- map-join followed by a group by, and then finally into a single MR job.
+
+#### A masked pattern was here ####
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Now run a query with a two-way join, which should first be converted into a
+-- map-join followed by a group by, and then finally into a single MR job.
+
+#### A masked pattern was here ####
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: bigtbl.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+#### A masked pattern was here ####
+
+STAGE DEPENDENCIES:
+ Stage-7 is a root stage
+ Stage-6 depends on stages: Stage-7
+ Stage-0 depends on stages: Stage-6
+
+STAGE PLANS:
+ Stage: Stage-7
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ firstjoin:smalltbl1
+ Fetch Operator
+ limit: -1
+ smalltbl2
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ firstjoin:smalltbl1
+ TableScan
+ alias: smalltbl1
+ HashTable Sink Operator
+ condition expressions:
+ 0 {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+ smalltbl2
+ TableScan
+ alias: smalltbl2
+ HashTable Sink Operator
+ condition expressions:
+ 0
+ 1 {key}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ Position of Big Table: 0
+
+ Stage: Stage-6
+ Map Reduce
+ Alias -> Map Operator Tree:
+ firstjoin:bigtbl
+ TableScan
+ alias: bigtbl
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key]]
+ 1 [Column[key]]
+ outputColumnNames: _col1
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ outputColumnNames: _col1
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {key}
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[value]]
+ outputColumnNames: _col3
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col3
+ type: string
+ outputColumnNames: _col3
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ keys:
+ expr: _col3
+ type: string
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: string
+ tag: -1
+ value expressions:
+ expr: _col1
+ type: bigint
+ Local Work:
+ Map Reduce Local Work
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: string
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col1
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+
+#### A masked pattern was here ####
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bigtbl
+PREHOOK: Input: default@smalltbl1
+PREHOOK: Input: default@smalltbl2
+#### A masked pattern was here ####
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bigtbl
+POSTHOOK: Input: default@smalltbl1
+POSTHOOK: Input: default@smalltbl2
+#### A masked pattern was here ####
+POSTHOOK: Lineage: bigtbl.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
PREHOOK: query: create table smallTbl3(key string, value string)
PREHOOK: type: CREATETABLE
POSTHOOK: query: create table smallTbl3(key string, value string)
@@ -588,6 +793,577 @@
POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
POSTHOOK: Lineage: smalltbl3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: Lineage: smalltbl3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: explain
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: bigtbl.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key1 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bigTbl)) (TOK_TABREF (TOK_TABNAME smallTbl1)) (= (. (TOK_TABLE_OR_COL bigTbl) key1) (. (TOK_TABLE_OR_COL smallTbl1) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) key1) key1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) key2) key2) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) value) value1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) value) value2)))) firstjoin) (TOK_TABREF (TOK_TABNAME smallTbl2)) (= (. (TOK_TABLE_OR_COL firstjoin) value1) (. (TOK_TABLE_OR_COL smallTbl2) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) key1) key1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) key2) key2) (TOK_SELEXPR (. (TOK_TABLE_OR_COL smallTbl2) key) key3) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) value1) value1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) value2) value2)))) secondjoin) (TOK_TABREF (TOK_TABNAME smallTbl3)) (= (. (TOK_TABLE_OR_COL secondjoin) key2) (. (TOK_TABLE_OR_COL smallTbl3) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
+
+STAGE DEPENDENCIES:
+ Stage-16 is a root stage , consists of Stage-21, Stage-22, Stage-1
+ Stage-21 has a backup stage: Stage-1
+ Stage-14 depends on stages: Stage-21
+ Stage-13 depends on stages: Stage-1, Stage-14, Stage-15 , consists of Stage-19, Stage-20, Stage-2
+ Stage-19 has a backup stage: Stage-2
+ Stage-11 depends on stages: Stage-19
+ Stage-10 depends on stages: Stage-2, Stage-11, Stage-12 , consists of Stage-17, Stage-18, Stage-3
+ Stage-17 has a backup stage: Stage-3
+ Stage-8 depends on stages: Stage-17
+ Stage-4 depends on stages: Stage-3, Stage-8, Stage-9
+ Stage-18 has a backup stage: Stage-3
+ Stage-9 depends on stages: Stage-18
+ Stage-3
+ Stage-20 has a backup stage: Stage-2
+ Stage-12 depends on stages: Stage-20
+ Stage-2
+ Stage-22 has a backup stage: Stage-1
+ Stage-15 depends on stages: Stage-22
+ Stage-1
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-16
+ Conditional Operator
+
+ Stage: Stage-21
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ secondjoin:firstjoin:smalltbl1
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ secondjoin:firstjoin:smalltbl1
+ TableScan
+ alias: smalltbl1
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+
+ Stage: Stage-14
+ Map Reduce
+ Alias -> Map Operator Tree:
+ secondjoin:firstjoin:bigtbl
+ TableScan
+ alias: bigtbl
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ outputColumnNames: _col1, _col2
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ outputColumnNames: _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-13
+ Conditional Operator
+
+ Stage: Stage-19
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ secondjoin:smalltbl2
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ secondjoin:smalltbl2
+ TableScan
+ alias: smalltbl2
+ HashTable Sink Operator
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ Position of Big Table: 0
+
+ Stage: Stage-11
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ outputColumnNames: _col1
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ outputColumnNames: _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-10
+ Conditional Operator
+
+ Stage: Stage-17
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ smalltbl3
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ smalltbl3
+ TableScan
+ alias: smalltbl3
+ HashTable Sink Operator
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+
+ Stage: Stage-8
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-4
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-18
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ HashTable Sink Operator
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 1
+
+ Stage: Stage-9
+ Map Reduce
+ Alias -> Map Operator Tree:
+ smalltbl3
+ TableScan
+ alias: smalltbl3
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 1
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col1
+ type: string
+ tag: 0
+ smalltbl3
+ TableScan
+ alias: smalltbl3
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-20
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ $INTNAME
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ $INTNAME
+ HashTable Sink Operator
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ Position of Big Table: 1
+
+ Stage: Stage-12
+ Map Reduce
+ Alias -> Map Operator Tree:
+ secondjoin:smalltbl2
+ TableScan
+ alias: smalltbl2
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ outputColumnNames: _col1
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ outputColumnNames: _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Map Reduce
+ Alias -> Map Operator Tree:
+ $INTNAME
+ Reduce Output Operator
+ key expressions:
+ expr: _col2
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col2
+ type: string
+ tag: 0
+ value expressions:
+ expr: _col1
+ type: string
+ secondjoin:smalltbl2
+ TableScan
+ alias: smalltbl2
+ Reduce Output Operator
+ key expressions:
+ expr: value
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: value
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col1}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col1
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ outputColumnNames: _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-22
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ secondjoin:firstjoin:bigtbl
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ secondjoin:firstjoin:bigtbl
+ TableScan
+ alias: bigtbl
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ Position of Big Table: 1
+
+ Stage: Stage-15
+ Map Reduce
+ Alias -> Map Operator Tree:
+ secondjoin:firstjoin:smalltbl1
+ TableScan
+ alias: smalltbl1
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ outputColumnNames: _col1, _col2
+ Position of Big Table: 1
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ outputColumnNames: _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ secondjoin:firstjoin:bigtbl
+ TableScan
+ alias: bigtbl
+ Reduce Output Operator
+ key expressions:
+ expr: key1
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key1
+ type: string
+ tag: 0
+ value expressions:
+ expr: key2
+ type: string
+ expr: value
+ type: string
+ secondjoin:firstjoin:smalltbl1
+ TableScan
+ alias: smalltbl1
+ Reduce Output Operator
+ key expressions:
+ expr: key
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: key
+ type: string
+ tag: 1
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {VALUE._col1} {VALUE._col2}
+ 1
+ handleSkewJoin: false
+ outputColumnNames: _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ outputColumnNames: _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
PREHOOK: query: select count(*) FROM
(
SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
@@ -638,7 +1414,8 @@
POSTHOOK: Lineage: smalltbl3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: Lineage: smalltbl3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
1660
-PREHOOK: query: -- join with 4 tables on different keys is also executed as a single MR job
+PREHOOK: query: -- join with 4 tables on different keys is also executed as a single MR job,
+-- so overall there are two jobs - one for the multi-way join and one for count(*)
explain
select count(*) FROM
(
@@ -654,7 +1431,8 @@
) secondjoin
JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
PREHOOK: type: QUERY
-POSTHOOK: query: -- join with 4 tables on different keys is also executed as a single MR job
+POSTHOOK: query: -- join with 4 tables on different keys is also executed as a single MR job,
+-- so overall there are two jobs - one for the multi-way join and one for count(*)
explain
select count(*) FROM
(
@@ -870,6 +1648,252 @@
(SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
bigTbl.value as value1, bigTbl.value as value2
FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bigtbl
+POSTHOOK: Input: default@smalltbl1
+POSTHOOK: Input: default@smalltbl2
+POSTHOOK: Input: default@smalltbl3
+#### A masked pattern was here ####
+POSTHOOK: Lineage: bigtbl.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key1 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+1660
+PREHOOK: query: -- Now run the above query with M-MR optimization
+-- This should be a single MR job end-to-end.
+explain
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Now run the above query with M-MR optimization
+-- This should be a single MR job end-to-end.
+explain
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: bigtbl.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key1 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.key2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: bigtbl.value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: smalltbl3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bigTbl)) (TOK_TABREF (TOK_TABNAME smallTbl1)) (= (. (TOK_TABLE_OR_COL bigTbl) key1) (. (TOK_TABLE_OR_COL smallTbl1) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) key1) key1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) key2) key2) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) value) value1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL bigTbl) value) value2)))) firstjoin) (TOK_TABREF (TOK_TABNAME smallTbl2)) (= (. (TOK_TABLE_OR_COL firstjoin) value1) (. (TOK_TABLE_OR_COL smallTbl2) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) key1) key1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) key2) key2) (TOK_SELEXPR (. (TOK_TABLE_OR_COL smallTbl2) key) key3) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) value1) value1) (TOK_SELEXPR (. (TOK_TABLE_OR_COL firstjoin) value2) value2)))) secondjoin) (TOK_TABREF (TOK_TABNAME smallTbl3)) (= (. (TOK_TABLE_OR_COL secondjoin) key2) (. (TOK_TABLE_OR_COL smallTbl3) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
+
+STAGE DEPENDENCIES:
+ Stage-11 is a root stage
+ Stage-10 depends on stages: Stage-11
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-11
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ secondjoin:firstjoin:smalltbl1
+ Fetch Operator
+ limit: -1
+ secondjoin:smalltbl2
+ Fetch Operator
+ limit: -1
+ smalltbl3
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ secondjoin:firstjoin:smalltbl1
+ TableScan
+ alias: smalltbl1
+ HashTable Sink Operator
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+ secondjoin:smalltbl2
+ TableScan
+ alias: smalltbl2
+ HashTable Sink Operator
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ Position of Big Table: 0
+ smalltbl3
+ TableScan
+ alias: smalltbl3
+ HashTable Sink Operator
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+
+ Stage: Stage-10
+ Map Reduce
+ Alias -> Map Operator Tree:
+ secondjoin:firstjoin:bigtbl
+ TableScan
+ alias: bigtbl
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {key2} {value}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[key1]]
+ 1 [Column[key]]
+ outputColumnNames: _col1, _col2
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ expr: _col2
+ type: string
+ outputColumnNames: _col1, _col2
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col1}
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col2]]
+ 1 [Column[value]]
+ outputColumnNames: _col1
+ Position of Big Table: 0
+ Select Operator
+ expressions:
+ expr: _col1
+ type: string
+ outputColumnNames: _col1
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1
+ handleSkewJoin: false
+ keys:
+ 0 [Column[_col1]]
+ 1 [Column[key]]
+ Position of Big Table: 0
+ Select Operator
+ Group By Operator
+ aggregations:
+ expr: count()
+ bucketGroup: false
+ mode: hash
+ outputColumnNames: _col0
+ Reduce Output Operator
+ sort order:
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: bigint
+ Local Work:
+ Map Reduce Local Work
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ mode: mergepartial
+ outputColumnNames: _col0
+ Select Operator
+ expressions:
+ expr: _col0
+ type: bigint
+ outputColumnNames: _col0
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+
+
+PREHOOK: query: select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bigtbl
+PREHOOK: Input: default@smalltbl1
+PREHOOK: Input: default@smalltbl2
+PREHOOK: Input: default@smalltbl3
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
on (bigTbl.key1 = smallTbl1.key)
) firstjoin
JOIN
Index: ql/src/test/queries/clientpositive/multiMapJoin1.q
===================================================================
--- ql/src/test/queries/clientpositive/multiMapJoin1.q (revision 1471824)
+++ ql/src/test/queries/clientpositive/multiMapJoin1.q (working copy)
@@ -52,6 +52,8 @@
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000;
+-- Now run a query with a two-way join, which should be converted into a
+-- map-join followed by a group by - two MR jobs overall
explain
select count(*) FROM
(select bigTbl.key as key, bigTbl.value as value1,
@@ -69,6 +71,32 @@
JOIN
smallTbl2 on (firstjoin.value1 = smallTbl2.value);
+set hive.optimize.mapjoin.mapreduce=true;
+
+-- Now run a query with a two-way join, which should first be converted into a
+-- map-join followed by a group by, and then finally into a single MR job.
+
+explain insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+
+insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+set hive.optimize.mapjoin.mapreduce=false;
+
create table smallTbl3(key string, value string);
insert overwrite table smallTbl3 select * from src where key < 10;
@@ -101,6 +129,7 @@
set hive.auto.convert.join.noconditionaltask=false;
+explain
select count(*) FROM
(
SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
@@ -115,10 +144,25 @@
) secondjoin
JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000;
--- join with 4 tables on different keys is also executed as a single MR job
+-- join with 4 tables on different keys is also executed as a single MR job,
+-- so overall there are two jobs - one for the multi-way join and one for count(*)
explain
select count(*) FROM
(
@@ -147,3 +191,37 @@
smallTbl2 on (firstjoin.value1 = smallTbl2.value)
) secondjoin
JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=true;
+-- Now run the above query with M-MR optimization
+-- This should be a single MR job end-to-end.
+explain
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+select count(*) FROM
+ (
+ SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+ firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+ (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+ bigTbl.value as value1, bigTbl.value as value2
+ FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key1 = smallTbl1.key)
+ ) firstjoin
+ JOIN
+ smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=false;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (revision 1471824)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (working copy)
@@ -113,9 +113,11 @@
}
/**
- * Generate the MapRed Local Work
+ * Generate the MapRed Local Work for the given map-join operator
+ *
* @param newWork
* @param mapJoinOp
+ * map-join operator for which local work needs to be generated.
* @param bigTablePos
* @return
* @throws SemanticException
@@ -225,6 +227,16 @@
return bigTableAlias;
}
+ /**
+ * Convert the join to a map-join and also generate any local work needed.
+ *
+ * @param newWork MapredWork in which the conversion is to happen
+ * @param op
+ * The join operator that needs to be converted to a map-join
+ * @param mapJoinPos position of the big table in the join
+ * @return the alias of the big table
+ * @throws SemanticException
+ */
public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
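A hypothetical call site for the documented method, shown only to make the contract concrete. The wrapper class and method are illustrative; the MapJoinProcessor signature is the one declared above.

    import org.apache.hadoop.hive.ql.exec.JoinOperator;
    import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    class GenMapJoinSketch {
      // Convert joinOp inside work into a map-join, keeping the table at mapJoinPos
      // as the streamed big table, and generate the local hash-table work for it.
      static String convert(MapredWork work, JoinOperator joinOp, int mapJoinPos)
          throws SemanticException {
        return MapJoinProcessor.genMapJoinOpAndLocalWork(work, joinOp, mapJoinPos);
      }
    }
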
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 1471824)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy)
@@ -32,6 +32,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -761,6 +763,41 @@
}
/**
+ * Set the key and value description for all the tasks rooted at the given
+ * task. Loops over all the tasks recursively.
+ *
+ * @param task
+ */
+ public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+ if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+ .getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ setKeyAndValueDescForTaskTree(tsk);
+ }
+ } else if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ work.deriveExplainAttributes();
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+ .getAliasToWork();
+ if (opMap != null && !opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setKeyAndValueDesc(work, op);
+ }
+ }
+ }
+
+ if (task.getChildTasks() == null) {
+ return;
+ }
+
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ setKeyAndValueDescForTaskTree(childTask);
+ }
+ }
+
+ /**
* create a new plan and return.
*
* @return the new plan
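A small usage sketch of the new helper. The wrapper class is hypothetical, but the loop mirrors the SemanticAnalyzer change further down in this patch.

    import java.io.Serializable;
    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;

    class KeyValueDescSketch {
      // Walk every root task of a compiled plan and derive the reducer key/value
      // descriptors for the whole task tree.
      static void setDescriptors(List<Task<? extends Serializable>> rootTasks) {
        for (Task<? extends Serializable> rootTask : rootTasks) {
          GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
        }
      }
    }
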
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (revision 1471824)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (working copy)
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
@@ -239,8 +240,37 @@
oldChildTask.getParentTasks().add(task);
}
}
+
+ boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+ if (convertToSingleJob) {
+ copyReducerConf(task, childTask);
+ }
}
+ /**
+ * Copy reducer configuration if the childTask also has a reducer.
+ *
+ * @param task
+ * @param childTask
+ */
+ private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
+ MapredWork childWork = childTask.getWork();
+ Operator childReducer = childWork.getReducer();
+ MapredWork work = task.getWork();
+ if (childReducer == null) {
+ return;
+ }
+ work.setReducer(childReducer);
+ work.setNumReduceTasks(childWork.getNumReduceTasks());
+ work.setJoinTree(childWork.getJoinTree());
+ work.setNeedsTagging(childWork.getNeedsTagging());
+
+ // Make sure the key configuration is correct: clear it and regenerate.
+ work.getTagToValueDesc().clear();
+ GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
+ }
+
// create map join task and set big table as bigTablePosition
private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(MapredWork newWork,
int bigTablePosition) throws UnsupportedEncodingException, SemanticException {
@@ -255,6 +285,125 @@
return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
}
+ /*
+ * A task and its child task have been converted from a join to a map-join.
+ * See if the two tasks can be merged.
+ */
+ private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configuration conf) {
+ if (mapJoinTask.getChildTasks() == null
+ || mapJoinTask.getChildTasks().size() > 1) {
+ // Nothing to do: either there is no child task to merge, or there is more
+ // than one child task, in which case we leave the plan unchanged.
+ return;
+ }
+ Task<? extends Serializable> firstChildTask = mapJoinTask.getChildTasks().get(0);
+ if (!(firstChildTask instanceof MapRedTask)) {
+ // Nothing to do if it is not a mapreduce task.
+ return;
+ }
+ MapRedTask childTask = (MapRedTask) firstChildTask;
+ MapredWork mapJoinWork = mapJoinTask.getWork();
+ MapredWork childWork = childTask.getWork();
+ Operator childReducer = childWork.getReducer();
+ if (childReducer == null) {
+ // Not a MR job, nothing to merge.
+ return;
+ }
+
+ // Can these two tasks be merged?
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapJoinWork.getAliasToWork();
+ if (aliasToWork.size() > 1) {
+ return;
+ }
+ Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+ if (childPathToAliases.size() > 1) {
+ return;
+ }
+
+ // Locate the leaf operator of the map-join task. Start by initializing the
+ // leaf operator to the root operator.
+ Operator<? extends OperatorDesc> mapJoinLeafOperator = aliasToWork.values().iterator().next();
+ while (mapJoinLeafOperator.getChildOperators() != null) {
+ // Don't perform this optimization for multi-table inserts.
+ if (mapJoinLeafOperator.getChildOperators().size() > 1) {
+ return;
+ }
+ mapJoinLeafOperator = mapJoinLeafOperator.getChildOperators().get(0);
+ }
+
+ assert (mapJoinLeafOperator instanceof FileSinkOperator);
+ if (!(mapJoinLeafOperator instanceof FileSinkOperator)) {
+ // Sanity check, shouldn't happen.
+ return;
+ }
+
+ FileSinkOperator mapJoinTaskFileSinkOperator = (FileSinkOperator) mapJoinLeafOperator;
+
+ // Bail out if the file sink writes to a directory other than the one the child task reads.
+ String workDir = mapJoinTaskFileSinkOperator.getConf().getDirName();
+ if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
+ return;
+ }
+
+ MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
+ MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+
+ // Neither task may use a bucketed map-join.
+ if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
+ (childLocalWork != null && childLocalWork.getBucketMapjoinContext() != null)) {
+ return;
+ }
+
+ if (childWork.getAliasToWork().size() > 1) {
+ return;
+ }
+
+ Operator<? extends Serializable> childAliasOp =
+ childWork.getAliasToWork().values().iterator().next();
+ if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
+ return;
+ }
+
+ // Merge the two operator trees: remove the FileSinkOperator from the first tree and
+ // attach the top of the second tree in its place.
+ Operator<? extends Serializable> parentFOp = mapJoinTaskFileSinkOperator
+ .getParentOperators().get(0);
+ parentFOp.getChildOperators().remove(mapJoinTaskFileSinkOperator);
+ parentFOp.getChildOperators().add(childAliasOp);
+ List<Operator<? extends Serializable>> parentOps =
+ new ArrayList<Operator<? extends Serializable>>();
+ parentOps.add(parentFOp);
+ childAliasOp.setParentOperators(parentOps);
+
+ mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
+ for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+ .entrySet()) {
+ if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+ mapJoinWork.getPathToPartitionInfo()
+ .put(childWorkEntry.getKey(), childWorkEntry.getValue());
+ }
+ }
+
+ // Merge the child task's local work (fetch work and map-local operators) into the map-join task's local work.
+ if (mapJoinLocalWork != null && childLocalWork != null) {
+ mapJoinLocalWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
+ mapJoinLocalWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
+ }
+
+ // Splice out the child task; its children become children of the map-join task.
+ List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
+ mapJoinTask.setChildTasks(oldChildTasks);
+ if (oldChildTasks != null) {
+ for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
+ oldChildTask.getParentTasks().remove(childTask);
+ oldChildTask.getParentTasks().add(mapJoinTask);
+ }
+ }
+
+ // Copy the reducer conf.
+ copyReducerConf(mapJoinTask, childTask);
+ }
+
@Override
public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
ConditionalTask conditionalTask, Context context)
@@ -365,11 +514,21 @@
// Can this task be merged with the child task. This can happen if a big table is being
// joined with multiple small tables on different keys
- // Further optimizations are possible here, a join which has been converted to a mapjoin
- // followed by a mapjoin can be performed in a single MR job.
- if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
- && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
- mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+ if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)) {
+ if (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP) {
+ // Merging two map-join tasks
+ mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+ }
+
+ // Converted the join operator into a map-join. Now see if it can
+ // be merged into the following map-reduce job.
+ boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+ if (convertToSingleJob) {
+ // Try merging a map-join task with a mapreduce job to have a
+ // single job.
+ mergeMapJoinTaskWithMapReduceTask(newTask, conf);
+ }
}
return newTask;
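A condensed, illustrative restatement of the gate added to processCurrentTask above; the helper class and method name are made up. The M-MR merge is attempted only when the flag is on and the freshly converted map-join task has exactly one child task.

    import java.io.Serializable;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.MapRedTask;
    import org.apache.hadoop.hive.ql.exec.Task;

    class MergeGateSketch {
      // Illustrative only: the real decision (and the actual merge) lives in
      // CommonJoinTaskDispatcher.processCurrentTask / mergeMapJoinTaskWithMapReduceTask.
      static boolean shouldTryMerge(HiveConf conf, MapRedTask newTask) {
        boolean convertToSingleJob =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
        List<Task<? extends Serializable>> children = newTask.getChildTasks();
        return convertToSingleJob && children != null && children.size() == 1;
      }
    }
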
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1471824)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -243,6 +243,12 @@
return keyDesc;
}
+ /**
+ * If the plan has a reducer (and hence a corresponding ReduceSink), store the TableDesc
+ * pointing to the keySerializeInfo of that ReduceSink.
+ *
+ * @param keyDesc
+ */
public void setKeyDesc(final TableDesc keyDesc) {
this.keyDesc = keyDesc;
}
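To make the documented contract concrete, a hypothetical one-liner showing where the key TableDesc comes from; setKeyAndValueDesc in GenMapRedUtils derives this automatically, so the helper below is purely illustrative and not part of the patch.

    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    class KeyDescSketch {
      // The key descriptor stored on the plan is the keySerializeInfo of the
      // plan's ReduceSinkOperator.
      static void storeKeyDesc(MapredWork work, ReduceSinkOperator reduceSink) {
        work.setKeyDesc(reduceSink.getConf().getKeySerializeInfo());
      }
    }
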
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1471824)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -8373,7 +8373,7 @@
// For each task, set the key descriptor for the reducer
for (Task<? extends Serializable> rootTask : rootTasks) {
- setKeyDescTaskTree(rootTask);
+ GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
}
// If a task contains an operator which instructs bucketizedhiveinputformat
@@ -8599,36 +8599,6 @@
}
}
- // loop over all the tasks recursviely
- private void setKeyDescTaskTree(Task<? extends Serializable> task) {
-
- if (task instanceof ExecDriver) {
- MapredWork work = (MapredWork) task.getWork();
- work.deriveExplainAttributes();
- HashMap<String, Operator<? extends OperatorDesc>> opMap = work
- .getAliasToWork();
- if (!opMap.isEmpty()) {
- for (Operator<? extends OperatorDesc> op : opMap.values()) {
- GenMapRedUtils.setKeyAndValueDesc(work, op);
- }
- }
- } else if (task instanceof ConditionalTask) {
- List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
- .getListTasks();
- for (Task<? extends Serializable> tsk : listTasks) {
- setKeyDescTaskTree(tsk);
- }
- }
-
- if (task.getChildTasks() == null) {
- return;
- }
-
- for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- setKeyDescTaskTree(childTask);
- }
- }
-
@SuppressWarnings("nls")
public Phase1Ctx initPhase1Ctx() {