Index: data/files/SortCol2Col1.txt =================================================================== --- data/files/SortCol2Col1.txt (revision 0) +++ data/files/SortCol2Col1.txt (working copy) @@ -0,0 +1,5 @@ +110 +210 +111 +211 + Index: data/files/SortCol1Col2.txt =================================================================== --- data/files/SortCol1Col2.txt (revision 0) +++ data/files/SortCol1Col2.txt (working copy) @@ -0,0 +1,5 @@ +110 +111 +210 +211 + Index: ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out (working copy) @@ -0,0 +1,165 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same orders. +-- descending followed by ascending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same orders. +-- descending followed by ascending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/bucket_map_join_1.q.out =================================================================== --- ql/src/test/results/clientpositive/bucket_map_join_1.q.out (revision 0) +++ ql/src/test/results/clientpositive/bucket_map_join_1.q.out (working copy) @@ -0,0 +1,244 @@ +PREHOOK: query: drop table table1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key, value) into 1 BUCKETS stored as textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table1(key string, value string) clustered by (key, value) +sorted by (key, value) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table1 +PREHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value, key) into 1 BUCKETS stored as textfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table2(key string, value string) clustered by (value, key) +sorted by (value, key) into 1 BUCKETS stored as textfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table2 +PREHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +PREHOOK: type: LOAD +PREHOOK: Output: default@table1 +POSTHOOK: query: load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table1 +PREHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +PREHOOK: type: LOAD +PREHOOK: Output: default@table2 +POSTHOOK: query: load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@table2 +PREHOOK: query: -- The tables are bucketed and sorted in different column orders +-- Neither buckted map-join should be performed + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +POSTHOOK: query: -- The tables are bucketed and sorted in different column orders +-- Neither buckted map-join should be performed + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table1) a) (TOK_TABREF (TOK_TABNAME table2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + GatherStats: false + HashTable Sink Operator + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + Bucket Mapjoin Context: + Alias Bucket Base File Name Mapping: + b {SortCol1Col2.txt=[SortCol2Col1.txt]} + Alias Bucket File Name Mapping: +#### A masked pattern was here #### + Alias Bucket Output File Name Mapping: +#### A masked pattern was here #### + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + GatherStats: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Local Work: + Map Reduce Local Work + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: table1 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 1 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.table1 + serialization.ddl struct table1 { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.table1 + name: default.table1 + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0 + columns.types bigint + escape.delim \ + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +PREHOOK: type: QUERY +PREHOOK: Input: default@table1 +PREHOOK: Input: default@table2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table1 +POSTHOOK: Input: default@table2 +#### A masked pattern was here #### +4 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out (working copy) @@ -0,0 +1,165 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same order. +-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same order. +-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out (working copy) @@ -0,0 +1,190 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (and (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + b + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + b + TableScan + alias: b + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + Position of Big Table: 0 + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key], Column[value]] + 1 [Column[key], Column[value]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out =================================================================== --- ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out (revision 0) +++ ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out (working copy) @@ -0,0 +1,155 @@ +PREHOOK: query: drop table table_desc1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table table_desc2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table table_desc2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc1 +PREHOOK: query: create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@table_desc2 +PREHOOK: query: insert overwrite table table_desc1 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc1 +POSTHOOK: query: insert overwrite table table_desc1 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc1 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table table_desc2 select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@table_desc2 +POSTHOOK: query: insert overwrite table table_desc2 select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@table_desc2 +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +PREHOOK: type: QUERY +POSTHOOK: query: -- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME table_desc1) a) (TOK_TABREF (TOK_TABNAME table_desc2) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_WHERE (< (. (TOK_TABLE_OR_COL a) key) 10)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + a + TableScan + alias: a + Filter Operator + predicate: + expr: (key < 10.0) + type: boolean + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@table_desc1 +PREHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: query: select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@table_desc1 +POSTHOOK: Input: default@table_desc2 +#### A masked pattern was here #### +POSTHOOK: Lineage: table_desc1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: table_desc2.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +22 Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_2.q (working copy) @@ -0,0 +1,28 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same order. +-- descending followed by descending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_4.q (working copy) @@ -0,0 +1,27 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in different orders. +-- So, sort merge join should not be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_1.q (working copy) @@ -0,0 +1,23 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key) sorted by (key DESC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same descending order. +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b on a.key=b.key where a.key < 10; + Index: ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q =================================================================== --- ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q (revision 0) +++ ql/src/test/queries/clientpositive/sort_merge_join_desc_3.q (working copy) @@ -0,0 +1,28 @@ +drop table table_desc1; +drop table table_desc2; + +set hive.enforce.sorting = true; + +create table table_desc1(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; +create table table_desc2(key string, value string) clustered by (key, value) +sorted by (key DESC, value ASC) into 1 BUCKETS; + +insert overwrite table table_desc1 select key, value from src; +insert overwrite table table_desc2 select key, value from src; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The columns of the tables above are sorted in same orders. +-- descending followed by ascending +-- So, sort merge join should be performed + +explain +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + +select /*+ mapjoin(b) */ count(*) from table_desc1 a join table_desc2 b +on a.key=b.key and a.value=b.value where a.key < 10; + Index: ql/src/test/queries/clientpositive/bucket_map_join_1.q =================================================================== --- ql/src/test/queries/clientpositive/bucket_map_join_1.q (revision 0) +++ ql/src/test/queries/clientpositive/bucket_map_join_1.q (working copy) @@ -0,0 +1,26 @@ +drop table table1; +drop table table2; + +set hive.enforce.bucketing = true; +set hive.enforce.sorting = true; + +create table table1(key string, value string) clustered by (key, value) +sorted by (key, value) into 1 BUCKETS stored as textfile; +create table table2(key string, value string) clustered by (value, key) +sorted by (value, key) into 1 BUCKETS stored as textfile; + +load data local inpath '../data/files/SortCol1Col2.txt' overwrite into table table1; +load data local inpath '../data/files/SortCol2Col1.txt' overwrite into table table2; + +set hive.optimize.bucketmapjoin = true; +set hive.optimize.bucketmapjoin.sortedmerge = true; +set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; + +-- The tables are bucketed and sorted in different column orders +-- Neither buckted map-join should be performed + +explain extended +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value; + +select /*+ mapjoin(b) */ count(*) from table1 a join table2 b on a.key=b.key and a.value=b.value; + Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (revision 1367873) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (working copy) @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -104,13 +105,18 @@ } class SortedMergeBucketMapjoinProc implements NodeProcessor { - ParseContext pGraphContext; + ParseContext pGraphContext; + BitSet orderSortedColumns; + List sortColumnNames; + boolean firstObject; public SortedMergeBucketMapjoinProc(ParseContext pctx) { this.pGraphContext = pctx; + firstObject = true; } public SortedMergeBucketMapjoinProc() { + firstObject = true; } @Override @@ -133,6 +139,11 @@ } String[] srcs = joinCxt.getBaseSrc(); int pos = 0; + + // All the tables/partitions columns should be sorted in the same order + // For example, if tables A and B are being joined on columns c1, c2 and c3 + // which are the sorted and bucketed columns. The join would work, as long + // c1, c2 and c3 are sorted in the same order. for (String src : srcs) { tableSorted = tableSorted && isTableSorted(this.pGraphContext, mapJoinOp, joinCxt, src, pos); @@ -253,16 +264,39 @@ if (sortCols == null || sortCols.size() != joinCols.size()) { return false; } - // require all sort columns are asc, right now only support asc - List sortColNames = new ArrayList(); - for (Order o : sortCols) { - if (o.getOrder() != BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) { - return false; + + // All objects should be in the same order + // The join columns should contain all the sort columns + // The sort columns of all the tables should be in the same order + if (firstObject) { + orderSortedColumns = new BitSet(sortCols.size()); + sortColumnNames = new ArrayList(sortCols.size()); + orderSortedColumns.clear(); + } + + for (int pos = 0; pos < sortCols.size(); pos++) { + Order o = sortCols.get(pos); + if (firstObject) { + orderSortedColumns.set(pos, o.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC); + sortColumnNames.add(o.getCol()); } - sortColNames.add(o.getCol()); + else { + boolean asc = + o.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC; + if ((!sortColumnNames.get(pos).equals(o.getCol())) || + (orderSortedColumns.get(pos) != asc)) { + return false; + } + } } - return sortColNames.containsAll(joinCols); + if (firstObject) { + firstObject = false; + return sortColumnNames.containsAll(joinCols); + } + + // The column order and types matched + return true; } }