Index: ql/src/test/results/clientpositive/smb_mapjoin_13.q.out
===================================================================
--- ql/src/test/results/clientpositive/smb_mapjoin_13.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/smb_mapjoin_13.q.out	(revision 0)
@@ -0,0 +1,412 @@
+PREHOOK: query: -- This test verifies that the sort merge join optimizer works when the tables are joined on columns with different names
+
+-- Create bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) CLUSTERED BY (key) SORTED BY (key ASC) INTO 16 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- This test verifies that the sort merge join optimizer works when the tables are joined on columns with different names
+
+-- Create bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) CLUSTERED BY (key) SORTED BY (key ASC) INTO 16 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@test_table1
+PREHOOK: query: CREATE TABLE test_table2 (value INT, key STRING) CLUSTERED BY (value) SORTED BY (value ASC) INTO 16 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE test_table2 (value INT, key STRING) CLUSTERED BY (value) SORTED BY (value ASC) INTO 16 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@test_table2
+PREHOOK: query: CREATE TABLE test_table3 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (key ASC, value ASC) INTO 16 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE test_table3 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (key ASC, value ASC) INTO 16 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@test_table3
+PREHOOK: query: CREATE TABLE test_table4 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (value ASC, key ASC) INTO 16 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE test_table4 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (value ASC, key ASC) INTO 16 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@test_table4
+PREHOOK: query: FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *
+INSERT OVERWRITE TABLE test_table2 SELECT *
+INSERT OVERWRITE TABLE test_table3 SELECT *
+INSERT OVERWRITE TABLE test_table4 SELECT *
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@test_table1
+PREHOOK: Output: default@test_table2
+PREHOOK: Output: default@test_table3
+PREHOOK: Output: default@test_table4
+POSTHOOK: query: FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *
+INSERT OVERWRITE TABLE test_table2 SELECT *
+INSERT OVERWRITE TABLE test_table3 SELECT *
+INSERT OVERWRITE TABLE test_table4 SELECT *
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@test_table1
+POSTHOOK: Output: default@test_table2
+POSTHOOK: Output: default@test_table3
+POSTHOOK: Output: default@test_table4
+POSTHOOK: Lineage: test_table1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- Join data from 2 tables on their respective sorted columns (one each, with different names) and
+-- verify sort merge join is used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Join data from 2 tables on their respective sorted columns (one each, with different names) and
+-- verify sort merge join is used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: test_table1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME test_table1) a) (TOK_TABREF (TOK_TABNAME test_table2) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 10)))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a
+          TableScan
+            alias: a
+            GatherStats: false
+            Sorted Merge Bucket Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {value} {key}
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[value]]
+              outputColumnNames: _col0, _col1, _col4, _col5
+              Position of Big Table: 0
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: int
+                      expr: _col1
+                      type: string
+                      expr: _col4
+                      type: int
+                      expr: _col5
+                      type: string
+                outputColumnNames: _col0, _col1, _col4, _col5
+                Select Operator
+                  expressions:
+                        expr: _col0
+                        type: int
+                        expr: _col1
+                        type: string
+                        expr: _col4
+                        type: int
+                        expr: _col5
+                        type: string
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Limit
+                    File Output Operator
+                      compressed: false
+                      GlobalTableId: 0
+#### A masked pattern was here ####
+                      NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          properties:
+                            columns _col0,_col1,_col2,_col3
+                            columns.types int:string:int:string
+                            escape.delim \
+                            serialization.format 1
+                      TotalFiles: 1
+                      GatherStats: false
+                      MultiFileSpray: false
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: test_table1
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              SORTBUCKETCOLSPREFIX TRUE
+              bucket_count 16
+              bucket_field_name key
+              columns key,value
+              columns.types int:string
+#### A masked pattern was here ####
+              name default.test_table1
+              numFiles 16
+              numPartitions 0
+              numRows 500
+              rawDataSize 5312
+              serialization.ddl struct test_table1 { i32 key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 5812
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                SORTBUCKETCOLSPREFIX TRUE
+                bucket_count 16
+                bucket_field_name key
+                columns key,value
+                columns.types int:string
+#### A masked pattern was here ####
+                name default.test_table1
+                numFiles 16
+                numPartitions 0
+                numRows 500
+                rawDataSize 5312
+                serialization.ddl struct test_table1 { i32 key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 5812
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.test_table1
+            name: default.test_table1
+      Truncated Path -> Alias:
+        /test_table1 [a]
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+
+
+PREHOOK: query: SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table1
+PREHOOK: Input: default@test_table2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table1
+POSTHOOK: Input: default@test_table2
+#### A masked pattern was here ####
+POSTHOOK: Lineage: test_table1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+64	val_64	64	val_64
+PREHOOK: query: -- Join data from 2 tables on their respective columns (two each, with the same names but sorted
+-- with different priorities) and verify sort merge join is not used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Join data from 2 tables on their respective columns (two each, with the same names but sorted
+-- with different priorities) and verify sort merge join is not used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: test_table1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME test_table3) a) (TOK_TABREF (TOK_TABNAME test_table4) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST b))) (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 10)))
+
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-1 depends on stages: Stage-3
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-3
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        b
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        b
+          TableScan
+            alias: b
+            GatherStats: false
+            HashTable Sink Operator
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              handleSkewJoin: false
+              keys:
+                0 [class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[key]()]
+                1 [class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[value]()]
+              Position of Big Table: 0
+
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a
+          TableScan
+            alias: a
+            GatherStats: false
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              handleSkewJoin: false
+              keys:
+                0 [class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[key]()]
+                1 [class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[value]()]
+              outputColumnNames: _col0, _col1, _col4, _col5
+              Position of Big Table: 0
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: int
+                      expr: _col1
+                      type: string
+                      expr: _col4
+                      type: int
+                      expr: _col5
+                      type: string
+                outputColumnNames: _col0, _col1, _col4, _col5
+                Select Operator
+                  expressions:
+                        expr: _col0
+                        type: int
+                        expr: _col1
+                        type: string
+                        expr: _col4
+                        type: int
+                        expr: _col5
+                        type: string
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Limit
+                    File Output Operator
+                      compressed: false
+                      GlobalTableId: 0
+#### A masked pattern was here ####
+                      NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          properties:
+                            columns _col0,_col1,_col2,_col3
+                            columns.types int:string:int:string
+                            escape.delim \
+                            serialization.format 1
+                      TotalFiles: 1
+                      GatherStats: false
+                      MultiFileSpray: false
+      Local Work:
+        Map Reduce Local Work
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: test_table3
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              SORTBUCKETCOLSPREFIX TRUE
+              bucket_count 16
+              bucket_field_name key
+              columns key,value
+              columns.types int:string
+#### A masked pattern was here ####
+              name default.test_table3
+              numFiles 16
+              numPartitions 0
+              numRows 500
+              rawDataSize 5312
+              serialization.ddl struct test_table3 { i32 key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 5812
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                SORTBUCKETCOLSPREFIX TRUE
+                bucket_count 16
+                bucket_field_name key
+                columns key,value
+                columns.types int:string
+#### A masked pattern was here ####
+                name default.test_table3
+                numFiles 16
+                numPartitions 0
+                numRows 500
+                rawDataSize 5312
+                serialization.ddl struct test_table3 { i32 key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 5812
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.test_table3
+            name: default.test_table3
+      Truncated Path -> Alias:
+        /test_table3 [a]
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+
+
+PREHOOK: query: SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_table3
+PREHOOK: Input: default@test_table4
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_table3
+POSTHOOK: Input: default@test_table4
+#### A masked pattern was here ####
+POSTHOOK: Lineage: test_table1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table1.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table2.value EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: test_table4.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
Index: ql/src/test/queries/clientpositive/smb_mapjoin_13.q
===================================================================
--- ql/src/test/queries/clientpositive/smb_mapjoin_13.q	(revision 0)
+++ ql/src/test/queries/clientpositive/smb_mapjoin_13.q	(revision 0)
@@ -0,0 +1,36 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- This test verifies that the sort merge join optimizer works when the tables are joined on columns with different names
+
+-- Create bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) CLUSTERED BY (key) SORTED BY (key ASC) INTO 16 BUCKETS;
+CREATE TABLE test_table2 (value INT, key STRING) CLUSTERED BY (value) SORTED BY (value ASC) INTO 16 BUCKETS;
+CREATE TABLE test_table3 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (key ASC, value ASC) INTO 16 BUCKETS;
+CREATE TABLE test_table4 (key INT, value STRING) CLUSTERED BY (key, value) SORTED BY (value ASC, key ASC) INTO 16 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *
+INSERT OVERWRITE TABLE test_table2 SELECT *
+INSERT OVERWRITE TABLE test_table3 SELECT *
+INSERT OVERWRITE TABLE test_table4 SELECT *;
+
+-- Join data from 2 tables on their respective sorted columns (one each, with different names) and
+-- verify sort merge join is used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10;
+
+SELECT /*+mapjoin(b)*/ * FROM test_table1 a JOIN test_table2 b ON a.key = b.value LIMIT 10;
+
+-- Join data from 2 tables on their respective columns (two each, with the same names but sorted
+-- with different priorities) and verify sort merge join is not used
+EXPLAIN EXTENDED
+SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10;
+
+SELECT /*+mapjoin(b)*/ * FROM test_table3 a JOIN test_table4 b ON a.key = b.value LIMIT 10;
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java	(revision 1406041)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java	(working copy)
@@ -320,10 +320,10 @@
     // compare the column names and the order with the first table/partition.
     for (int pos = 0; pos < sortCols.size(); pos++) {
       Order o = sortCols.get(pos);
-      if (!o.equals(sortColumnsFirstPartition.get(pos))) {
+      if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
         return false;
       }
-      sortColNames.add(sortColumnsFirstPartition.get(pos).getCol());
+      sortColNames.add(o.getCol());
     }
 
     // The column names and order (ascending/descending) matched
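
Note for reviewers: the behavioral change in the hunk above is easiest to see in isolation. The sketch below is a minimal, self-contained approximation, not the optimizer code itself; the Order stub stands in for org.apache.hadoop.hive.metastore.api.Order, and the matchingSortCols wrapper and class name are invented for illustration (the real loop returns false into a larger eligibility check). Before the patch, Order.equals() required identical column names and sort directions across tables; after it, only the direction (ASC/DESC) must match at each position, and each table contributes its own column names for the later join-key comparison, which is what lets test_table1 (sorted by key) SMB-join test_table2 (sorted by value). The retained context comment in the hunk still says "compare the column names", which the code no longer does.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortColCheckSketch {

  // Stand-in for org.apache.hadoop.hive.metastore.api.Order: a sort column
  // name plus a direction flag (1 = ascending, 0 = descending).
  static final class Order {
    private final String col;
    private final int order;
    Order(String col, int order) { this.col = col; this.order = order; }
    String getCol() { return col; }
    int getOrder() { return order; }
  }

  // Approximates the patched loop: succeed when the sort directions line up
  // position by position, collecting this table's own column names along the
  // way. Returns null where the optimizer would return false.
  static List<String> matchingSortCols(List<Order> sortCols,
      List<Order> sortColumnsFirstPartition) {
    List<String> sortColNames = new ArrayList<String>();
    for (int pos = 0; pos < sortCols.size(); pos++) {
      Order o = sortCols.get(pos);
      if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
        return null;
      }
      sortColNames.add(o.getCol());
    }
    return sortColNames;
  }

  public static void main(String[] args) {
    List<Order> table1 = Arrays.asList(new Order("key", 1));   // test_table1: key ASC
    List<Order> table2 = Arrays.asList(new Order("value", 1)); // test_table2: value ASC
    // Different column names but matching direction: eligible after the patch
    // (the old Order.equals() check would have rejected this pair).
    System.out.println(matchingSortCols(table2, table1));      // prints [value]
    List<Order> descTable = Arrays.asList(new Order("value", 0)); // value DESC
    System.out.println(matchingSortCols(descTable, table1));   // prints null
  }
}

The test_table3/test_table4 case (same names, different sort priorities) is rejected elsewhere, when the collected per-table column names are matched against the join keys, so the second EXPLAIN above falls back to a plain map join rather than an SMB join.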