From 6e0e0e6c0515b1ba78444093352105f07bc8bb6c Mon Sep 17 00:00:00 2001 From: sparksfyz Date: Tue, 12 May 2020 11:50:54 +0800 Subject: [PATCH] HIVE-23438: Fix left outer join in n-way HybridGraceHashJoin --- data/files/kv11.txt | 9 + data/scripts/q_test_init_tez.sql | 15 + pom.xml | 2 +- .../hadoop/hive/ql/exec/MapJoinOperator.java | 11 +- .../hive/ql/exec/vector/VectorMapJoinOperator.java | 4 +- .../clientpositive/hybridgrace_hashjoin_2.q | 53 +++ .../tez/hybridgrace_hashjoin_2.q.out | 367 +++++++++++++++++++++ 7 files changed, 455 insertions(+), 6 deletions(-) create mode 100644 data/files/kv11.txt diff --git a/data/files/kv11.txt b/data/files/kv11.txt new file mode 100644 index 0000000000..90c7b77164 --- /dev/null +++ b/data/files/kv11.txt @@ -0,0 +1,9 @@ +2381val_2381 +3111val_3111 +4011val_4011 +1501val_1501 +2731val_2731 +6611val_6611 +2131val_2131 +1461val_1461 +4061val_4061 \ No newline at end of file diff --git a/data/scripts/q_test_init_tez.sql b/data/scripts/q_test_init_tez.sql index 4e6176b836..cdc90578bb 100644 --- a/data/scripts/q_test_init_tez.sql +++ b/data/scripts/q_test_init_tez.sql @@ -26,6 +26,21 @@ ANALYZE TABLE src1 COMPUTE STATISTICS; ANALYZE TABLE src1 COMPUTE STATISTICS FOR COLUMNS key,value; + +-- +-- Table src2 +-- +DROP TABLE IF EXISTS src2; + +CREATE TABLE src2(key STRING COMMENT 'default', value STRING COMMENT 'default') STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH "${hiveconf:test.data.dir}/kv11.txt" OVERWRITE INTO TABLE src2; + +ANALYZE TABLE src2 COMPUTE STATISTICS; + +ANALYZE TABLE src2 COMPUTE STATISTICS FOR COLUMNS key,value; + + -- -- Table srcpart -- diff --git a/pom.xml b/pom.xml index 1d667226fb..ab772d6e97 100644 --- a/pom.xml +++ b/pom.xml @@ -1065,7 +1065,7 @@ ${test.warehouse.scheme}${test.warehouse.dir} true - src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem + src,src1,src2,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem ${test.conf.dir}/krb5.conf ${antlr.version} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 07aa2ea6a3..3b31aba8c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -212,6 +212,12 @@ protected void completeInitializationOp(Object[] os) throws HiveException { // we can't use the cached table because it has spilled. loadHashTable(getExecContext(), MapredContext.get()); + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos]; + break; + } + } } else { if (LOG.isDebugEnabled()) { String s = "Using tables from cache: ["; @@ -439,7 +445,7 @@ public void process(Object row, int tag) throws HiveException { // postpone the join processing for this pair by also spilling this big table row.
if (joinResult == JoinUtil.JoinResult.SPILL && !bigTableRowSpilled) { // For n-way join, only spill big table rows once - spillBigTableRow(mapJoinTables[pos], row); + spillBigTableRow(firstSmallTable, row, ((HybridHashTableContainer) mapJoinTables[pos]).getToSpillPartitionId()); bigTableRowSpilled = true; } } @@ -471,9 +477,8 @@ public void process(Object row, int tag) throws HiveException { * @param hybridHtContainer Hybrid hashtable container * @param row big table row */ - protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) throws HiveException { + protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row, int partitionId) throws HiveException { HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer; - int partitionId = ht.getToSpillPartitionId(); HashPartition hp = ht.getHashPartitions()[partitionId]; ObjectContainer bigTable = hp.getMatchfileObjContainer(); bigTable.add(row); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 4e05fa3ab5..e8fb9b5526 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -238,12 +238,12 @@ public void process(Object row, int tag) throws HiveException { } @Override - protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) + protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row, int partitionId) throws HiveException { // Extract the actual row from row batch VectorizedRowBatch inBatch = (VectorizedRowBatch) row; Object[] actualRow = getRowObject(inBatch, batchIndex); - super.spillBigTableRow(hybridHtContainer, actualRow); + super.spillBigTableRow(hybridHtContainer, actualRow, partitionId); } // Code borrowed from VectorReduceSinkOperator diff --git a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_2.q b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_2.q index b9923f61f3..e02b48d6c2 100644 --- a/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_2.q +++ b/ql/src/test/queries/clientpositive/hybridgrace_hashjoin_2.q @@ -153,5 +153,58 @@ JOIN src y2 ON (x.value = y2.value) WHERE z1.key < 'zzzzzzzz' AND z2.key < 'zzzzzzzzzz' AND y1.value < 'zzzzzzzz' AND y2.value < 'zzzzzzzzzz'; +-- 3-way mapjoin with left outer join (1 big table, 2 small tables) +SELECT 1; + +set hive.mapjoin.hybridgrace.hashtable=false; + +EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key; + +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key; + +set hive.mapjoin.hybridgrace.hashtable=true; + +EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key; + +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key; + reset hive.cbo.enable; diff --git a/ql/src/test/results/clientpositive/tez/hybridgrace_hashjoin_2.q.out b/ql/src/test/results/clientpositive/tez/hybridgrace_hashjoin_2.q.out index 6f5a3a96ca..5fda9cf082 100644 --- 
a/ql/src/test/results/clientpositive/tez/hybridgrace_hashjoin_2.q.out +++ b/ql/src/test/results/clientpositive/tez/hybridgrace_hashjoin_2.q.out @@ -1423,3 +1423,370 @@ POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 #### A masked pattern was here #### 18256 +PREHOOK: query: SELECT 1 +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +#### A masked pattern was here #### +1 +PREHOOK: query: EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE) + Reducer 4 <- Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src1 + Statistics: Num rows: 25 Data size: 2150 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Map 3 + Map Operator Tree: + TableScan + alias: y + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Map 5 + Map Operator Tree: + TableScan + alias: z + Statistics: Num rows: 9 Data size: 1620 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 9 Data size: 1620 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: value (type: string) + Reducer 2 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Left Outer Join0 to 1 + Inner Join 0 to 2 + keys: + 0 _col0 (type: string) + 1 key (type: string) + 2 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col6 + input vertices: + 1 Map 5 + 2 Reducer 4 + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string) + outputColumnNames: _col0, _col1, 
_col2, _col3 + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src1 +PREHOOK: Input: default@src2 +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: query: select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src1 +POSTHOOK: Input: default@src2 +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +128 NULL NULL 128 +146 146 1val_1461 146 +150 150 1val_1501 150 +213 213 1val_2131 213 +224 NULL NULL 224 +238 NULL NULL 238 +255 NULL NULL 255 +273 273 1val_2731 273 +278 NULL NULL 278 +311 NULL NULL 311 +369 NULL NULL 369 +401 401 1val_4011 401 +406 406 1val_4061 406 +66 66 11val_6611 66 +98 NULL NULL 98 +PREHOOK: query: EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (BROADCAST_EDGE), Reducer 4 (BROADCAST_EDGE) + Reducer 4 <- Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src1 + Statistics: Num rows: 25 Data size: 2150 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce 
partition columns: _col0 (type: string) + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Map 3 + Map Operator Tree: + TableScan + alias: y + Statistics: Num rows: 2000 Data size: 174000 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Map 5 + Map Operator Tree: + TableScan + alias: z + Statistics: Num rows: 9 Data size: 1620 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 9 Data size: 1620 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: value (type: string) + Reducer 2 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 12 Data size: 1032 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Left Outer Join0 to 1 + Inner Join 0 to 2 + keys: + 0 _col0 (type: string) + 1 key (type: string) + 2 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col6 + input vertices: + 1 Map 5 + 2 Reducer 4 + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + HybridGraceHashJoin: true + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col6 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 7 Data size: 2471 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 205 Data size: 17835 Basic stats: COMPLETE Column stats: COMPLETE + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src1 +PREHOOK: Input: default@src2 +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: query: select * +from +( +select key from src1 group by key +) x +left join src2 z on x.key = z.key +join +( +select key from srcpart y group by key +) y on y.key = x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@src1 +POSTHOOK: Input: default@src2 +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +128 NULL NULL 128 +146 146 1val_1461 146 +150 150 1val_1501 150 +238 NULL NULL 238 +369 NULL NULL 369 +406 406 1val_4061 406 +273 273 1val_2731 273 +213 213 1val_2131 213 +401 401 1val_4011 401 +66 66 11val_6611 66 -- 2.13.0
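
Reviewer note (illustration only, not part of the applied patch): the commit has no message body, so for orientation, the change routes a spilled big-table row into the first small table's hash partition, using the partition id reported by whichever small table returned SPILL, and spills each big-table row only once per n-way join. The sketch below illustrates that routing with simplified, hypothetical stand-ins (SmallTableContainer, HashPartition, probe); it is not Hive's real MapJoinOperator / HybridHashTableContainer API.

import java.util.ArrayList;
import java.util.List;

public class NWaySpillSketch {

  enum JoinResult { MATCH, SPILL }

  // Hypothetical stand-in for one hash partition's big-table side file
  // (the matchfile ObjectContainer in the real HashPartition).
  static class HashPartition {
    final List<Object[]> matchfileRows = new ArrayList<>();
  }

  // Hypothetical stand-in for HybridHashTableContainer: a set of partitions
  // plus the id of the partition that caused the most recent SPILL result.
  static class SmallTableContainer {
    final HashPartition[] partitions;
    int toSpillPartitionId = -1;
    SmallTableContainer(int numPartitions) {
      partitions = new HashPartition[numPartitions];
      for (int i = 0; i < numPartitions; i++) {
        partitions[i] = new HashPartition();
      }
    }
  }

  // Probe every small table with one big-table row. If any probe reports
  // SPILL, persist the row exactly once, always into firstSmallTable, at the
  // partition id reported by the container that triggered the spill -- the
  // routing this patch establishes for n-way joins.
  static void processBigTableRow(Object[] row, SmallTableContainer[] smallTables,
      SmallTableContainer firstSmallTable) {
    boolean bigTableRowSpilled = false;
    for (SmallTableContainer table : smallTables) {
      JoinResult result = probe(table, row);
      if (result == JoinResult.SPILL && !bigTableRowSpilled) {
        int partitionId = table.toSpillPartitionId;
        firstSmallTable.partitions[partitionId].matchfileRows.add(row);
        bigTableRowSpilled = true; // spill each big-table row only once
      }
    }
  }

  // Dummy probe: pretend partition 2 of every container has been spilled to
  // disk; a real probe would hash the join key and look it up in memory.
  static JoinResult probe(SmallTableContainer table, Object[] row) {
    table.toSpillPartitionId = 2;
    return JoinResult.SPILL;
  }

  public static void main(String[] args) {
    SmallTableContainer z = new SmallTableContainer(4); // e.g. the src2 side
    SmallTableContainer y = new SmallTableContainer(4); // e.g. the srcpart aggregate side
    SmallTableContainer[] smallTables = { z, y };
    processBigTableRow(new Object[] { "146", "146" }, smallTables, z);
    // Both probes reported SPILL, but the row lands once, in z's partition 2.
    System.out.println("rows in z partition 2: " + z.partitions[2].matchfileRows.size());
    System.out.println("rows in y partition 2: " + y.partitions[2].matchfileRows.size());
  }
}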