Index: data/files/SortDescCol1Col2.txt =================================================================== --- data/files/SortDescCol1Col2.txt (revision 0) +++ data/files/SortDescCol1Col2.txt (working copy) @@ -0,0 +1,4 @@ +211 +210 +111 +110 Index: data/files/SortCol2Col1.txt =================================================================== --- data/files/SortCol2Col1.txt (revision 0) +++ data/files/SortCol2Col1.txt (working copy) @@ -0,0 +1,5 @@ +110 +210 +111 +211 + Index: data/files/SortDescCol2Col1.txt =================================================================== --- data/files/SortDescCol2Col1.txt (revision 0) +++ data/files/SortDescCol2Col1.txt (working copy) @@ -0,0 +1,4 @@ +211 +111 +210 +110 Index: data/files/SortCol1Col2.txt =================================================================== --- data/files/SortCol1Col2.txt (revision 0) +++ data/files/SortCol1Col2.txt (working copy) @@ -0,0 +1,4 @@ +110 +111 +210 +211 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out (working copy) @@ -0,0 +1,165 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: 
table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same orders. +-- descending followed by ascending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same orders. +-- descending followed by ascending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: 
Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/bucket_map_join_1.q.out =================================================================== --- ql/src/test/results/clientpositive/bucket_map_join_1.q.out (revision 0) +++ ql/src/test/results/clientpositive/bucket_map_join_1.q.out (working copy) @@ -0,0 +1,246 @@ +PREHOOK: query: drop table table1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key, value) into 1 BUCKETS stored as textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key, value) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table1 +PREHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value, key) into 1 BUCKETS stored as textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value, key) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table2 +PREHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +PREHOOK: type: LOAD +PREHOOK: Output: default@table1 +POSTHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table1 +PREHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +PREHOOK: type: LOAD +PREHOOK: Output: default@table2 +POSTHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table2 +PREHOOK: query: -- The tables are bucketed in same columns in different order, +-- but sorted in different column orders +-- Bucketed map-join should be performed, not sort-merge join + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +POSTHOOK: query: -- The tables are bucketed in same columns in different order, +-- but sorted in different column orders +-- Bucketed map-join should be performed, not sort-merge join + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table1) a) (TOK_TABREF (TOK_TABNAME table2) b) (and (= (. 
(TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + GatherStats: false + HashTable Sink Operator + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + Bucket Mapjoin Context: + Alias Bucket Base File Name Mapping: + b {SortCol1Col2.txt=[SortCol2Col1.txt]} + Alias Bucket File Name Mapping: +#### A masked pattern was here #### + Alias Bucket Output File Name Mapping: +#### A masked pattern was here #### + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + GatherStats: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Local Work: + Map Reduce Local Work + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: table1 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.table1 + name: default.table1 + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + 
type: bigint + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0 + columns.types bigint + escape.delim \ + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +PREHOOK: Input: default@table1 +PREHOOK: Input: default@table2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table1 +POSTHOOK: Input: default@table2 +#### A masked pattern was here #### +4 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out (working copy) @@ -0,0 +1,165 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key 
SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same order. +-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same order. +-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. 
(TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out (working copy) @@ -0,0 +1,190 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by 
(key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. 
(TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out (working copy) @@ -0,0 +1,155 @@ +PREHOOK: query: drop table table_desc1 
+PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/bucket_map_join_2.q.out =================================================================== --- ql/src/test/results/clientpositive/bucket_map_join_2.q.out (revision 0) +++ ql/src/test/results/clientpositive/bucket_map_join_2.q.out (working copy) @@ -0,0 +1,246 @@ +PREHOOK: query: drop table table1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key desc, value desc) into 1 BUCKETS stored as 
textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key desc, value desc) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table1 +PREHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value desc, key desc) into 1 BUCKETS stored as textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value desc, key desc) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table2 +PREHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +PREHOOK: type: LOAD +PREHOOK: Output: default@table1 +POSTHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table1 +PREHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +PREHOOK: type: LOAD +PREHOOK: Output: default@table2 +POSTHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table2 +PREHOOK: query: -- The tables are bucketed in same columns in different order, +-- but sorted in different column orders +-- Bucketed map-join should be performed, not sort-merge join + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +POSTHOOK: query: -- The tables are bucketed in same columns in different order, +-- but sorted in different column orders +-- Bucketed map-join should be performed, not sort-merge join + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table1) a) (TOK_TABREF (TOK_TABNAME table2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. 
(TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + GatherStats: false + HashTable Sink Operator + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + Bucket Mapjoin Context: + Alias Bucket Base File Name Mapping: + b {SortCol1Col2.txt=[SortCol2Col1.txt]} + Alias Bucket File Name Mapping: +#### A masked pattern was here #### + Alias Bucket Output File Name Mapping: +#### A masked pattern was here #### + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + GatherStats: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Local Work: + Map Reduce Local Work + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: table1 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.table1 + name: default.table1 + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path 
-> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0 + columns.types bigint + escape.delim \ + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +PREHOOK: Input: default@table1 +PREHOOK: Input: default@table2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table1 +POSTHOOK: Input: default@table2 +#### A masked pattern was here #### +4 Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q (working copy) @@ -0,0 +1,28 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same order. 
+-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q (working copy) @@ -0,0 +1,27 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + Index: ql/src/test/queries/clientpositive/bucket_map_join_2.q =================================================================== --- ql/src/test/queries/clientpositive/bucket_map_join_2.q (revision 0) +++ ql/src/test/queries/clientpositive/bucket_map_join_2.q (working copy) @@ -0,0 +1,27 @@ +drop table table1; +drop table table2; + +set hive.enforce.bucketing = true; +set hive.enforce.sorting = true; + +create table table1(key string, value string) clustered by (key, value) +sorted by (key desc, value desc) into 1 BUCKETS stored as textfile; +create table table2(key string, value string) clustered by (value, key) +sorted by (value desc, key desc) into 1 BUCKETS stored as textfile; + +load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1; +load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The tables are bucketed in same columns in different order, +-- but sorted in different column orders +-- Bucketed map-join should be performed, not sort-merge join + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value; + +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q (working copy) @@ -0,0 +1,23 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table 
table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q (working copy) @@ -0,0 +1,28 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same orders. 
+-- descending followed by ascending
+-- So, sort merge join should be performed
+
+explain
+select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b
+on a.key=b.key and a.value=b.value where a.key < 10;
+
+select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b
+on a.key=b.key and a.value=b.value where a.key < 10;
+
Index: ql/src/test/queries/clientpositive/bucket_map_join_1.q
===================================================================
--- ql/src/test/queries/clientpositive/bucket_map_join_1.q	(revision 0)
+++ ql/src/test/queries/clientpositive/bucket_map_join_1.q	(working copy)
@@ -0,0 +1,27 @@
+drop table table1;
+drop table table2;
+
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+
+create table table1(key string, value string) clustered by (key, value)
+sorted by (key, value) into 1 BUCKETS stored as textfile;
+create table table2(key string, value string) clustered by (value, key)
+sorted by (value, key) into 1 BUCKETS stored as textfile;
+
+load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1;
+load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2;
+
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+-- The tables are bucketed in same columns in different order,
+-- but sorted in different column orders
+-- Bucketed map-join should be performed, not sort-merge join
+
+explain extended
+select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value;
+
+select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value;
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java	(revision 1369524)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java	(working copy)
@@ -49,7 +49,6 @@
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
@@ -136,9 +135,21 @@
     }
     String[] srcs = joinCxt.getBaseSrc();
     int pos = 0;
+
+    // All the tables'/partitions' columns should be sorted in the same order.
+    // For example, if tables A and B are being joined on columns c1, c2 and c3,
+    // which are the sorted and bucketed columns, the join works as long as
+    // c1, c2 and c3 are sorted in the same order.
+    List<Order> sortColumnsFirstTable = new ArrayList<Order>();
+
     for (String src : srcs) {
       tableSorted = tableSorted
-          && isTableSorted(this.pGraphContext, mapJoinOp, joinCxt, src, pos);
+          && isTableSorted(this.pGraphContext,
+               mapJoinOp,
+               joinCxt,
+               src,
+               pos,
+               sortColumnsFirstTable);
       pos++;
     }
     if (!tableSorted) {
@@ -202,8 +213,27 @@
     return smbJop;
   }
 
-  private boolean isTableSorted(ParseContext pctx, MapJoinOperator op,
-      QBJoinTree joinTree, String alias, int pos) throws SemanticException {
+  /**
+   * Whether this table is eligible for a sort-merge join.
+   *
+   * @param pctx parse context
+   * @param op map join operator being considered
+   * @param joinTree join tree being considered
+   * @param alias table alias in the join tree being checked
+   * @param pos position of the table
+   * @param sortColumnsFirstTable The names and sort order of the sorted columns of the first table.
+   *          It is empty when pos = 0; the call for the first table populates it.
+   * @return true if the table (or each of its selected partitions) is sorted consistently with the first table
+   * @throws SemanticException
+   */
+  private boolean isTableSorted(ParseContext pctx,
+    MapJoinOperator op,
+    QBJoinTree joinTree,
+    String alias,
+    int pos,
+    List<Order> sortColumnsFirstTable)
+    throws SemanticException {
+
     Map<String, Operator<? extends OperatorDesc>> topOps = this.pGraphContext
         .getTopOps();
     Map<TableScanOperator, Table> topToTable = this.pGraphContext
         .getTopToTable();
@@ -256,30 +286,57 @@
         LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
         throw new SemanticException(e.getMessage(), e);
       }
+      List<Partition> partitions = prunedParts.getNotDeniedPartns();
+      // Populate the names and order of columns for the first partition of the
+      // first table
+      if ((pos == 0) && (partitions != null) && (!partitions.isEmpty())) {
+        Partition firstPartition = partitions.get(0);
+        sortColumnsFirstTable.addAll(firstPartition.getSortCols());
+      }
+
       for (Partition partition : prunedParts.getNotDeniedPartns()) {
-        if (!checkSortColsAndJoinCols(partition.getSortCols(), joinCols)) {
+        if (!checkSortColsAndJoinCols(partition.getSortCols(),
+              joinCols,
+              sortColumnsFirstTable)) {
           return false;
         }
       }
       return true;
     }
-    return checkSortColsAndJoinCols(tbl.getSortCols(), joinCols);
+
+    // Populate the names and order of columns for the first table
+    if (pos == 0) {
+      sortColumnsFirstTable.addAll(tbl.getSortCols());
+    }
+
+    return checkSortColsAndJoinCols(tbl.getSortCols(),
+        joinCols,
+        sortColumnsFirstTable);
   }
 
   private boolean checkSortColsAndJoinCols(List<Order> sortCols,
-      List<String> joinCols) {
+      List<String> joinCols,
+      List<Order> sortColumnsFirstPartition) {
+
     if (sortCols == null || sortCols.size() != joinCols.size()) {
       return false;
     }
-    // require all sort columns are asc, right now only support asc
+
     List<String> sortColNames = new ArrayList<String>();
-    for (Order o : sortCols) {
-      if (o.getOrder() != BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+
+    // The join columns should contain all the sort columns.
+    // The sort columns of all the tables should be in the same order:
+    // compare the column names and the order with the first table/partition.
+    for (int pos = 0; pos < sortCols.size(); pos++) {
+      Order o = sortCols.get(pos);
+      if (!o.equals(sortColumnsFirstPartition.get(pos))) {
        return false;
      }
-      sortColNames.add(o.getCol());
+      sortColNames.add(sortColumnsFirstPartition.get(pos).getCol());
    }
+
+    // The column names and order (ascending/descending) matched.
+    // The join columns should contain all the sort columns.
    return sortColNames.containsAll(joinCols);
  }
}
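For reference, the eligibility check this patch introduces in checkSortColsAndJoinCols can be restated in isolation: every table's (column, direction) sort specification must match the first table's, position by position, and the join columns must cover all of the sort columns. The sketch below is a minimal, self-contained illustration of that logic, not the patched code itself; SortOrder and sortColsMatch are hypothetical stand-ins for org.apache.hadoop.hive.metastore.api.Order and the private method above, and it assumes the metastore convention of 1 for ascending and 0 for descending.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.hive.metastore.api.Order:
// a sort column name plus its direction (assumed 1 = ascending, 0 = descending).
class SortOrder {
  final String col;
  final int order;

  SortOrder(String col, int order) {
    this.col = col;
    this.order = order;
  }

  boolean sameAs(SortOrder other) {
    return col.equals(other.col) && order == other.order;
  }
}

public class SmbEligibilityDemo {

  // Mirrors the patched checkSortColsAndJoinCols: a table qualifies only if its
  // sort columns match the first table's position by position, in both name and
  // direction, and the join columns cover all of the sort columns.
  static boolean sortColsMatch(List<SortOrder> sortCols,
      List<String> joinCols,
      List<SortOrder> firstTableSortCols) {
    if (sortCols == null || sortCols.size() != joinCols.size()) {
      return false;
    }
    List<String> sortColNames = new ArrayList<String>();
    for (int pos = 0; pos < sortCols.size(); pos++) {
      if (!sortCols.get(pos).sameAs(firstTableSortCols.get(pos))) {
        return false; // column name or asc/desc direction differs
      }
      sortColNames.add(sortCols.get(pos).col);
    }
    return sortColNames.containsAll(joinCols);
  }

  public static void main(String[] args) {
    // table_desc1 from sort_merge_join_desc_3/4: sorted by (key DESC, value ASC)
    List<SortOrder> t1 = Arrays.asList(new SortOrder("key", 0),
        new SortOrder("value", 1));
    // table_desc2 from sort_merge_join_desc_4: sorted by (key DESC, value DESC)
    List<SortOrder> t2 = Arrays.asList(new SortOrder("key", 0),
        new SortOrder("value", 0));
    List<String> joinCols = Arrays.asList("key", "value");

    // Same order as the first table: sort-merge join is allowed (prints true).
    System.out.println(sortColsMatch(t1, joinCols, t1));
    // value is sorted DESC vs ASC: not allowed (prints false).
    System.out.println(sortColsMatch(t2, joinCols, t1));
  }
}

The second call is the sort_merge_join_desc_4.q situation, which is why that plan keeps a plain Map Join Operator, while sort_merge_join_desc_1 through _3 get a Sorted Merge Bucket Map Join Operator.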